欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

DatagramChannelImpl 解析三(多播)

程序员文章站 2022-07-13 16:59:54
...
DatagramChannelImpl 解析一(初始化):http://donald-draper.iteye.com/blog/2373245
DatagramChannelImpl 解析二(报文发送与接收):http://donald-draper.iteye.com/blog/2373281
引言:
上一篇看了报文的发送和接收,先来回顾一下,
   send(发送报文)方法,首先同步写锁,确保通道打开,然后检查地址,如果系统安全管理器不为null,则更具地址类型检查相应的权限,如果地址为多播地址,则检查多播权限,否则检查连接到socketaddress的权限;如果发送的buffer为direct类型,则直接发送,否则从当前线程缓冲区获取一个临时DirectByteBuffer,并将buffer中的数据写到临时DirectByteBuffer中,然后发送,发送后,释放临时DirectByteBuffer,即添加到当前线程缓存区以便重用。
      receive(接收报文)方法,首先同步读锁,确保通道打开,如果本地地址为null,则绑定local地址,并初始化报文通道的localAddress;获取buffer当前可用空间remaining,如果buffer为direct类型,则直接接收报文,否则,从当前线程缓冲区获取临时DirectByteBuffer,接收报文,写到临时缓冲区临时DirectByteBuffer,读取临时DirectByteBuffer,写到buffer中,释放临时DirectByteBuffer,即添加DirectByteBuffer到当前线程缓存区,以便重用。
     send(发送报文)和receive(接收报文)方法不需要通道已经处于连接状态,而read和write需要通道建立连接状态,这种方式与SocketChannel的读写操作相同,这样与SocketChannel无异,如果需要不如使用SocketChannel。如果使用DatagramChannel,建议使用send和recieve方法进行报文的发送和接收。
今天我们来看一下多播相关的方法为drop,block,unblock,join。
先看join方法
//添加到多播组inetaddress
 public MembershipKey join(InetAddress inetaddress, NetworkInterface networkinterface)
        throws IOException
    {
        return innerJoin(inetaddress, networkinterface, null);
    }
//添加到多播组,只接受源地址为inetaddress1的报文
    public MembershipKey join(InetAddress inetaddress, NetworkInterface networkinterface, InetAddress inetaddress1)
        throws IOException
    {
        if(inetaddress1 == null)
            throw new NullPointerException("source address is null");
        else
            return innerJoin(inetaddress, networkinterface, inetaddress1);
    }

从上面可以看出加入多播组实际上的操作是由innerJoin来完成
 private MembershipKey innerJoin(InetAddress inetaddress, NetworkInterface networkinterface, InetAddress inetaddress1)
     throws IOException
 {
     //非多播地址抛出异常
     if(!inetaddress.isMulticastAddress())
         throw new IllegalArgumentException("Group not a multicast address");
     //如果地址为ip6,但加入的多播组地址为ip4,则抛出参数异常
     if(inetaddress instanceof Inet4Address)
     {
         if(family == StandardProtocolFamily.INET6 && !Net.canIPv6SocketJoinIPv4Group())
             throw new IllegalArgumentException("IPv6 socket cannot join IPv4 multicast group");
     } else
     if(inetaddress instanceof Inet6Address)
     {
         //如果多播地址为ip6,协议非INET6,抛出异常
         if(family != StandardProtocolFamily.INET6)
             throw new IllegalArgumentException("Only IPv6 sockets can join IPv6 multicast group");
     } else
     {
         throw new IllegalArgumentException("Address type not supported");
     }
     //如果多播组源地址不为空,则校验源地址
     if(inetaddress1 != null)
     {
         if(inetaddress1.isAnyLocalAddress())//源地址含通配符,address == 0;
             throw new IllegalArgumentException("Source address is a wildcard address");
         if(inetaddress1.isMulticastAddress())//源地址为多播地址
             throw new IllegalArgumentException("Source address is multicast address");
         if(inetaddress1.getClass() != inetaddress.getClass())//源地址与多播地址类型不同
             throw new IllegalArgumentException("Source address is different type to group");
     }
     SecurityManager securitymanager = System.getSecurityManager();
     if(securitymanager != null)
         //检查多播地址权限,接受和连接权限
         securitymanager.checkMulticast(inetaddress);
     Object obj = stateLock;
     JVM INSTR monitorenter ;
     Object obj1;
     if(!isOpen())//确保通道打开
         throw new ClosedChannelException();
     if(registry == null)
     {
         //多播关系注册器为null,则创建
         registry = new MembershipRegistry();
         break MISSING_BLOCK_LABEL_229;
     }
     //检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,
     //源地址为inetaddress1,多播成员关系key
     obj1 = registry.checkMembership(inetaddress, networkinterface, inetaddress1);
     if(obj1 != null)
         //有则直接返回
         return ((MembershipKey) (obj1));
     //否则根据网络协议族family,网络接口,源地址构造MembershipKeyImpl
     if(family == StandardProtocolFamily.INET6 && ((inetaddress instanceof Inet6Address) || Net.canJoin6WithIPv4Group()))
     {//Ip6
         int i = networkinterface.getIndex();
         if(i == -1)
             throw new IOException("Network interface cannot be identified");
         byte abyte0[] = Net.inet6AsByteArray(inetaddress);
         byte abyte1[] = inetaddress1 != null ? Net.inet6AsByteArray(inetaddress1) : null;
	 //加入多播组
         int l = Net.join6(fd, abyte0, i, abyte1);
         if(l == -2)
             throw new UnsupportedOperationException();
         obj1 = new MembershipKeyImpl.Type6(this, inetaddress, networkinterface, inetaddress1, abyte0, i, abyte1);
     } else
     {//Ip4
         Inet4Address inet4address = Net.anyInet4Address(networkinterface);
         if(inet4address == null)
             throw new IOException("Network interface not configured for IPv4");
         int j = Net.inet4AsInt(inetaddress);
         int k = Net.inet4AsInt(inet4address);
         int i1 = inetaddress1 != null ? Net.inet4AsInt(inetaddress1) : 0;
	 //加入多播组
         int j1 = Net.join4(fd, j, k, i1);
         if(j1 == -2)
             throw new UnsupportedOperationException();
         obj1 = new MembershipKeyImpl.Type4(this, inetaddress, networkinterface, inetaddress1, j, k, i1);
     }
     //添加多播成员关系key到注册器
     registry.add(((MembershipKeyImpl) (obj1)));
     obj1;
     obj;
     JVM INSTR monitorexit ;
     return;
     Exception exception;
     exception;
     throw exception;
 }

以上方法有两点要关注
1.
if(inetaddress1.isAnyLocalAddress())//源地址含通配符,address == 0;
     throw new IllegalArgumentException("Source address is a wildcard address");
 if(inetaddress1.isMulticastAddress())//源地址为多播地址
     throw new IllegalArgumentException("Source address is multicast address");

//Inet4Address
public final class Inet4Address extends InetAddress {
    final static int INADDRSZ = 4;
     /**是否为多播地址
     * Utility routine to check if the InetAddress is an
     * IP multicast address. IP multicast address is a Class D
     * address i.e first four bits of the address are 1110.
     * @return a <code>boolean</code> indicating if the InetAddress is
     * an IP multicast address
     * @since   JDK1.1
     */
    public boolean isMulticastAddress() {
        return ((address & 0xf0000000) == 0xe0000000);
    }

    /**
    是否为统配符地址
     * Utility routine to check if the InetAddress in a wildcard address.
     * @return a <code>boolean</code> indicating if the Inetaddress is
     *         a wildcard address.
     * @since 1.4
     */
    public boolean isAnyLocalAddress() {
        return address == 0;
    }

    /**
     * Utility routine to check if the InetAddress is a loopback address.
     *是否为环路地址
     * @return a <code>boolean</code> indicating if the InetAddress is
     * a loopback address; or false otherwise.
     * @since 1.4
     */
    private static final int loopback = 2130706433; /* 127.0.0.1 */
    public boolean isLoopbackAddress() {
        /* 127.x.x.x */
        byte[] byteAddr = getAddress();
        return byteAddr[0] == 127;
    }
    ...
}

//InetAddress
public class InetAddress implements java.io.Serializable {
    /**
     * Specify the address family: Internet Protocol, Version 4
     * @since 1.4
     */
    static final int IPv4 = 1;

    /**
     * Specify the address family: Internet Protocol, Version 6
     * @since 1.4
     */
    static final int IPv6 = 2;

    /* Specify address family preference */
    static transient boolean preferIPv6Address = false;

    /**
     * @serial
     */
    String hostName;

    /**
     * Holds a 32-bit IPv4 address.
     *
     * @serial
     */
    int address;

    /**
     * Specifies the address family type, for instance, '1' for IPv4
     * addresses, and '2' for IPv6 addresses.
     *
     * @serial
     */
    int family;
}

2.
//加入多播组
int j1 = Net.join4(fd, j, k, i1);

//Net
static int join4(FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException
    {
        return joinOrDrop4(true, filedescriptor, i, j, k);
    }
 private static native int joinOrDrop4(boolean flag, FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException;


//加入多播组
int l = Net.join6(fd, abyte0, i, abyte1);

//Net
static int join6(FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException
    {
        return joinOrDrop6(true, filedescriptor, abyte0, i, abyte1);
    }
 private static native int joinOrDrop6(boolean flag, FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException;

从上面可以看出,报文通道加入多播组,首先检查加入的多播组地址是否正确,然后校验源地址,检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,源地址为inetaddress1的多播成员关系key,有则直接返回,否则根据网络协议族family,网络接口,源地址构造多播成员关系MembershipKeyImpl,添加到注册器MembershipRegistry。
再来看block方法

void block(MembershipKeyImpl membershipkeyimpl, InetAddress inetaddress)
        throws IOException
    {
        //如果断言,开启,则判断多播关系key通道是否为本通道
        if(!$assertionsDisabled && membershipkeyimpl.channel() != this)
            throw new AssertionError();
	//断言源地址是否为null
        if(!$assertionsDisabled && membershipkeyimpl.sourceAddress() != null)
            throw new AssertionError();
        synchronized(stateLock)
        {
	    //如果多播成员关系无效
            if(!membershipkeyimpl.isValid())
                throw new IllegalStateException("key is no longer valid");
            if(inetaddress.isAnyLocalAddress())//如果源地址为统配地址
                throw new IllegalArgumentException("Source address is a wildcard address");
            if(inetaddress.isMulticastAddress())//如果源地址为多播地址
                throw new IllegalArgumentException("Source address is multicast address");
            if(inetaddress.getClass() != membershipkeyimpl.group().getClass())//如果多播地址与源地址类型不同
                throw new IllegalArgumentException("Source address is different type to group");
            int i;
	    //如果为多播组为IP6
            if(membershipkeyimpl instanceof MembershipKeyImpl.Type6)
            {
                MembershipKeyImpl.Type6 type6 = (MembershipKeyImpl.Type6)membershipkeyimpl;
		//委托给net
                i = Net.block6(fd, type6.groupAddress(), type6.index(), Net.inet6AsByteArray(inetaddress));
            } else
            {
	        //如果为多播组为IP4
                MembershipKeyImpl.Type4 type4 = (MembershipKeyImpl.Type4)membershipkeyimpl;
		//委托给net
                i = Net.block4(fd, type4.groupAddress(), type4.interfaceAddress(), Net.inet4AsInt(inetaddress));
            }
            if(i == -2)
                throw new UnsupportedOperationException();
        }
    }

上面一个方法需要关注的为
1.
i = Net.block4(fd, type4.groupAddress(), type4.interfaceAddress(), Net.inet4AsInt(inetaddress));

//Net
static int block4(FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException
    {
        return blockOrUnblock4(true, filedescriptor, i, j, k);
    }
 private static native int blockOrUnblock4(boolean flag, FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException;

2.
i = Net.block6(fd, type6.groupAddress(), type6.index(), Net.inet6AsByteArray(inetaddress));

//Net
static int block6(FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException
    {
        return blockOrUnblock6(true, filedescriptor, abyte0, i, abyte1);
    }
 static native int blockOrUnblock6(boolean flag, FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException;

再来看unblock方法
 void unblock(MembershipKeyImpl membershipkeyimpl, InetAddress inetaddress)
    {
        //如果断言,开启,则判断多播关系key通道是否为本通道
        if(!$assertionsDisabled && membershipkeyimpl.channel() != this)
            throw new AssertionError();
	//断言源地址是否为null
        if(!$assertionsDisabled && membershipkeyimpl.sourceAddress() != null)
            throw new AssertionError();
        synchronized(stateLock)
        {
            if(!membershipkeyimpl.isValid())//如果多播成员关系无效
                throw new IllegalStateException("key is no longer valid");
            try
            {
                if(membershipkeyimpl instanceof MembershipKeyImpl.Type6)
                {
		    //如果为多播组为IP6
                    MembershipKeyImpl.Type6 type6 = (MembershipKeyImpl.Type6)membershipkeyimpl;
                    Net.unblock6(fd, type6.groupAddress(), type6.index(), Net.inet6AsByteArray(inetaddress));
                } else
                {
		   //如果为多播组为IP4
                    MembershipKeyImpl.Type4 type4 = (MembershipKeyImpl.Type4)membershipkeyimpl;
                    Net.unblock4(fd, type4.groupAddress(), type4.interfaceAddress(), Net.inet4AsInt(inetaddress));
                }
            }
            catch(IOException ioexception)
            {
                throw new AssertionError(ioexception);
            }
        }
    }

上面方法我们需要关注的是
1.
Net.unblock4(fd, type4.groupAddress(), type4.interfaceAddress(), Net.inet4AsInt(inetaddress));

//Net
static void unblock4(FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException
    {
        blockOrUnblock4(false, filedescriptor, i, j, k);
    }

2.
Net.unblock6(fd, type6.groupAddress(), type6.index(), Net.inet6AsByteArray(inetaddress));

//Net
static void unblock6(FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException
    {
        blockOrUnblock6(false, filedescriptor, abyte0, i, abyte1);
    }

从上面可以看出阻塞源地址报文与解除源地址报文阻塞,首先检查源地址,再将实际的阻塞与解除阻塞工作委托给Net完成。
再来看drop方法
//drop报文通道多播成员关系key
void drop(MembershipKeyImpl membershipkeyimpl)
    {
label0:
        {
	    //如果断言,开启,则判断多播关系key通道是否为本通道
            if(!$assertionsDisabled && membershipkeyimpl.channel() != this)
                throw new AssertionError();
            synchronized(stateLock)
            {
	        //如果多播成员关系key无效,调到label0
                if(membershipkeyimpl.isValid())
                    break label0;
            }
            return;
        }
        try
        {
            if(membershipkeyimpl instanceof MembershipKeyImpl.Type6)
            {
	        //如果为多播组为IP6
                MembershipKeyImpl.Type6 type6 = (MembershipKeyImpl.Type6)membershipkeyimpl;
                Net.drop6(fd, type6.groupAddress(), type6.index(), type6.source());
            } else
            {
	        //如果为多播组为IP6
                MembershipKeyImpl.Type4 type4 = (MembershipKeyImpl.Type4)membershipkeyimpl;
                Net.drop4(fd, type4.groupAddress(), type4.interfaceAddress(), type4.source());
            }
        }
        catch(IOException ioexception)
        {
            throw new AssertionError(ioexception);
        }
	//使多播成员关系key无效
        membershipkeyimpl.invalidate();
	//从报文通道注册器移除多播成员关系key
        registry.remove(membershipkeyimpl);
        obj;
        JVM INSTR monitorexit ;
          goto _L1
        exception;
        throw exception;
_L1:
    }

drop方法需要关注的为:
1.
Net.drop4(fd, type4.groupAddress(), type4.interfaceAddress(), type4.source());

//Net
static void drop4(FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException
    {
        joinOrDrop4(false, filedescriptor, i, j, k);
    }

    private static native int joinOrDrop4(boolean flag, FileDescriptor filedescriptor, int i, int j, int k)
        throws IOException;

2.
Net.drop6(fd, type6.groupAddress(), type6.index(), type6.source());

//Net
  
 static void drop6(FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException
    {
        joinOrDrop6(false, filedescriptor, abyte0, i, abyte1);
    }

    private static native int joinOrDrop6(boolean flag, FileDescriptor filedescriptor, byte abyte0[], int i, byte abyte1[])
        throws IOException;

从上面可以看出drop方法,首先判断多播成员关系key是否有效,如果有效,判断多播组为ip4还是ip6,然后委托给Net完成实际的drop工作。

总结:

       join(报文通道加入多播组)方法,首先检查加入的多播组地址是否正确,然后校验源地址,检查多播成员关系注册器中是否存在多播地址为inetaddress,网络接口为networkinterface,源地址为inetaddress1的多播成员关系key,有则直接返回,否则根据网络协议族family,网络接口,源地址构造多播成员关系MembershipKeyImpl,添加到注册器MembershipRegistry。
    阻塞源地址报文与解除源地址报文阻塞,首先检查源地址,再将实际的阻塞与解除阻塞工作委托给Net完成。
    drop方法,首先判断多播成员关系key是否有效,如果有效,判断多播组为ip4还是ip6,然后委托给Net完成实际的drop工作。

DatagramChannelImpl 解析四(地址绑定,关闭通道等):http://donald-draper.iteye.com/blog/2373519
相关标签: nio