DatagramChannelImpl 解析二(报文发送与接收)
程序员文章站
2022-07-13 16:59:42
...
DatagramChannelImpl 解析一(初始化):http://donald-draper.iteye.com/blog/2373245
引言:
DatagramChannelImpl主要成员有报文socket分发器,这个与SocketChannleImpl中的socket分发器原理基本相同,报文socket分发器可以理解为报文通道的静态代理;网络协议family表示当前报文通道的网络协议family;多播关系注册器MembershipRegistry,主要是通过一个Map-HashMap<InetAddress,LinkedList<MembershipKeyImpl>>来管理多播组和多播组成员关系key的映射(关系);通道本地读写线程记录器,及读写锁控制通道读写,一个状态锁,当通道状态改变时,需要获取状态锁。DatagramChannelImpl构造方法,主要是初始化读写线程,及读写锁和状态锁,初始化网络协议family,及报文通道描述符和文件描述id。DatagramChannelImpl(SelectorProvider selectorprovider)与其他两个不同的是构造时更新当前报文socket的数量。
今天这篇我们来看报文通道的具体实现我们需要关注的方法drop,block,unblock,join,
send,receive,read和write。
我们先来看一下send方法
send(ByteBuffer bytebuffer, SocketAddress socketaddress)方法,我们
需要关注的是一下几点:
1.
2.
securitymanager.checkMulticast(inetaddress);
3.
这个之所以是循环为了,当发送报文操作因某种原因中断,但报文没有发送完,当中断位消除时,
继续发送报文。
我们来看send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
再来看sendFromNativeBuffer方法
从上面来看send方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
再来看receive方法:
接收报文方法,有以下几点要关注
1.
2.
这个之所以是循环为了,当接收报文操作因某种原因中断,但报文没有读取完,当中断位消除时,继续读取报文。
receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
发送报文和接受报文方法看完,我们来看一下需要通道建立连接,才能进行使用的读写操作方法:先来看写操作,读取buffer,写到输出流
再来看读写buffer组,写到输出流
再来读操作,从输入流读取报文,写到buffer中
从输入流读取报文,写到buffer数组中,
read和write形式的方法关键的读写操作都是委托给IOUtil来完成,这个与SocketChannelImpl中读写操作基本思路相同,这里就不再介绍。
总结:
send(发送报文)方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
send(发送报文)和receive(接收报文)方法不需要通道已经处于连接状态,而read和write需要通道建立连接状态,这种方式与SocketChannel的读写操作相同,这样与SocketChannel无异,如果需要不如使用SocketChannel。如果使用DatagramChannel,建议使用send和recieve方法进行报文的发送和接收。
DatagramChannelImpl 解析三(多播):http://donald-draper.iteye.com/blog/2373507
引言:
DatagramChannelImpl主要成员有报文socket分发器,这个与SocketChannleImpl中的socket分发器原理基本相同,报文socket分发器可以理解为报文通道的静态代理;网络协议family表示当前报文通道的网络协议family;多播关系注册器MembershipRegistry,主要是通过一个Map-HashMap<InetAddress,LinkedList<MembershipKeyImpl>>来管理多播组和多播组成员关系key的映射(关系);通道本地读写线程记录器,及读写锁控制通道读写,一个状态锁,当通道状态改变时,需要获取状态锁。DatagramChannelImpl构造方法,主要是初始化读写线程,及读写锁和状态锁,初始化网络协议family,及报文通道描述符和文件描述id。DatagramChannelImpl(SelectorProvider selectorprovider)与其他两个不同的是构造时更新当前报文socket的数量。
今天这篇我们来看报文通道的具体实现我们需要关注的方法drop,block,unblock,join,
send,receive,read和write。
我们先来看一下send方法
//发送报文到指定的socketaddress public int send(ByteBuffer bytebuffer, SocketAddress socketaddress) throws IOException //buffer为null,则抛出空指针异常 if(bytebuffer == null) throw new NullPointerException(); Object obj = writeLock;//同步写锁 JVM INSTR monitorenter ;//进入同步,try InetSocketAddress inetsocketaddress; InetAddress inetaddress; ensureOpen();//确保通道打开 //检查socketaddress inetsocketaddress = Net.checkAddress(socketaddress); inetaddress = inetsocketaddress.getAddress(); if(inetaddress == null) throw new IOException("Target address not resolved"); Object obj1 = stateLock;//同步状态锁 JVM INSTR monitorenter ;//try if(!isConnected()) { if(socketaddress == null) throw new NullPointerException(); SecurityManager securitymanager = System.getSecurityManager(); if(securitymanager != null) //如果地址为多播地址,则检查是否具有多播权限 if(inetaddress.isMulticastAddress()) securitymanager.checkMulticast(inetaddress); else //否则检查是否具有连接inetaddress.getHostAddress()和相应端口的权限 securitymanager.checkConnect(inetaddress.getHostAddress(), inetsocketaddress.getPort()); break MISSING_BLOCK_LABEL_156; } //socketaddress不为报文socket的远端地址,则抛出IllegalArgumentException if(!socketaddress.equals(remoteAddress)) throw new IllegalArgumentException("Connected address not equal to target address"); ... int i = 0; int j; begin();//与end方法协同,记录中断器,控制中断。 ... //获取本地写线程 writerThread = NativeThread.current(); do //委托给send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress) i = send(fd, bytebuffer, inetsocketaddress); while(i == -3 && isOpen()); ... }
send(ByteBuffer bytebuffer, SocketAddress socketaddress)方法,我们
需要关注的是一下几点:
1.
ensureOpen();//确保通道打开
private void ensureOpen() throws ClosedChannelException { if(!isOpen()) throw new ClosedChannelException(); else return; }
2.
//如果地址为多播地址,则检查是否具有多播权限 if(inetaddress.isMulticastAddress())
securitymanager.checkMulticast(inetaddress);
//SecurityManager,检查多播权限 public void checkMulticast(InetAddress maddr) { String host = maddr.getHostAddress(); if (!host.startsWith("[") && host.indexOf(':') != -1) { host = "[" + host + "]"; } checkPermission(new SocketPermission(host, SecurityConstants.SOCKET_CONNECT_ACCEPT_ACTION)); }
//SecurityConstants public static final String SOCKET_CONNECT_ACCEPT_ACTION = "connect,accept";
3.
do //委托给send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress) i = send(fd, bytebuffer, inetsocketaddress); while(i == -3 && isOpen());
这个之所以是循环为了,当发送报文操作因某种原因中断,但报文没有发送完,当中断位消除时,
继续发送报文。
我们来看send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
private int send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress) throws IOException { int i; ByteBuffer bytebuffer1; //如果buffer为direct类型,则直接调用sendFromNativeBuffer if(bytebuffer instanceof DirectBuffer) return sendFromNativeBuffer(filedescriptor, bytebuffer, inetsocketaddress); //获取当前buffer的实际容量,即remaining i = bytebuffer.position(); int j = bytebuffer.limit(); if(!$assertionsDisabled && i > j) throw new AssertionError(); int k = i > j ? 0 : j - i;//remaining //从当前线程缓存区获取临时DirectByteBuffer bytebuffer1 = Util.getTemporaryDirectBuffer(k); int i1; //读取buffer字节序列,写到临时DirectByteBuffer中 bytebuffer1.put(bytebuffer); //读写模式转换,以便发送报文 bytebuffer1.flip(); bytebuffer.position(i);//重新定位buffer的position为原始位置,以便一次发送不完,再次发送 //委托个sendFromNativeBuffer,发送已写的字节数 int l = sendFromNativeBuffer(filedescriptor, bytebuffer1, inetsocketaddress); if(l > 0) //buffer的position向前移动i个位置 bytebuffer.position(i + l); i1 = l; //添加DirectByteBuffer到当前线程缓存区,以便重用,因为DirectByteBuffer是直接操作系统内存 //频繁的分配内存,将消耗过多的系统资源。 Util.releaseTemporaryDirectBuffer(bytebuffer1); return i1; Exception exception; exception; Util.releaseTemporaryDirectBuffer(bytebuffer1); throw exception; }
再来看sendFromNativeBuffer方法
private int sendFromNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress) throws IOException { //获取当前buffer的实际容量,即remaining int i = bytebuffer.position(); int j = bytebuffer.limit(); if(!$assertionsDisabled && i > j) throw new AssertionError(); int k = i > j ? 0 : j - i;//remaining boolean flag = family != StandardProtocolFamily.INET; int l; try { //委托个send0,返回已发送的字节数 l = send0(flag, filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, inetsocketaddress.getAddress(), inetsocketaddress.getPort()); } catch(PortUnreachableException portunreachableexception) { if(isConnected()) throw portunreachableexception; l = k; } if(l > 0) //buffer的position向前移动i个位置 bytebuffer.position(i + l); return l; }
private native int send0(boolean flag, FileDescriptor filedescriptor, long l, int i, InetAddress inetaddress, int j) throws IOException;
从上面来看send方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
再来看receive方法:
public SocketAddress receive(ByteBuffer bytebuffer) throws IOException { //如果buffer为只读,则抛出IllegalArgumentException if(bytebuffer.isReadOnly()) throw new IllegalArgumentException("Read-only buffer"); if(bytebuffer == null) throw new NullPointerException(); Object obj = readLock;//同步读锁 JVM INSTR monitorenter ; int i; ByteBuffer bytebuffer1; ensureOpen();//确保通道打开 //如果本地地址为null,则绑定空地址 if(localAddress() == null) bind(null); i = 0; bytebuffer1 = null; Object obj1; begin();//与end方法协同,记录中断器,控制中断。 ... readerThread = NativeThread.current(); if(!isConnected() && obj1 != null) break MISSING_BLOCK_LABEL_248; do //读取报文,写到buffer i = receive(fd, bytebuffer); while(i == -3 && isOpen()); ... }
接收报文方法,有以下几点要关注
1.
//如果本地地址为null,则绑定空地址 if(localAddress() == null) bind(null);
//获取通道本地socket地址 public SocketAddress localAddress() { Object obj = stateLock; JVM INSTR monitorenter ; return localAddress; Exception exception; exception; throw exception; }
//通道绑定socket地址 public volatile NetworkChannel bind(SocketAddress socketaddress) throws IOException { return bind(socketaddress); }
public DatagramChannel bind(SocketAddress socketaddress) throws IOException { synchronized(readLock) { synchronized(writeLock) { synchronized(stateLock) { //同步读写锁,及状态锁 ensureOpen(); if(localAddress != null) throw new AlreadyBoundException(); InetSocketAddress inetsocketaddress; if(socketaddress == null) { //地址为空,则获取本地地址 if(family == StandardProtocolFamily.INET) inetsocketaddress = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0); else inetsocketaddress = new InetSocketAddress(0); } else { inetsocketaddress = Net.checkAddress(socketaddress); if(family == StandardProtocolFamily.INET) { InetAddress inetaddress = inetsocketaddress.getAddress(); if(!(inetaddress instanceof Inet4Address)) throw new UnsupportedAddressTypeException(); } } SecurityManager securitymanager = System.getSecurityManager(); if(securitymanager != null) //检查地址端口监听权限 securitymanager.checkListen(inetsocketaddress.getPort()); //委托net完成实际的绑定工作 Net.bind(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort()); //初始化本地地址 localAddress = Net.localAddress(fd); } } } return this; }
2.
do //读取报文,写到buffer i = receive(fd, bytebuffer); while(i == -3 && isOpen());
这个之所以是循环为了,当接收报文操作因某种原因中断,但报文没有读取完,当中断位消除时,继续读取报文。
private int receive(FileDescriptor filedescriptor, ByteBuffer bytebuffer) throws IOException { int k; int l; ByteBuffer bytebuffer1; //获取buffer当前可用空间remaining int i = bytebuffer.position(); int j = bytebuffer.limit(); if(!$assertionsDisabled && i > j) throw new AssertionError(); k = i > j ? 0 : j - i;//remaining //如果buffer为direct类型,则直接接收报文 if((bytebuffer instanceof DirectBuffer) && k > 0) return receiveIntoNativeBuffer(filedescriptor, bytebuffer, k, i); l = Math.max(k, 1); //从当前线程缓冲区获取临时DirectByteBuffer bytebuffer1 = Util.getTemporaryDirectBuffer(l); int j1; //接收报文 int i1 = receiveIntoNativeBuffer(filedescriptor, bytebuffer1, l, 0); //切换读写模式 bytebuffer1.flip(); if(i1 > 0 && k > 0) //读取临时DirectByteBuffer,写到buffer中 bytebuffer.put(bytebuffer1); j1 = i1; //释放临时DirectByteBuffer,即,添加DirectByteBuffer到当前线程缓存区,以便重用 Util.releaseTemporaryDirectBuffer(bytebuffer1); return j1; Exception exception; exception; Util.releaseTemporaryDirectBuffer(bytebuffer1); throw exception; }
private int receiveIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, int i, int j) throws IOException { //读取报文,写到buffer中 int k = receive0(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)j, i, isConnected()); if(k > 0) //buffer的position向前移动k个位置 bytebuffer.position(j + k); return k; }
private native int receive0(FileDescriptor filedescriptor, long l, int i, boolean flag) throws IOException;
receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
发送报文和接受报文方法看完,我们来看一下需要通道建立连接,才能进行使用的读写操作方法:先来看写操作,读取buffer,写到输出流
public int write(ByteBuffer bytebuffer) throws IOException { if(bytebuffer == null) throw new NullPointerException(); Object obj = writeLock; JVM INSTR monitorenter ; int i; synchronized(stateLock) { ensureOpen(); if(!isConnected()) throw new NotYetConnectedException(); } i = 0; int j; begin(); if(isOpen()) break MISSING_BLOCK_LABEL_123; j = 0; writerThread = 0L; end(i > 0 || i == -2); if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); return j; writerThread = NativeThread.current(); do //关键在这,委托给IOUtil i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock); while(i == -3 && isOpen()); ... }
再来看读写buffer组,写到输出流
public long write(ByteBuffer abytebuffer[], int i, int j) throws IOException { ... writerThread = NativeThread.current(); do //关键在这,委托给IOUtil l = IOUtil.write(fd, abytebuffer, i, j, nd); while(l == -3L && isOpen()); ... }
再来读操作,从输入流读取报文,写到buffer中
public int read(ByteBuffer bytebuffer) throws IOException { if(bytebuffer == null) throw new NullPointerException(); Object obj = readLock; JVM INSTR monitorenter ; int i; synchronized(stateLock) { ensureOpen(); if(!isConnected()) throw new NotYetConnectedException(); } i = 0; int j; begin(); if(isOpen()) break MISSING_BLOCK_LABEL_123; j = 0; readerThread = 0L; end(i > 0 || i == -2); if(!$assertionsDisabled && !IOStatus.check(i)) throw new AssertionError(); return j; readerThread = NativeThread.current(); do //关键点在这,委托给IOUtil i = IOUtil.read(fd, bytebuffer, -1L, nd, readLock); while(i == -3 && isOpen()); ... }
从输入流读取报文,写到buffer数组中,
public long read(ByteBuffer abytebuffer[], int i, int j) throws IOException { ... readerThread = NativeThread.current(); do //关键点在这,委托给IOUtil l = IOUtil.read(fd, abytebuffer, i, j, nd); while(l == -3L && isOpen()); ... }
read和write形式的方法关键的读写操作都是委托给IOUtil来完成,这个与SocketChannelImpl中读写操作基本思路相同,这里就不再介绍。
总结:
send(发送报文)方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
send(发送报文)和receive(接收报文)方法不需要通道已经处于连接状态,而read和write需要通道建立连接状态,这种方式与SocketChannel的读写操作相同,这样与SocketChannel无异,如果需要不如使用SocketChannel。如果使用DatagramChannel,建议使用send和recieve方法进行报文的发送和接收。
DatagramChannelImpl 解析三(多播):http://donald-draper.iteye.com/blog/2373507
上一篇: DatagramChannelImpl 解析四(地址绑定,关闭通道等)
下一篇: NIO-UDP实例