欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mina 报文监听器NioDatagramAcceptor一(初始化,Io处理器)

程序员文章站 2022-03-11 09:30:16
...
Mina Io监听器接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378315
Mina Io处理器抽象实现:http://donald-draper.iteye.com/blog/2377663
Mina 报文通信简单示例 :http://donald-draper.iteye.com/blog/2379002
上一篇文章我们通过一个实例,简单看报文通信,通过下面一句:
IoAcceptor acceptor = new NioDatagramAcceptor();

创建一个报文监听器,今天我们来看一下报文监听器NioDatagramAcceptor。
**
 * {@link IoAcceptor} for datagram transport (UDP/IP).
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * @org.apache.xbean.XBean
 */
public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {
从报文监听器继承树来看,报文监听器直接实现了Io处理器的功能,在往下看之前,先来看一下报文监听器接口DatagramAcceptor
的定义;
//DatagramAcceptor
/**
 * {@link IoAcceptor} for datagram transport (UDP/IP).
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public interface DatagramAcceptor extends IoAcceptor {
    /**
     * @return the local InetSocketAddress which is bound currently.  If more than one
     * address are bound, only one of them will be returned, but it's not
     * necessarily the firstly bound address.
     * This method overrides the {@link IoAcceptor#getLocalAddress()} method.
     返回本地当前绑定的报文地址。如果多于一个地址被绑定,其中一个将会被返回,不一定是第一个
     绑定的地址
     */
    @Override
    InetSocketAddress getLocalAddress();

    /**
     * @return a {@link Set} of the local InetSocketAddress which are bound currently.
     * This method overrides the {@link IoAcceptor#getDefaultLocalAddress()} method.
     获取默认绑定的本地socket地址
     */
    @Override
    InetSocketAddress getDefaultLocalAddress();

    /**
     * Sets the default local InetSocketAddress to bind when no argument is specified in
     * {@link #bind()} method. Please note that the default will not be used
     * if any local InetSocketAddress is specified.
     * This method overrides the {@link IoAcceptor#setDefaultLocalAddress(java.net.SocketAddress)} method.
     * 设置默认本地socket地址,如果本地地址初始化,则默认的socket地址不会被使用
     * @param localAddress The local address
     */
    void setDefaultLocalAddress(InetSocketAddress localAddress);

    /**
     * @return the {@link IoSessionRecycler} for this service.
     service会话管理器
     */
    IoSessionRecycler getSessionRecycler();

    /**
     * Sets the {@link IoSessionRecycler} for this service.
     *
     * @param sessionRecycler <tt>null</tt> to use the default recycler
     */
    void setSessionRecycler(IoSessionRecycler sessionRecycler);

    /**
     * @return the default Datagram configuration of the new {@link IoSession}s
     * created by this service.
     获取报文会话配置
     */
    @Override
    DatagramSessionConfig getSessionConfig();
}

回到报文监听器NioDatagramAcceptor
/**
 * {@link IoAcceptor} for datagram transport (UDP/IP).
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 * @org.apache.xbean.XBean
 */
public final class NioDatagramAcceptor extends AbstractIoAcceptor implements DatagramAcceptor, IoProcessor<NioSession> {

    /**
     * A session recycler that is used to retrieve an existing session, unless it's too old.
     默认过期会话管理器
     **/
    private static final IoSessionRecycler DEFAULT_RECYCLER = new ExpiringSessionRecycler();
    /**
     * A timeout used for the select, as we need to get out to deal with idle
     * sessions 选择超时时间
     */
    private static final long SELECT_TIMEOUT = 1000L;
    /** A lock used to protect the selector to be waked up before it's created */
    private final Semaphore lock = new Semaphore(1);
    /** A queue used to store the list of pending Binds 地址绑定请求*/
    private final Queue<AcceptorOperationFuture> registerQueue = new ConcurrentLinkedQueue<>();
    //地址解绑请求队列
    private final Queue<AcceptorOperationFuture> cancelQueue = new ConcurrentLinkedQueue<>();
    //刷新会话队列,IO处理器刷新操作会用到,暂存刷新操作的会话
    private final Queue<NioSession> flushingSessions = new ConcurrentLinkedQueue<>();
    // socket地址与报文通道映射Map,绑定操作使socket地址与报文通道关联起来
    private final Map<SocketAddress, DatagramChannel> boundHandles = Collections
            .synchronizedMap(new HashMap<SocketAddress, DatagramChannel>());
    //会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话
    private IoSessionRecycler sessionRecycler = DEFAULT_RECYCLER;
    private final ServiceOperationFuture disposalFuture = new ServiceOperationFuture();
    private volatile boolean selectable;
    /** The thread responsible of accepting incoming requests */
    private Acceptor acceptor;//监听器线程
    private long lastIdleCheckTime;//上次空闲检查时间
    /** The Selector used by this acceptor 选择器*/
    private volatile Selector selector;
}

从上面来看报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。
再来看构造:
/**
 * Creates a new instance.
 */
public NioDatagramAcceptor() {
    this(new DefaultDatagramSessionConfig(), null);
}
/**
 * Creates a new instance.
 * 与上面不同的是,多一个IO事件执行器参数
 * @param executor The executor to use
 */
public NioDatagramAcceptor(Executor executor) {
    this(new DefaultDatagramSessionConfig(), executor);
}
/**
 * Creates a new instance.
 与上面不同的是,多个会话配置参数
 */
private NioDatagramAcceptor(IoSessionConfig sessionConfig, Executor executor) {
    super(sessionConfig, executor);

    try {
        init();//初始化报文监听器
        selectable = true;
    } catch (RuntimeException e) {
        throw e;
    } catch (Exception e) {
        throw new RuntimeIoException("Failed to initialize.", e);
    } finally {
        if (!selectable) {
            try {
                destroy();
            } catch (Exception e) {
                ExceptionMonitor.getInstance().exceptionCaught(e);
            }
        }
    }
}

来看初始化报文监听器
init();//初始化报文监听器
 protected void init() throws Exception {
        //打开一个选择器
        this.selector = Selector.open();
}

从上面可以看出,报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。
由于报文监听器即实现了Io监听器,有实现了Io处理器我们来看IO处理器的相关实现:

/**
 * {@inheritDoc}
 添加会话
 */
@Override
public void add(NioSession session) {
    // Nothing to do for UDP
    //由于报文通信是无连接的,添加会话操作实际为空
}
再来看发送会话写请求:
/**
   * {@inheritDoc}
   */
  @Override
  public void write(NioSession session, WriteRequest writeRequest) {
      // We will try to write the message directly
      long currentTime = System.currentTimeMillis();//获取系统当前时间
      //获取会话写请求队列
      final WriteRequestQueue writeRequestQueue = session.getWriteRequestQueue();
      //计算会话最大发送字节数
      final int maxWrittenBytes = session.getConfig().getMaxReadBufferSize()
              + (session.getConfig().getMaxReadBufferSize() >>> 1);

      int writtenBytes = 0;

      // Deal with the special case of a Message marker (no bytes in the request)
      // We just have to return after having calle dthe messageSent event
      //获取会话写请求buffer
      IoBuffer buf = (IoBuffer) writeRequest.getMessage();

      if (buf.remaining() == 0) {
          // Clear and fire event
	  //如果buffer中没有数据,则置空会话当前写请求,触发会话发送事件
          session.setCurrentWriteRequest(null);
          buf.reset();
          session.getFilterChain().fireMessageSent(writeRequest);
          return;
      }

      // Now, write the data
      try {
          for (;;) {
              if (writeRequest == null) {
	         //如果写请求为空,则从请求队列poll一个写请求
                  writeRequest = writeRequestQueue.poll(session);
                  if (writeRequest == null) {
		      //取消关注写事件
                      setInterestedInWrite(session, false);
                      break;
                  }
                  //设置会话当前写请求
                  session.setCurrentWriteRequest(writeRequest);
              }
             //获取写请求buffer
              buf = (IoBuf fer) writeRequest.getMessage();

              if (buf.remaining() == 0) {
                  // Clear and fire event
		  //如果buffer中没有数据,则置空会话当前写请求,触发会话发送事件
                  session.setCurrentWriteRequest(null);
                  buf.reset();
                  session.getFilterChain().fireMessageSent(writeRequest);
                  continue;
              }
              //获取写请求目的socket地址
              SocketAddress destination = writeRequest.getDestination();

              if (destination == null) {
	          //写请求目的地址为null,则获取会话远端socket地址
                  destination = session.getRemoteAddress();
              }
             //发送buffer数据到socket地址
              int localWrittenBytes = send(session, buf, destination);

              if ((localWrittenBytes == 0) || (writtenBytes >= maxWrittenBytes)) {
                  // Kernel buffer is full or wrote too much
		  //如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件
                  setInterestedInWrite(session, true);
                  session.getWriteRequestQueue().offer(session, writeRequest);
                  scheduleFlush(session);
              } else {
	          //则取消关注写事件,置空会话当前写请求,触发会话发送事件
                  setInterestedInWrite(session, false);
                  // Clear and fire event
                  session.setCurrentWriteRequest(null);
                  writtenBytes += localWrittenBytes;
                  buf.reset();
                  session.getFilterChain().fireMessageSent(writeRequest);
                  break;
              }
          }
      } catch (Exception e) {
          session.getFilterChain().fireExceptionCaught(e);
      } finally {
          //更新会话写字节计数器
          session.increaseWrittenBytes(writtenBytes, currentTime);
      }
  }

发送会话请求数据有一下几点要关注:
1.
//设置会话写事件
 setInterestedInWrite(session, false);
   protected void setInterestedInWrite(NioSession session, boolean isInterested) throws Exception {
        //获取会话选择key
        SelectionKey key = session.getSelectionKey();
        if (key == null) {
            return;
        }
        int newInterestOps = key.interestOps();

        if (isInterested) {
	   //设置关注写事件
            newInterestOps |= SelectionKey.OP_WRITE;
        } else {
	   //取消关注写事件
            newInterestOps &= ~SelectionKey.OP_WRITE;
        }
        key.interestOps(newInterestOps);
    }

2.
//发送buffer数据到socket地址
   int localWrittenBytes = send(session, buf, destination);

//委托会话关联的报文通道
 protected int send(NioSession session, IoBuffer buffer, SocketAddress remoteAddress) throws Exception {
        return ((DatagramChannel) session.getChannel()).send(buffer.buf(), remoteAddress);
    }

3.
//调度刷新会话
scheduleFlush(session);

private boolean scheduleFlush(NioSession session) {
        // Set the schedule for flush flag if the session
        // has not already be added to the flushingSessions
        // queue
	//更新会话调度标志为正在调度,添加会话到刷新队列
        if (session.setScheduledForFlush(true)) {
            flushingSessions.add(session);
            return true;
        } else {
            return false;
        }
}

从上面来看,报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,重新调度刷新,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。
再来看刷新操作:
 /**
  * {@inheritDoc}
  */
 @Override
 public void flush(NioSession session) {
     //添加会话到刷新队列
     if (scheduleFlush(session)) {
         //唤醒选择器
         wakeup();
     }
 }

//唤醒选择器
protected void wakeup() {
      selector.wakeup();
  }

再来看其他操作
/**
  * {@inheritDoc}
  */
 @Override
 public void updateTrafficControl(NioSession session) {
     //不支持会话传输控制
     throw new UnsupportedOperationException();
 }
/**
 * {@inheritDoc}
 移除会话
 */
@Override
public void remove(NioSession session) { 
    //从会话回收器移除会话,通知service监听器,会话移除,触发fireSessionDestroyed事件
    getSessionRecycler().remove(session);
    getListeners().fireSessionDestroyed(session);
}

看完报文监听器IO处理器的相关功能来看一下地址绑定
/**
 * {@inheritDoc}
 */
@Override
protected final Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception {
    // Create a bind request as a Future operation. When the selector
    // have handled the registration, it will signal this future.
    AcceptorOperationFuture request = new AcceptorOperationFuture(localAddresses);
    // adds the Registration request to the queue for the Workers
    // to handle
    //添加地址绑定请求到注册队列
    registerQueue.add(request);

    // creates the Acceptor instance and has the local
    // executor kick it off.
    //启动监听器线程
    startupAcceptor();

    // As we just started the acceptor, we have to unblock the select()
    // in order to process the bind request we just have added to the
    // registerQueue.
    try {
        lock.acquire();

        // Wait a bit to give a chance to the Acceptor thread to do the select()
        Thread.sleep(10);
	//唤醒选择操作
        wakeup();
    } finally {
        lock.release();
    }

    // Now, we wait until this request is completed.
    //等待地址绑定完成
    request.awaitUninterruptibly();

    if (request.getException() != null) {
        throw request.getException();
    }

    // Update the local addresses.
    // setLocalAddresses() shouldn't be called from the worker thread
    // because of deadlock.
    //handle绑定的地址集
    Set<SocketAddress> newLocalAddresses = new HashSet<>();
    for (DatagramChannel handle : boundHandles.values()) {
        newLocalAddresses.add(localAddress(handle));
    }
    return newLocalAddresses;
}

从上面来看绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。
上面有几点要关注:
1.
//启动监听器线程
startupAcceptor();

2.
//获取报文通道绑定的socket地址
localAddress(handle)

先来看第二点:
2.
//获取报文通道绑定的socket地址
localAddress(handle)

protected SocketAddress localAddress(DatagramChannel handle) throws Exception {
      //获取报文通道关联socket绑定的本地socket地址
      InetSocketAddress inetSocketAddress = (InetSocketAddress) handle.socket().getLocalSocketAddress();
      InetAddress inetAddress = inetSocketAddress.getAddress();

      if ((inetAddress instanceof Inet6Address) && (((Inet6Address) inetAddress).isIPv4CompatibleAddress())) {
          // Ugly hack to workaround a problem on linux : the ANY address is always converted to IPV6
          // even if the original address was an IPV4 address. We do store the two IPV4 and IPV6
          // ANY address in the map.
          byte[] ipV6Address = ((Inet6Address) inetAddress).getAddress();
          byte[] ipV4Address = new byte[4];

          System.arraycopy(ipV6Address, 12, ipV4Address, 0, 4);

          InetAddress inet4Adress = Inet4Address.getByAddress(ipV4Address);
          return new InetSocketAddress(inet4Adress, inetSocketAddress.getPort());
      } else {
          return inetSocketAddress;
      }
}

再来看第一点:
1.
//启动监听器线程
startupAcceptor();


/**
 * Starts the inner Acceptor thread.
 */
private void startupAcceptor() throws InterruptedException {
    if (!selectable) {
        //如果选择器初始化失败,则清空注册队列,取消队列及刷新会话队列
        registerQueue.clear();
        cancelQueue.clear();
        flushingSessions.clear();
    }
    lock.acquire();
    if (acceptor == null) {
        //创建Acceptor线程实例,并执行
        acceptor = new Acceptor();
        executeWorker(acceptor);
    } else {
        lock.release();
    }
}

下面来看一下Acceptor的定义:
 /**
  * This private class is used to accept incoming connection from
  * clients. It's an infinite loop, which can be stopped when all
  * the registered handles have been removed (unbound).
  接收客户端的连接。主操作是一个无限循环,当所有绑定的地址的报文通道解绑时,
  循环退出
  */
 private class Acceptor implements Runnable {
     @Override
     public void run() {
         int nHandles = 0;
         lastIdleCheckTime = System.currentTimeMillis();
         // Release the lock
         lock.release();
         while (selectable) {
             try {
	         //超时选择
                 int selected = select(SELECT_TIMEOUT);
		 //处理地址绑定请求
                 nHandles += registerHandles();
                 if (nHandles == 0) {
                     try {
                         lock.acquire();
                         if (registerQueue.isEmpty() && cancelQueue.isEmpty()) {
                             acceptor = null;
                             break;
                         }
                     } finally {
                         lock.release();
                     }
                 }
                 if (selected > 0) {
		     //处理读写操作时间就绪的会话
                     processReadySessions(selectedHandles());
                 }
                 long currentTime = System.currentTimeMillis();
		 //发送刷新队列中的写请求
                 flushSessions(currentTime);
		 //处理报文通道地址解绑请求
                 nHandles -= unregisterHandles();
		 //通知会话空闲
                 notifyIdleSessions(currentTime);
             } catch (ClosedSelectorException cse) {
                 // If the selector has been closed, we can exit the loop
                 ExceptionMonitor.getInstance().exceptionCaught(cse);
                 break;
             } catch (Exception e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
                 try {
                     Thread.sleep(1000);
                 } catch (InterruptedException e1) {
                 }
             }
         }
          //如何Io处理器正在关闭,则销毁报文监听器
         if (selectable && isDisposing()) {
             selectable = false;
             try {
                 destroy();
             } catch (Exception e) {
                 ExceptionMonitor.getInstance().exceptionCaught(e);
             } finally {
                 disposalFuture.setValue(true);
             }
         }
     }
}

由于篇幅问题,监听器线程acceptor,我们放到下一篇再讲
总结:
报文监听器NioDatagramAcceptor,内部有一个注册队列registerQueue,用于存放地址绑定的请求,一个取消队列,用于存放地址解绑请求,一个Map-boundHandles,用于存放socket地址与报文通道映射映射关系,会话管理器sessionRecycler,监控连接Service的会话,如果会话过期,关闭过期的会话,一个通道选择器selector处理报文通道的读写操作事件,一个监听器线程acceptor,用于处理地址绑定和解绑,报文通道读写事件,发送会话消息及销毁监听器工作。报文监听器构造主要是初始化会话配置,IO事件执行器和打开选择器。报文监听器写操作,首先获取会话写请求队列,计算会话最大发送字节数,获取会话写请求buffer;如果写请求为空,则从请求队列poll一个写请求,然后获取写请求buffer及写请求目的socket地址,委托会话关联的报文通道发送数据;如果buffer数据太多或没有写成功,添加写请求到会话请求队列,关注写事件,否则取消关注写事件,置空会话当前写请求,触发会话发送事件。绑定地址,首先添加地址绑定请求到注册队列registerQueue,启动监听器线程acceptor,唤醒选择操作,然后等待地址绑定完成,最后返回报文通道绑定的socket地址集。

Mina 报文监听器NioDatagramAcceptor二(发送会话消息据等):http://donald-draper.iteye.com/blog/2379228
附:
会话回收器IoSessionRecycler:
/**
 * A connectionless transport can recycle existing sessions by assigning an
 * {@link IoSessionRecycler} to an {@link IoService}.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public interface IoSessionRecycler {
    /**
     * A dummy recycler that doesn't recycle any sessions.  Using this recycler will
     * make all session lifecycle events to be fired for every I/O for all connectionless
     * sessions.
     */
    IoSessionRecycler NOOP = new IoSessionRecycler() {
        /**
         * {@inheritDoc}
         */
        @Override
        public void put(IoSession session) {
            // Do nothing
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public IoSession recycle(SocketAddress remoteAddress) {
            return null;
        }
        /**
         * {@inheritDoc}
         */
        @Override
        public void remove(IoSession session) {
            // Do nothing
        }
    };
    /**
     * Called when the underlying transport creates or writes a new {@link IoSession}.
     *
     * @param session the new {@link IoSession}.
     */
    void put(IoSession session);
    /**
     * Attempts to retrieve a recycled {@link IoSession}.
     *
     * @param remoteAddress the remote socket address of the {@link IoSession} the transport wants to recycle.
     * @return a recycled {@link IoSession}, or null if one cannot be found.
     */
    IoSession recycle(SocketAddress remoteAddress);
    /**
     * Called when an {@link IoSession} is explicitly closed.
     *
     * @param session the new {@link IoSession}.
     */
    void remove(IoSession session);
}
相关标签: mina