DatagramChannelImpl 解析四(地址绑定,关闭通道等)
程序员文章站
2022-07-13 16:59:48
...
DatagramChannelImpl 解析一(初始化):http://donald-draper.iteye.com/blog/2373245
DatagramChannelImpl 解析二(报文发送与接收):http://donald-draper.iteye.com/blog/2373281
DatagramChannelImpl 解析三(多播):http://donald-draper.iteye.com/blog/2373507
引言:
前面一篇文章我们看了报文通道加入多播组,阻塞和解除阻塞源地址报文等方法,先来回顾一下,
join(报文通道加入多播组)方法,首先检查加入的多播组地址是否正确,然后校验源地址,检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,源地址为inetaddress1的多播成员关系key,有则直接返回,否则根据网络协议族family,网络接口,源地址构造多播成员关系MembershipKeyImpl,添加到注册器MembershipRegistry。
阻塞源地址报文与解除源地址报文阻塞,首先检查源地址,再将实际的阻塞与解除阻塞工作委托给Net完成。
drop方法,首先判断多播成员关系key是否有效,如果有效,判断多播组为ip4还是ip6,然后委托给Net完成实际的drop工作。
今天来看报文通道的其他方法,
先来看地址绑定
再来看连接操作
再来看断开连接方法
再来看配置通道阻塞模式
再来看关闭通道方法
上面有两点要关注
1.
//NativeDispatcher
2.
//DatagramDispatcher
从上面可以看出,关闭通道实际完成的工作为更新系统报文socket计数器,即自减1;
注册器不为null,则使注册器中的所有多播组无效;通知本地读写线程,通道已关闭;
委托报文分发器DatagramDispatcher关闭文件描述。
再来看其他方法,
//获取通道报文socket
//DatagramSocketAdaptor,可以简单理解为报文通道的静态代理。
从上面可以看出,获取通道报文socket,实际上返回的报文通道适配器DatagramSocketAdaptor,可以简单理解为报文通道的静态代理。
总结;
关闭通道实际完成的工作为更新系统报文socket计数器,即自减1;注册器不为null,则使注册器中的所有多播组无效;通知本地读写线程,通道已关闭;委托报文分发器DatagramDispatcher关闭文件描述。
附:
//DatagramSocketAdaptor
//DatagramDispatcher
DatagramChannelImpl 解析二(报文发送与接收):http://donald-draper.iteye.com/blog/2373281
DatagramChannelImpl 解析三(多播):http://donald-draper.iteye.com/blog/2373507
引言:
前面一篇文章我们看了报文通道加入多播组,阻塞和解除阻塞源地址报文等方法,先来回顾一下,
join(报文通道加入多播组)方法,首先检查加入的多播组地址是否正确,然后校验源地址,检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,源地址为inetaddress1的多播成员关系key,有则直接返回,否则根据网络协议族family,网络接口,源地址构造多播成员关系MembershipKeyImpl,添加到注册器MembershipRegistry。
阻塞源地址报文与解除源地址报文阻塞,首先检查源地址,再将实际的阻塞与解除阻塞工作委托给Net完成。
drop方法,首先判断多播成员关系key是否有效,如果有效,判断多播组为ip4还是ip6,然后委托给Net完成实际的drop工作。
今天来看报文通道的其他方法,
先来看地址绑定
public volatile NetworkChannel bind(SocketAddress socketaddress) throws IOException { return bind(socketaddress); } public DatagramChannel bind(SocketAddress socketaddress) throws IOException { synchronized(readLock) { synchronized(writeLock) { synchronized(stateLock) { //同步读写锁,及状态锁,确保通道打开 ensureOpen(); //如果本地地址不为null,则已绑定 if(localAddress != null) throw new AlreadyBoundException(); InetSocketAddress inetsocketaddress; if(socketaddress == null) { //如果绑定的socket地址为null,则创建统配地址为绑定地址 if(family == StandardProtocolFamily.INET) inetsocketaddress = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0); else inetsocketaddress = new InetSocketAddress(0); } else { //否则检查socket地址 inetsocketaddress = Net.checkAddress(socketaddress); if(family == StandardProtocolFamily.INET) { InetAddress inetaddress = inetsocketaddress.getAddress(); if(!(inetaddress instanceof Inet4Address)) throw new UnsupportedAddressTypeException(); } } SecurityManager securitymanager = System.getSecurityManager(); if(securitymanager != null) //检查socket端口监听权限 securitymanager.checkListen(inetsocketaddress.getPort()); //委托给Net Net.bind(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort()); //初始化本地地址 localAddress = Net.localAddress(fd); } } } return this; }
再来看连接操作
public DatagramChannel connect(SocketAddress socketaddress) throws IOException { boolean flag = false; synchronized(readLock) { synchronized(writeLock) { synchronized(stateLock) { //同步读写及状态锁,确保通道打开,未建立连接 ensureOpenAndUnconnected(); InetSocketAddress inetsocketaddress = Net.checkAddress(socketaddress); SecurityManager securitymanager = System.getSecurityManager(); if(securitymanager != null) //检查连接socket地址权限 securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort()); int i = Net.connect(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort()); if(i <= 0) throw new Error(); state = 1;//已来你就饿 //初始化远端地址 remoteAddress = socketaddress; sender = inetsocketaddress;//初始化发送者地址 //缓存发送者地址及端口 cachedSenderInetAddress = inetsocketaddress.getAddress(); cachedSenderPort = inetsocketaddress.getPort(); //根据文件描述符获取本地地址 localAddress = Net.localAddress(fd); } } } return this; }
再来看断开连接方法
public DatagramChannel disconnect() throws IOException { Object obj = readLock; JVM INSTR monitorenter ; Object obj1 = writeLock; JVM INSTR monitorenter ; Object obj2 = stateLock; JVM INSTR monitorenter ; //同步读写及状态锁 //确保处于连接状态或通道打开 if(!isConnected() || !isOpen()) return this; //获取远端地址 InetSocketAddress inetsocketaddress = (InetSocketAddress)remoteAddress; SecurityManager securitymanager = System.getSecurityManager(); if(securitymanager != null) //检查连接远端地址权限 securitymanager.checkConnect(inetsocketaddress.getAddress().getHostAddress(), inetsocketaddress.getPort()); //完成实际断开连接操作 disconnect0(fd); remoteAddress = null; state = 0;//未连接 localAddress = Net.localAddress(fd); ... return this; } private static native void disconnect0(FileDescriptor filedescriptor) throws IOException;
再来看配置通道阻塞模式
protected void implConfigureBlocking(boolean flag) throws IOException { //委托给IOUtil IOUtil.configureBlocking(fd, flag); }
再来看关闭通道方法
protected void implCloseSelectableChannel() throws IOException { synchronized(stateLock) { if(state != 2) //如果通道处于非关闭状态,则委托给报文分发器预先关闭文件描述 nd.preClose(fd); //更新报文socket计数器,自减1 ResourceManager.afterUdpClose(); if(registry != null) //注册器不为null,则使注册器中的所有多播组无效 registry.invalidateAll(); long l; //通知本地读写线程 if((l = readerThread) != 0L) NativeThread.signal(l); if((l = writerThread) != 0L) NativeThread.signal(l); if(!isRegistered()) //如果通道当前没有注册到任何选择器,则kill,完整实际的关闭工作 kill(); } }
上面有两点要关注
1.
if(state != 2) //如果通道处于非关闭状态,则委托给报文分发器预先关闭文件描述 nd.preClose(fd);
//NativeDispatcher
void preClose(FileDescriptor filedescriptor) throws IOException { }
2.
if(!isRegistered()) //如果通道当前没有注册到任何选择器,则kill,完整实际的关闭工作 kill();
public void kill() throws IOException { label0: { synchronized(stateLock) { if(state != 2) //如果状态为非关闭,则跳到label0 break label0; } return; } if(state != -1) break MISSING_BLOCK_LABEL_34; state = 2; obj; JVM INSTR monitorexit ; return; if(!$assertionsDisabled && (isOpen() || isRegistered())) throw new AssertionError(); //关闭文件描述 nd.close(fd); state = 2; obj; JVM INSTR monitorexit ; goto _L1 exception; throw exception; _L1: }
//DatagramDispatcher
void close(FileDescriptor filedescriptor) throws IOException { SocketDispatcher.close0(filedescriptor); }
从上面可以看出,关闭通道实际完成的工作为更新系统报文socket计数器,即自减1;
注册器不为null,则使注册器中的所有多播组无效;通知本地读写线程,通道已关闭;
委托报文分发器DatagramDispatcher关闭文件描述。
再来看其他方法,
//获取本地地址 public SocketAddress getLocalAddress() throws IOException { Object obj = stateLock; JVM INSTR monitorenter ; if(!isOpen()) throw new ClosedChannelException(); return localAddress; Exception exception; exception; throw exception; } //获取远端地址 public SocketAddress getRemoteAddress() throws IOException { Object obj = stateLock; JVM INSTR monitorenter ; if(!isOpen()) throw new ClosedChannelException(); return remoteAddress; Exception exception; exception; throw exception; }
//获取通道报文socket
public DatagramSocket socket() { Object obj = stateLock; JVM INSTR monitorenter ; if(socket == null) //委托给DatagramSocketAdaptor,根据通道创建报文socket socket = DatagramSocketAdaptor.create(this); return socket; Exception exception; exception; throw exception; }
//DatagramSocketAdaptor,可以简单理解为报文通道的静态代理。
public class DatagramSocketAdaptor extends DatagramSocket { private final DatagramChannelImpl dc;//报文通道 private volatile int timeout; private static final DatagramSocketImpl dummyDatagramSocket = new DatagramSocketImpl() { protected void create() throws SocketException { } protected void bind(int i, InetAddress inetaddress) throws SocketException { } protected void send(DatagramPacket datagrampacket) throws IOException { } protected int peek(InetAddress inetaddress) throws IOException { return 0; } protected int peekData(DatagramPacket datagrampacket) throws IOException { return 0; } protected void receive(DatagramPacket datagrampacket) throws IOException { } protected void setTTL(byte byte0) throws IOException { } protected byte getTTL() throws IOException { return 0; } protected void setTimeToLive(int i) throws IOException { } protected int getTimeToLive() throws IOException { return 0; } protected void join(InetAddress inetaddress) throws IOException { } protected void leave(InetAddress inetaddress) throws IOException { } protected void joinGroup(SocketAddress socketaddress, NetworkInterface networkinterface) throws IOException { } protected void leaveGroup(SocketAddress socketaddress, NetworkInterface networkinterface) throws IOException { } protected void close() { } public Object getOption(int i) throws SocketException { return null; } public void setOption(int i, Object obj) throws SocketException { } }; } //构造报文socket适配器 private DatagramSocketAdaptor(DatagramChannelImpl datagramchannelimpl) throws IOException { super(dummyDatagramSocket); timeout = 0; dc = datagramchannelimpl; } public void bind(SocketAddress socketaddress) throws SocketException { try { if(socketaddress == null) socketaddress = new InetSocketAddress(0); //委托为报文通道 dc.bind(socketaddress); } catch(Exception exception) { Net.translateToSocketException(exception); } } public void close() { try { //委托为报文通道 dc.close(); } catch(IOException ioexception) { throw new Error(ioexception); } } ... }
从上面可以看出,获取通道报文socket,实际上返回的报文通道适配器DatagramSocketAdaptor,可以简单理解为报文通道的静态代理。
//确保通道打开 private void ensureOpen() throws ClosedChannelException { if(!isOpen()) throw new ClosedChannelException(); else return; } //通道是否连接 public boolean isConnected() { Object obj = stateLock; JVM INSTR monitorenter ; return state == 1; Exception exception; exception; throw exception; } //确保通道打开,且未连接 void ensureOpenAndUnconnected() throws IOException { synchronized(stateLock) { if(!isOpen()) throw new ClosedChannelException(); if(state != 0) throw new IllegalStateException("Connect already invoked"); } } //finalize protected void finalize() throws IOException { if(fd != null) close(); } //设置就绪事件 public boolean translateAndSetReadyOps(int i, SelectionKeyImpl selectionkeyimpl) { return translateReadyOps(i, 0, selectionkeyimpl); } //更新就绪事件 public boolean translateAndUpdateReadyOps(int i, SelectionKeyImpl selectionkeyimpl) { return translateReadyOps(i, selectionkeyimpl.nioReadyOps(), selectionkeyimpl); } public boolean translateReadyOps(int i, int j, SelectionKeyImpl selectionkeyimpl) { int k = selectionkeyimpl.nioInterestOps(); int l = selectionkeyimpl.nioReadyOps(); int i1 = j; //就绪事件为读1写4连接8,接受连接事件16,不是这四种事件,则返回false if((i & 32) != 0) return false; //下面的这段24,16不是很明白,理解的网友可以给我留言,一起探讨, //莫非为8+16,接受连接,并建立连接 if((i & 24) != 0) { i1 = k; selectionkeyimpl.nioReadyOps(i1); return (i1 & ~l) != 0; } if((i & 1) != 0 && (k & 1) != 0) i1 |= 1;//读事件,已连接 if((i & 4) != 0 && (k & 4) != 0) i1 |= 4;//写事件 selectionkeyimpl.nioReadyOps(i1); return (i1 & ~l) != 0; } //设置通道兴趣事件 public void translateAndSetInterestOps(int i, SelectionKeyImpl selectionkeyimpl) { int j = 0; if((i & 1) != 0) j |= 1;//读事件 if((i & 4) != 0) j |= 4;//写事件 if((i & 8) != 0) j |= 2;//连接事件 selectionkeyimpl.selector.putEventOps(selectionkeyimpl, j); } //通道支持配置项 public final Set supportedOptions() { return DefaultOptionsHolder.defaultOptions; } private static class DefaultOptionsHolder { private DefaultOptionsHolder() { } static final Set defaultOptions = defaultOptions(); private static Set defaultOptions() { HashSet hashset = new HashSet(8); hashset.add(StandardSocketOptions.SO_SNDBUF);//发送缓冲区 hashset.add(StandardSocketOptions.SO_RCVBUF);//接受缓存区 hashset.add(StandardSocketOptions.SO_REUSEADDR);//地址重用 hashset.add(StandardSocketOptions.SO_BROADCAST);//是否支持报文广播传输 hashset.add(StandardSocketOptions.IP_TOS);//网络协议服务类型 hashset.add(StandardSocketOptions.IP_MULTICAST_IF);//多播网络接口 hashset.add(StandardSocketOptions.IP_MULTICAST_TTL);//多播报文存活时间 hashset.add(StandardSocketOptions.IP_MULTICAST_LOOP);//是否支持多播环路地址 return Collections.unmodifiableSet(hashset); } } //配置选项 public DatagramChannel setOption(SocketOption socketoption, Object obj) throws IOException { if(socketoption == null) throw new NullPointerException(); //如果配置为通道非支持配置选项 if(!supportedOptions().contains(socketoption)) throw new UnsupportedOperationException((new StringBuilder()).append("'").append(socketoption).append("' not supported").toString()); Object obj1 = stateLock; JVM INSTR monitorenter ; ensureOpen(); if(socketoption != StandardSocketOptions.IP_TOS) break MISSING_BLOCK_LABEL_102; if(family == StandardProtocolFamily.INET) //委托给Net Net.setSocketOption(fd, family, socketoption, obj); return this; //配置选项非多播报文存活时间,是否支持多播环路地址,调到L2,否则L1 if(socketoption != StandardSocketOptions.IP_MULTICAST_TTL && socketoption != StandardSocketOptions.IP_MULTICAST_LOOP) goto _L2; else goto _L1 _L1: Net.setSocketOption(fd, family, socketoption, obj); this; obj1; JVM INSTR monitorexit ; return; _L2: if(socketoption != StandardSocketOptions.IP_MULTICAST_IF) goto _L4; else goto _L3 _L3: //配置选项为多播网络接口 if(obj == null) throw new IllegalArgumentException("Cannot set IP_MULTICAST_IF to 'null'"); NetworkInterface networkinterface = (NetworkInterface)obj; if(family == StandardProtocolFamily.INET6) { int i = networkinterface.getIndex(); if(i == -1) throw new IOException("Network interface cannot be identified"); //配置文件描述网络接口 Net.setInterface6(fd, i); } else { Inet4Address inet4address = Net.anyInet4Address(networkinterface); if(inet4address == null) throw new IOException("Network interface not configured for IPv4"); int j = Net.inet4AsInt(inet4address); //配置文件描述网络接口 Net.setInterface4(fd, j); } this; obj1; JVM INSTR monitorexit ; return; _L4: Net.setSocketOption(fd, Net.UNSPEC, socketoption, obj); ... } //获取配置项 public Object getOption(SocketOption socketoption) throws IOException { if(socketoption == null) throw new NullPointerException(); //如果配置为通道非支持配置选项 if(!supportedOptions().contains(socketoption)) throw new UnsupportedOperationException((new StringBuilder()).append("'").append(socketoption).append("' not supported").toString()); Object obj = stateLock; JVM INSTR monitorenter ; ensureOpen(); //网络服务类型配置选项,是调到L2,否调到L1 if(socketoption != StandardSocketOptions.IP_TOS) goto _L2; else goto _L1 _L1: if(family == StandardProtocolFamily.INET) //委托给Net,获取选项配置 return Net.getSocketOption(fd, family, socketoption); Integer.valueOf(0); obj; JVM INSTR monitorexit ; return; _L2: //配置选项非多播报文存活时间,是否支持多播环路地址,调到L4,否则L3 if(socketoption != StandardSocketOptions.IP_MULTICAST_TTL && socketoption != StandardSocketOptions.IP_MULTICAST_LOOP) goto _L4; else goto _L3 _L3: //委托给Net,获取选项配置 Net.getSocketOption(fd, family, socketoption); obj; JVM INSTR monitorexit ; return; _L4: //配置选项非网络接口,调到L6,否则L5 if(socketoption != StandardSocketOptions.IP_MULTICAST_IF) goto _L6; else goto _L5 //下面的就不看了,与setOptions中的思路是逆向的 _L5: if(family != StandardProtocolFamily.INET) goto _L8; else goto _L7 _L7: int i = Net.getInterface4(fd); if(i != 0) goto _L10; else goto _L9 _L9: null; obj; JVM INSTR monitorexit ; return; _L10: NetworkInterface networkinterface1; InetAddress inetaddress = Net.inet4FromInt(i); networkinterface1 = NetworkInterface.getByInetAddress(inetaddress); if(networkinterface1 == null) throw new IOException("Unable to map address to interface"); networkinterface1; obj; JVM INSTR monitorexit ; return; _L8: i = Net.getInterface6(fd); if(i != 0) goto _L12; else goto _L11 _L11: null; obj; JVM INSTR monitorexit ; return; _L12: NetworkInterface networkinterface; networkinterface = NetworkInterface.getByIndex(i); if(networkinterface == null) throw new IOException("Unable to map index to interface"); networkinterface; obj; JVM INSTR monitorexit ; return; _L6: Net.getSocketOption(fd, Net.UNSPEC, socketoption); obj; JVM INSTR monitorexit ; return; Exception exception; exception; throw exception; }
总结;
关闭通道实际完成的工作为更新系统报文socket计数器,即自减1;注册器不为null,则使注册器中的所有多播组无效;通知本地读写线程,通道已关闭;委托报文分发器DatagramDispatcher关闭文件描述。
附:
//DatagramSocketAdaptor
//DatagramDispatcher
class DatagramDispatcher extends NativeDispatcher { static { Util.load(); } DatagramDispatcher() { } int read(FileDescriptor filedescriptor, long l, int i) throws IOException { return read0(filedescriptor, l, i); } long readv(FileDescriptor filedescriptor, long l, int i) throws IOException { return readv0(filedescriptor, l, i); } int write(FileDescriptor filedescriptor, long l, int i) throws IOException { return write0(filedescriptor, l, i); } long writev(FileDescriptor filedescriptor, long l, int i) throws IOException { return writev0(filedescriptor, l, i); } void close(FileDescriptor filedescriptor) throws IOException { SocketDispatcher.close0(filedescriptor); } static native int read0(FileDescriptor filedescriptor, long l, int i) throws IOException; static native long readv0(FileDescriptor filedescriptor, long l, int i) throws IOException; static native int write0(FileDescriptor filedescriptor, long l, int i) throws IOException; static native long writev0(FileDescriptor filedescriptor, long l, int i) throws IOException; }