欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

DatagramChannelImpl 解析四(地址绑定,关闭通道等)

程序员文章站 2022-07-13 16:59:48
...
DatagramChannelImpl 解析一(初始化):http://donald-draper.iteye.com/blog/2373245
DatagramChannelImpl 解析二(报文发送与接收):http://donald-draper.iteye.com/blog/2373281
DatagramChannelImpl 解析三(多播):http://donald-draper.iteye.com/blog/2373507
引言:
前面一篇文章我们看了报文通道加入多播组,阻塞和解除阻塞源地址报文等方法,先来回顾一下,
    join(报文通道加入多播组)方法,首先检查加入的多播组地址是否正确,然后校验源地址,检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,源地址为inetaddress1的多播成员关系key,有则直接返回,否则根据网络协议族family,网络接口,源地址构造多播成员关系MembershipKeyImpl,添加到注册器MembershipRegistry。
    阻塞源地址报文与解除源地址报文阻塞,首先检查源地址,再将实际的阻塞与解除阻塞工作委托给Net完成。
    drop方法,首先判断多播成员关系key是否有效,如果有效,判断多播组为ip4还是ip6,然后委托给Net完成实际的drop工作。
今天来看报文通道的其他方法,
先来看地址绑定
public volatile NetworkChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        return bind(socketaddress);
    }
 public DatagramChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        synchronized(readLock)
        {
            synchronized(writeLock)
            {
                synchronized(stateLock)
                {
		    //同步读写锁,及状态锁,确保通道打开
                    ensureOpen();
		    //如果本地地址不为null,则已绑定
                    if(localAddress != null)
                        throw new AlreadyBoundException();
                    InetSocketAddress inetsocketaddress;
                    if(socketaddress == null)
                    {
		        //如果绑定的socket地址为null,则创建统配地址为绑定地址
                        if(family == StandardProtocolFamily.INET)
                            inetsocketaddress = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
                        else
                            inetsocketaddress = new InetSocketAddress(0);
                    } else
                    {
		        //否则检查socket地址
                        inetsocketaddress = Net.checkAddress(socketaddress);
                        if(family == StandardProtocolFamily.INET)
                        {
                            InetAddress inetaddress = inetsocketaddress.getAddress();
                            if(!(inetaddress instanceof Inet4Address))
                                throw new UnsupportedAddressTypeException();
                        }
                    }
                    SecurityManager securitymanager = System.getSecurityManager();
                    if(securitymanager != null)
		        //检查socket端口监听权限
                        securitymanager.checkListen(inetsocketaddress.getPort());
		   //委托给Net
                    Net.bind(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
		    //初始化本地地址
                    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

再来看连接操作
public DatagramChannel connect(SocketAddress socketaddress)
        throws IOException
    {
        boolean flag = false;
        synchronized(readLock)
        {
            synchronized(writeLock)
            {
                synchronized(stateLock)
                {
		    //同步读写及状态锁,确保通道打开,未建立连接
                    ensureOpenAndUnconnected();
                    InetSocketAddress inetsocketaddress = Net.checkAddress(socketaddress);
                    SecurityManager securitymanager = System.getSecurityManager();
                    if(securitymanager != null)
		        //检查连接socket地址权限
                        securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort());
                    int i = Net.connect(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
                    if(i <= 0)
                        throw new Error();
                    state = 1;//已来你就饿
		    //初始化远端地址
                    remoteAddress = socketaddress;
                    sender = inetsocketaddress;//初始化发送者地址
		    //缓存发送者地址及端口
                    cachedSenderInetAddress = inetsocketaddress.getAddress();
                    cachedSenderPort = inetsocketaddress.getPort();
		    //根据文件描述符获取本地地址
                    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

再来看断开连接方法
public DatagramChannel disconnect()
        throws IOException
    {
        Object obj = readLock;
        JVM INSTR monitorenter ;
        Object obj1 = writeLock;
        JVM INSTR monitorenter ;
        Object obj2 = stateLock;
        JVM INSTR monitorenter ;
	//同步读写及状态锁
	//确保处于连接状态或通道打开
        if(!isConnected() || !isOpen())
            return this;
	//获取远端地址
        InetSocketAddress inetsocketaddress = (InetSocketAddress)remoteAddress;
        SecurityManager securitymanager = System.getSecurityManager();
        if(securitymanager != null)
	    //检查连接远端地址权限
            securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort());
       //完成实际断开连接操作
	disconnect0(fd);
        remoteAddress = null;
        state = 0;//未连接
        localAddress = Net.localAddress(fd);
        ...
        return this;
    }
private static native void disconnect0(FileDescriptor filedescriptor)
        throws IOException;

再来看配置通道阻塞模式
protected void implConfigureBlocking(boolean flag)
        throws IOException
    {
        //委托给IOUtil
        IOUtil.configureBlocking(fd, flag);
    }

再来看关闭通道方法
 protected void implCloseSelectableChannel()
        throws IOException
    {
        synchronized(stateLock)
        {
            if(state != 2)
	        //如果通道处于非关闭状态,则委托给报文分发器预先关闭文件描述
                nd.preClose(fd);
	   //更新报文socket计数器,自减1
            ResourceManager.afterUdpClose();
            if(registry != null)
	        //注册器不为null,则使注册器中的所有多播组无效
                registry.invalidateAll();
            long l;
	    //通知本地读写线程
            if((l = readerThread) != 0L)
                NativeThread.signal(l);
            if((l = writerThread) != 0L)
                NativeThread.signal(l);
            if(!isRegistered())
	        //如果通道当前没有注册到任何选择器,则kill,完整实际的关闭工作
                kill();
        }
    }

上面有两点要关注
1.
 if(state != 2)
     //如果通道处于非关闭状态,则委托给报文分发器预先关闭文件描述
     nd.preClose(fd);

//NativeDispatcher
    void preClose(FileDescriptor filedescriptor)
        throws IOException
    {
    }

2.
if(!isRegistered())
        //如果通道当前没有注册到任何选择器,则kill,完整实际的关闭工作
        kill();

public void kill()
        throws IOException
    {
label0:
        {
            synchronized(stateLock)
            {
                if(state != 2)
		    //如果状态为非关闭,则跳到label0
                    break label0;
            }
            return;
        }
        if(state != -1)
            break MISSING_BLOCK_LABEL_34;
        state = 2;
        obj;
        JVM INSTR monitorexit ;
        return;
        if(!$assertionsDisabled && (isOpen() || isRegistered()))
            throw new AssertionError();
	//关闭文件描述
        nd.close(fd);
        state = 2;
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }

//DatagramDispatcher
 void close(FileDescriptor filedescriptor)
        throws IOException
    {
        SocketDispatcher.close0(filedescriptor);
    }

从上面可以看出,关闭通道实际完成的工作为更新系统报文socket计数器,即自减1;
注册器不为null,则使注册器中的所有多播组无效;通知本地读写线程,通道已关闭;
委托报文分发器DatagramDispatcher关闭文件描述。
再来看其他方法,
//获取本地地址
 public SocketAddress getLocalAddress()
        throws IOException
{
    Object obj = stateLock;
    JVM INSTR monitorenter ;
    if(!isOpen())
        throw new ClosedChannelException();
    return localAddress;
    Exception exception;
    exception;
    throw exception;
}
//获取远端地址
public SocketAddress getRemoteAddress()
    throws IOException
{
    Object obj = stateLock;
    JVM INSTR monitorenter ;
    if(!isOpen())
        throw new ClosedChannelException();
    return remoteAddress;
    Exception exception;
    exception;
    throw exception;
}

//获取通道报文socket
public DatagramSocket socket()
{
    Object obj = stateLock;
    JVM INSTR monitorenter ;
    if(socket == null)
        //委托给DatagramSocketAdaptor,根据通道创建报文socket
        socket = DatagramSocketAdaptor.create(this);
    return socket;
    Exception exception;
    exception;
    throw exception;
}

//DatagramSocketAdaptor,可以简单理解为报文通道的静态代理。
public class DatagramSocketAdaptor extends DatagramSocket
{
    private final DatagramChannelImpl dc;//报文通道
    private volatile int timeout;
    private static final DatagramSocketImpl dummyDatagramSocket = new DatagramSocketImpl() {
        protected void create()
            throws SocketException
        {
        }
        protected void bind(int i, InetAddress inetaddress)
            throws SocketException
        {
        }

        protected void send(DatagramPacket datagrampacket)
            throws IOException
        {
        }
        protected int peek(InetAddress inetaddress)
            throws IOException
        {
            return 0;
        }
        protected int peekData(DatagramPacket datagrampacket)
            throws IOException
        {
            return 0;
        }
        protected void receive(DatagramPacket datagrampacket)
            throws IOException
        {
        }
        protected void setTTL(byte byte0)
            throws IOException
        {
        }
        protected byte getTTL()
            throws IOException
        {
            return 0;
        }
        protected void setTimeToLive(int i)
            throws IOException
        {
        }
        protected int getTimeToLive()
            throws IOException
        {
            return 0;
        }
        protected void join(InetAddress inetaddress)
            throws IOException
        {
        }
        protected void leave(InetAddress inetaddress)
            throws IOException
        {
        }
        protected void joinGroup(SocketAddress socketaddress, NetworkInterface networkinterface)
            throws IOException
        {
        }
        protected void leaveGroup(SocketAddress socketaddress, NetworkInterface networkinterface)
            throws IOException
        {
        }
        protected void close()
        {
        }
        public Object getOption(int i)
            throws SocketException
        {
            return null;
        }
        public void setOption(int i, Object obj)
            throws SocketException
        {
        }

    };

   }
   //构造报文socket适配器
    private DatagramSocketAdaptor(DatagramChannelImpl datagramchannelimpl)
        throws IOException
    {
        super(dummyDatagramSocket);
        timeout = 0;
        dc = datagramchannelimpl;
    }
    public void bind(SocketAddress socketaddress)
        throws SocketException
    {
        try
        {
            if(socketaddress == null)
                socketaddress = new InetSocketAddress(0);
            //委托为报文通道
            dc.bind(socketaddress);
        }
        catch(Exception exception)
        {
            Net.translateToSocketException(exception);
        }
    }
    public void close()
    {
        try
        {
	    //委托为报文通道
            dc.close();
        }
        catch(IOException ioexception)
        {
            throw new Error(ioexception);
        }
    }
    ...
}


从上面可以看出,获取通道报文socket,实际上返回的报文通道适配器DatagramSocketAdaptor,可以简单理解为报文通道的静态代理。
//确保通道打开
private void ensureOpen()
        throws ClosedChannelException
    {
        if(!isOpen())
            throw new ClosedChannelException();
        else
            return;
    }
//通道是否连接
public boolean isConnected()
    {
        Object obj = stateLock;
        JVM INSTR monitorenter ;
        return state == 1;
        Exception exception;
        exception;
        throw exception;
    }
//确保通道打开,且未连接
    void ensureOpenAndUnconnected()
        throws IOException
    {
        synchronized(stateLock)
        {
            if(!isOpen())
                throw new ClosedChannelException();
            if(state != 0)
                throw new IllegalStateException("Connect already invoked");
        }
   }
 //finalize
 protected void finalize()
        throws IOException
    {
        if(fd != null)
            close();
    }
//设置就绪事件
public boolean translateAndSetReadyOps(int i, SelectionKeyImpl selectionkeyimpl)
{
    return translateReadyOps(i, 0, selectionkeyimpl);
}
//更新就绪事件
 public boolean translateAndUpdateReadyOps(int i, SelectionKeyImpl selectionkeyimpl)
{
    return translateReadyOps(i, selectionkeyimpl.nioReadyOps(), selectionkeyimpl);
}
public boolean translateReadyOps(int i, int j, SelectionKeyImpl selectionkeyimpl)
{
    int k = selectionkeyimpl.nioInterestOps();
    int l = selectionkeyimpl.nioReadyOps();
    int i1 = j;
    //就绪事件为读1写4连接8,接受连接事件16,不是这四种事件,则返回false
    if((i & 32) != 0)
        return false;
    //下面的这段24,16不是很明白,理解的网友可以给我留言,一起探讨,
    //莫非为8+16,接受连接,并建立连接
    if((i & 24) != 0)
    {
        i1 = k;
        selectionkeyimpl.nioReadyOps(i1);
        return (i1 & ~l) != 0;
    }
    if((i & 1) != 0 && (k & 1) != 0)
        i1 |= 1;//读事件,已连接
    if((i & 4) != 0 && (k & 4) != 0)
        i1 |= 4;//写事件
    selectionkeyimpl.nioReadyOps(i1);
    return (i1 & ~l) != 0;
}
 //设置通道兴趣事件
 public void translateAndSetInterestOps(int i, SelectionKeyImpl selectionkeyimpl)
 {
     int j = 0;
     if((i & 1) != 0)
         j |= 1;//读事件
     if((i & 4) != 0)
         j |= 4;//写事件
     if((i & 8) != 0)
         j |= 2;//连接事件
     selectionkeyimpl.selector.putEventOps(selectionkeyimpl, j);
 }
//通道支持配置项
public final Set supportedOptions()
    {
        return DefaultOptionsHolder.defaultOptions;
    }
 private static class DefaultOptionsHolder
    { 
        private DefaultOptionsHolder()
        {
        }
	static final Set defaultOptions = defaultOptions();
        private static Set defaultOptions()
        {
            HashSet hashset = new HashSet(8);
            hashset.add(StandardSocketOptions.SO_SNDBUF);//发送缓冲区
            hashset.add(StandardSocketOptions.SO_RCVBUF);//接受缓存区
            hashset.add(StandardSocketOptions.SO_REUSEADDR);//地址重用
            hashset.add(StandardSocketOptions.SO_BROADCAST);//是否支持报文广播传输
            hashset.add(StandardSocketOptions.IP_TOS);//网络协议服务类型
            hashset.add(StandardSocketOptions.IP_MULTICAST_IF);//多播网络接口
            hashset.add(StandardSocketOptions.IP_MULTICAST_TTL);//多播报文存活时间
            hashset.add(StandardSocketOptions.IP_MULTICAST_LOOP);//是否支持多播环路地址
            return Collections.unmodifiableSet(hashset);
        }
       
    }
//配置选项
 public DatagramChannel setOption(SocketOption socketoption, Object obj)
        throws IOException
    {
        if(socketoption == null)
            throw new NullPointerException();
	//如果配置为通道非支持配置选项
        if(!supportedOptions().contains(socketoption))
            throw new UnsupportedOperationException((new StringBuilder()).append("'").append(socketoption).append("' not supported").toString());
        Object obj1 = stateLock;
        JVM INSTR monitorenter ;
        ensureOpen();
        if(socketoption != StandardSocketOptions.IP_TOS)
            break MISSING_BLOCK_LABEL_102;
        if(family == StandardProtocolFamily.INET)
	    //委托给Net
            Net.setSocketOption(fd, family, socketoption, obj);
        return this;
	//配置选项非多播报文存活时间,是否支持多播环路地址,调到L2,否则L1
        if(socketoption != StandardSocketOptions.IP_MULTICAST_TTL && socketoption != StandardSocketOptions.IP_MULTICAST_LOOP) 
	       goto _L2; else goto _L1
_L1:
        Net.setSocketOption(fd, family, socketoption, obj);
        this;
        obj1;
        JVM INSTR monitorexit ;
        return;
_L2:
        if(socketoption != StandardSocketOptions.IP_MULTICAST_IF) goto _L4; else goto _L3
_L3:   //配置选项为多播网络接口
        if(obj == null)
            throw new IllegalArgumentException("Cannot set IP_MULTICAST_IF to 'null'");
        NetworkInterface networkinterface = (NetworkInterface)obj;
        if(family == StandardProtocolFamily.INET6)
        {
            int i = networkinterface.getIndex();
            if(i == -1)
                throw new IOException("Network interface cannot be identified");
            //配置文件描述网络接口
            Net.setInterface6(fd, i);
        } else
        {
            Inet4Address inet4address = Net.anyInet4Address(networkinterface);
            if(inet4address == null)
                throw new IOException("Network interface not configured for IPv4");
            int j = Net.inet4AsInt(inet4address);
	    //配置文件描述网络接口
            Net.setInterface4(fd, j);
        }
        this;
        obj1;
        JVM INSTR monitorexit ;
        return;
_L4:
        Net.setSocketOption(fd, Net.UNSPEC, socketoption, obj);
      ...
    }
//获取配置项
 public Object getOption(SocketOption socketoption)
        throws IOException
    {
        if(socketoption == null)
            throw new NullPointerException();
	//如果配置为通道非支持配置选项
        if(!supportedOptions().contains(socketoption))
            throw new UnsupportedOperationException((new StringBuilder()).append("'").append(socketoption).append("' not supported").toString());
        Object obj = stateLock;
        JVM INSTR monitorenter ;
        ensureOpen();
	//网络服务类型配置选项,是调到L2,否调到L1
        if(socketoption != StandardSocketOptions.IP_TOS) goto _L2; else goto _L1
_L1:
        if(family == StandardProtocolFamily.INET)
	    //委托给Net,获取选项配置
            return Net.getSocketOption(fd, family, socketoption);
        Integer.valueOf(0);
        obj;
        JVM INSTR monitorexit ;
        return;
_L2:   //配置选项非多播报文存活时间,是否支持多播环路地址,调到L4,否则L3
        if(socketoption != StandardSocketOptions.IP_MULTICAST_TTL && socketoption != StandardSocketOptions.IP_MULTICAST_LOOP)
	    goto _L4; else goto _L3
_L3:    //委托给Net,获取选项配置
        Net.getSocketOption(fd, family, socketoption);
        obj;
        JVM INSTR monitorexit ;
        return;
_L4:    //配置选项非网络接口,调到L6,否则L5
        if(socketoption != StandardSocketOptions.IP_MULTICAST_IF) goto _L6; else goto _L5
	//下面的就不看了,与setOptions中的思路是逆向的
_L5:
        if(family != StandardProtocolFamily.INET) goto _L8; else goto _L7
_L7:
        int i = Net.getInterface4(fd);
        if(i != 0) goto _L10; else goto _L9
_L9:
        null;
        obj;
        JVM INSTR monitorexit ;
        return;
_L10:
        NetworkInterface networkinterface1;
        InetAddress inetaddress = Net.inet4FromInt(i);
        networkinterface1 = NetworkInterface.getByInetAddress(inetaddress);
        if(networkinterface1 == null)
            throw new IOException("Unable to map address to interface");
        networkinterface1;
        obj;
        JVM INSTR monitorexit ;
        return;
_L8:
        i = Net.getInterface6(fd);
        if(i != 0) goto _L12; else goto _L11
_L11:
        null;
        obj;
        JVM INSTR monitorexit ;
        return;
_L12:
        NetworkInterface networkinterface;
        networkinterface = NetworkInterface.getByIndex(i);
        if(networkinterface == null)
            throw new IOException("Unable to map index to interface");
        networkinterface;
        obj;
        JVM INSTR monitorexit ;
        return;
_L6:
        Net.getSocketOption(fd, Net.UNSPEC, socketoption);
        obj;
        JVM INSTR monitorexit ;
        return;
        Exception exception;
        exception;
        throw exception;
    }

总结;
关闭通道实际完成的工作为更新系统报文socket计数器,即自减1;注册器不为null,则使注册器中的所有多播组无效;通知本地读写线程,通道已关闭;委托报文分发器DatagramDispatcher关闭文件描述。

附:
//DatagramSocketAdaptor

DatagramChannelImpl 解析四(地址绑定,关闭通道等)
            
    
    博客分类: NIO nio 


DatagramChannelImpl 解析四(地址绑定,关闭通道等)
            
    
    博客分类: NIO nio 


//DatagramDispatcher
class DatagramDispatcher extends NativeDispatcher
{
        static 
    {
        Util.load();
    }
    DatagramDispatcher()
    {
    }
    int read(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return read0(filedescriptor, l, i);
    }
    long readv(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return readv0(filedescriptor, l, i);
    }
    int write(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return write0(filedescriptor, l, i);
    }
    long writev(FileDescriptor filedescriptor, long l, int i)
        throws IOException
    {
        return writev0(filedescriptor, l, i);
    }
    void close(FileDescriptor filedescriptor)
        throws IOException
    {
        SocketDispatcher.close0(filedescriptor);
    }
    static native int read0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;
    static native long readv0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;
    static native int write0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;
    static native long writev0(FileDescriptor filedescriptor, long l, int i)
        throws IOException;
}
  • DatagramChannelImpl 解析四(地址绑定,关闭通道等)
            
    
    博客分类: NIO nio 
  • 大小: 82.5 KB
  • DatagramChannelImpl 解析四(地址绑定,关闭通道等)
            
    
    博客分类: NIO nio 
  • 大小: 20 KB
相关标签: nio