欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

DatagramChannelImpl 解析二(报文发送与接收)

程序员文章站 2022-07-13 16:59:42
...
DatagramChannelImpl 解析一(初始化):http://donald-draper.iteye.com/blog/2373245
引言:
     DatagramChannelImpl主要成员有报文socket分发器,这个与SocketChannleImpl中的socket分发器原理基本相同,报文socket分发器可以理解为报文通道的静态代理;网络协议family表示当前报文通道的网络协议family;多播关系注册器MembershipRegistry,主要是通过一个Map-HashMap<InetAddress,LinkedList<MembershipKeyImpl>>来管理多播组和多播组成员关系key的映射(关系);通道本地读写线程记录器,及读写锁控制通道读写,一个状态锁,当通道状态改变时,需要获取状态锁。DatagramChannelImpl构造方法,主要是初始化读写线程,及读写锁和状态锁,初始化网络协议family,及报文通道描述符和文件描述id。DatagramChannelImpl(SelectorProvider selectorprovider)与其他两个不同的是构造时更新当前报文socket的数量。
    今天这篇我们来看报文通道的具体实现我们需要关注的方法drop,block,unblock,join,
send,receive,read和write。
我们先来看一下send方法
//发送报文到指定的socketaddress
 public int send(ByteBuffer bytebuffer, SocketAddress socketaddress)
   throws IOException
   //buffer为null,则抛出空指针异常
   if(bytebuffer == null)
       throw new NullPointerException();
   Object obj = writeLock;//同步写锁
   JVM INSTR monitorenter ;//进入同步,try
   InetSocketAddress inetsocketaddress;
   InetAddress inetaddress;
   ensureOpen();//确保通道打开
   //检查socketaddress
   inetsocketaddress = Net.checkAddress(socketaddress);
   inetaddress = inetsocketaddress.getAddress();
   if(inetaddress == null)
       throw new IOException("Target address not resolved");
   Object obj1 = stateLock;//同步状态锁
   JVM INSTR monitorenter ;//try
   if(!isConnected())
   {
       if(socketaddress == null)
           throw new NullPointerException();
       SecurityManager securitymanager = System.getSecurityManager();
       if(securitymanager != null)
           //如果地址为多播地址,则检查是否具有多播权限
           if(inetaddress.isMulticastAddress())
               securitymanager.checkMulticast(inetaddress);
           else
	   //否则检查是否具有连接inetaddress.getHostAddress()和相应端口的权限
               securitymanager.checkConnect(inetaddress.getHostAddress(), inetsocketaddress.getPort());
       break MISSING_BLOCK_LABEL_156;
   }
   //socketaddress不为报文socket的远端地址,则抛出IllegalArgumentException
   if(!socketaddress.equals(remoteAddress))
       throw new IllegalArgumentException("Connected address not equal to target address");
   ...
    int i = 0;
    int j;
    begin();//与end方法协同,记录中断器,控制中断。
    ...
    //获取本地写线程
    writerThread = NativeThread.current();
    do
        //委托给send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
        i = send(fd, bytebuffer, inetsocketaddress);
    while(i == -3 && isOpen());
    ...
}

send(ByteBuffer bytebuffer, SocketAddress socketaddress)方法,我们
需要关注的是一下几点:
1.
 
 ensureOpen();//确保通道打开

 private void ensureOpen()
     throws ClosedChannelException
 {
     if(!isOpen())
         throw new ClosedChannelException();
     else
         return;
 }

2.
 //如果地址为多播地址,则检查是否具有多播权限
if(inetaddress.isMulticastAddress())

    securitymanager.checkMulticast(inetaddress);
//SecurityManager,检查多播权限
  public void checkMulticast(InetAddress maddr) {
        String host = maddr.getHostAddress();
        if (!host.startsWith("[") && host.indexOf(':') != -1) {
            host = "[" + host + "]";
        }
        checkPermission(new SocketPermission(host,
            SecurityConstants.SOCKET_CONNECT_ACCEPT_ACTION));
    }

//SecurityConstants
public static final String SOCKET_CONNECT_ACCEPT_ACTION = "connect,accept";

3.
 do
     //委托给send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
     i = send(fd, bytebuffer, inetsocketaddress);
 while(i == -3 && isOpen());

这个之所以是循环为了,当发送报文操作因某种原因中断,但报文没有发送完,当中断位消除时,
继续发送报文。
我们来看send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
private int send(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
    throws IOException
{
    int i;
    ByteBuffer bytebuffer1;
    //如果buffer为direct类型,则直接调用sendFromNativeBuffer
    if(bytebuffer instanceof DirectBuffer)
        return sendFromNativeBuffer(filedescriptor, bytebuffer, inetsocketaddress);
    //获取当前buffer的实际容量,即remaining
    i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    int k = i > j ? 0 : j - i;//remaining
    //从当前线程缓存区获取临时DirectByteBuffer
    bytebuffer1 = Util.getTemporaryDirectBuffer(k);
    int i1;
    //读取buffer字节序列,写到临时DirectByteBuffer中
    bytebuffer1.put(bytebuffer);
    //读写模式转换,以便发送报文
    bytebuffer1.flip();
    bytebuffer.position(i);//重新定位buffer的position为原始位置,以便一次发送不完,再次发送
    //委托个sendFromNativeBuffer,发送已写的字节数
    int l = sendFromNativeBuffer(filedescriptor, bytebuffer1, inetsocketaddress);
    if(l > 0)
        //buffer的position向前移动i个位置
        bytebuffer.position(i + l);
    i1 = l;
    //添加DirectByteBuffer到当前线程缓存区,以便重用,因为DirectByteBuffer是直接操作系统内存
    //频繁的分配内存,将消耗过多的系统资源。
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    return i1;
    Exception exception;
    exception;
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    throw exception;
}

再来看sendFromNativeBuffer方法
private int sendFromNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, InetSocketAddress inetsocketaddress)
    throws IOException
{
    //获取当前buffer的实际容量,即remaining
    int i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    int k = i > j ? 0 : j - i;//remaining
    boolean flag = family != StandardProtocolFamily.INET;
    int l;
    try
    {
        //委托个send0,返回已发送的字节数
        l = send0(flag, filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)i, k, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
    }
    catch(PortUnreachableException portunreachableexception)
    {
        if(isConnected())
            throw portunreachableexception;
        l = k;
    }
    if(l > 0)
         //buffer的position向前移动i个位置
        bytebuffer.position(i + l);
    return l;
}

private native int send0(boolean flag, FileDescriptor filedescriptor, long l, int i, InetAddress inetaddress, int j)
        throws IOException;

从上面来看send方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
再来看receive方法:
 public SocketAddress receive(ByteBuffer bytebuffer)
        throws IOException
    {
        //如果buffer为只读,则抛出IllegalArgumentException
        if(bytebuffer.isReadOnly())
            throw new IllegalArgumentException("Read-only buffer");
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = readLock;//同步读锁
        JVM INSTR monitorenter ;
        int i;
        ByteBuffer bytebuffer1;
        ensureOpen();//确保通道打开
	//如果本地地址为null,则绑定空地址
        if(localAddress() == null)
            bind(null);
        i = 0;
        bytebuffer1 = null;
        Object obj1;
        begin();//与end方法协同,记录中断器,控制中断。
        ...
        readerThread = NativeThread.current();
        if(!isConnected() && obj1 != null)
            break MISSING_BLOCK_LABEL_248;
        do
	    //读取报文,写到buffer
            i = receive(fd, bytebuffer);
        while(i == -3 && isOpen());
        ...
    }

接收报文方法,有以下几点要关注
1.
//如果本地地址为null,则绑定空地址
if(localAddress() == null)
    bind(null);

//获取通道本地socket地址
 public SocketAddress localAddress()
    {
        Object obj = stateLock;
        JVM INSTR monitorenter ;
        return localAddress;
        Exception exception;
        exception;
        throw exception;
    }

//通道绑定socket地址
  public volatile NetworkChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        return bind(socketaddress);
    }

 public DatagramChannel bind(SocketAddress socketaddress)
        throws IOException
    {
        synchronized(readLock)
        {
            synchronized(writeLock)
            {
                synchronized(stateLock)
                {
                   //同步读写锁,及状态锁
		    ensureOpen();
                    if(localAddress != null)
                        throw new AlreadyBoundException();
                    InetSocketAddress inetsocketaddress;
                    if(socketaddress == null)
                    {
		        //地址为空,则获取本地地址
                        if(family == StandardProtocolFamily.INET)
                            inetsocketaddress = new InetSocketAddress(InetAddress.getByName("0.0.0.0"), 0);
                        else
                            inetsocketaddress = new InetSocketAddress(0);
                    } else
                    {
                        inetsocketaddress = Net.checkAddress(socketaddress);
                        if(family == StandardProtocolFamily.INET)
                        {
                            InetAddress inetaddress = inetsocketaddress.getAddress();
                            if(!(inetaddress instanceof Inet4Address))
                                throw new UnsupportedAddressTypeException();
                        }
                    }
                    SecurityManager securitymanager = System.getSecurityManager();
                    if(securitymanager != null)
		        //检查地址端口监听权限
                        securitymanager.checkListen(inetsocketaddress.getPort());
		    //委托net完成实际的绑定工作
                    Net.bind(family, fd, inetsocketaddress.getAddress(), inetsocketaddress.getPort());
		    //初始化本地地址
                    localAddress = Net.localAddress(fd);
                }
            }
        }
        return this;
    }

2.
do                              
    //读取报文,写到buffer      
    i = receive(fd, bytebuffer);
while(i == -3 && isOpen()); 
 
这个之所以是循环为了,当接收报文操作因某种原因中断,但报文没有读取完,当中断位消除时,继续读取报文。
private int receive(FileDescriptor filedescriptor, ByteBuffer bytebuffer)
    throws IOException
{
    int k;
    int l;
    ByteBuffer bytebuffer1;
    //获取buffer当前可用空间remaining
    int i = bytebuffer.position();
    int j = bytebuffer.limit();
    if(!$assertionsDisabled && i > j)
        throw new AssertionError();
    k = i > j ? 0 : j - i;//remaining
    //如果buffer为direct类型,则直接接收报文
    if((bytebuffer instanceof DirectBuffer) && k > 0)
        return receiveIntoNativeBuffer(filedescriptor, bytebuffer, k, i);
    l = Math.max(k, 1);
    //从当前线程缓冲区获取临时DirectByteBuffer
    bytebuffer1 = Util.getTemporaryDirectBuffer(l);
    int j1;
    //接收报文
    int i1 = receiveIntoNativeBuffer(filedescriptor, bytebuffer1, l, 0);
    //切换读写模式
    bytebuffer1.flip();
    if(i1 > 0 && k > 0)
        //读取临时DirectByteBuffer,写到buffer中
        bytebuffer.put(bytebuffer1);
    j1 = i1;
    //释放临时DirectByteBuffer,即,添加DirectByteBuffer到当前线程缓存区,以便重用
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    return j1;
    Exception exception;
    exception;
    Util.releaseTemporaryDirectBuffer(bytebuffer1);
    throw exception;
}

private int receiveIntoNativeBuffer(FileDescriptor filedescriptor, ByteBuffer bytebuffer, int i, int j)
    throws IOException
{
    //读取报文,写到buffer中
    int k = receive0(filedescriptor, ((DirectBuffer)bytebuffer).address() + (long)j, i, isConnected());
    if(k > 0)
        //buffer的position向前移动k个位置
        bytebuffer.position(j + k);
    return k;
}

private native int receive0(FileDescriptor filedescriptor, long l, int i, boolean flag)
        throws IOException;

receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
发送报文和接受报文方法看完,我们来看一下需要通道建立连接,才能进行使用的读写操作方法:先来看写操作,读取buffer,写到输出流
  public int write(ByteBuffer bytebuffer)
        throws IOException
    {
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = writeLock;
        JVM INSTR monitorenter ;
        int i;
        synchronized(stateLock)
        {
            ensureOpen();
            if(!isConnected())
                throw new NotYetConnectedException();
        }
        i = 0;
        int j;
        begin();
        if(isOpen())
            break MISSING_BLOCK_LABEL_123;
        j = 0;
        writerThread = 0L;
        end(i > 0 || i == -2);
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        return j;
        writerThread = NativeThread.current();
        do
	    //关键在这,委托给IOUtil
            i = IOUtil.write(fd, bytebuffer, -1L, nd, writeLock);
        while(i == -3 && isOpen());
       ...
    }

再来看读写buffer组,写到输出流
public long write(ByteBuffer abytebuffer[], int i, int j)
        throws IOException
    {
        ...
        writerThread = NativeThread.current();
        do
	    //关键在这,委托给IOUtil
            l = IOUtil.write(fd, abytebuffer, i, j, nd);
        while(l == -3L && isOpen());
	...
   }

再来读操作,从输入流读取报文,写到buffer中
 public int read(ByteBuffer bytebuffer)
        throws IOException
    {
        if(bytebuffer == null)
            throw new NullPointerException();
        Object obj = readLock;
        JVM INSTR monitorenter ;
        int i;
        synchronized(stateLock)
        {
            ensureOpen();
            if(!isConnected())
                throw new NotYetConnectedException();
        }
        i = 0;
        int j;
        begin();
        if(isOpen())
            break MISSING_BLOCK_LABEL_123;
        j = 0;
        readerThread = 0L;
        end(i > 0 || i == -2);
        if(!$assertionsDisabled && !IOStatus.check(i))
            throw new AssertionError();
        return j;
        readerThread = NativeThread.current();
        do
	    //关键点在这,委托给IOUtil
            i = IOUtil.read(fd, bytebuffer, -1L, nd, readLock);
        while(i == -3 && isOpen());
	...
    }

从输入流读取报文,写到buffer数组中,
 public long read(ByteBuffer abytebuffer[], int i, int j)
        throws IOException
    {
       ...
        readerThread = NativeThread.current();
        do
	    //关键点在这,委托给IOUtil
            l = IOUtil.read(fd, abytebuffer, i, j, nd);
        while(l == -3L && isOpen());
       ...
    }

read和write形式的方法关键的读写操作都是委托给IOUtil来完成,这个与SocketChannelImpl中读写操作基本思路相同,这里就不再介绍。
总结:
      send(发送报文)方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
      receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
     send(发送报文)和receive(接收报文)方法不需要通道已经处于连接状态,而read和write需要通道建立连接状态,这种方式与SocketChannel的读写操作相同,这样与SocketChannel无异,如果需要不如使用SocketChannel。如果使用DatagramChannel,建议使用send和recieve方法进行报文的发送和接收。

DatagramChannelImpl 解析三(多播):http://donald-draper.iteye.com/blog/2373507
相关标签: nio