欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mina socket监听器(NioSocketAcceptor)

程序员文章站 2022-03-11 09:29:52
...
Mina IoService接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378271
Mina Io监听器接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378315
Mina 抽象polling监听器:http://donald-draper.iteye.com/blog/2378649
引言:
   上一篇文章看了抽象polling监听器,先来回顾一下:
AbstractPollingIoAcceptor主要变量为Io处理器processor,地址绑定请求队列registerQueue,地址解绑请求队列cancelQueue,监听器绑定的socket地址,与ServerSocketChannel映射关系boundHandles-Map,监听工作线程Acceptor引用acceptorRef。构造AbstractPollingIoAcceptor,主要是初始化会话配置,Io处理器类型,IO异步事件执行器为空的话默认为CachedThreadPool,然后初始化选择器。地址绑定过程为,创建绑定操作结果,注册绑定请求到注册地址绑定请求队列,创建监听器Acceptor实例并执行。Acceptor主要功能为,地址绑定,监听连接请求,解绑地址,实际工作逻辑为:如果监听器AbstractPollingIoAcceptor已经初始化,首先根据地址绑定队列中的绑定请求,打开一个ServerSocketChannle,注册接收事件OP_ACCEPT到选择器,并将绑定地址与ServerSocketChannle映射管理添加到地址绑定映射集合;执行选择操作,如果实际绑定地址为空,则置空acceptorRef;如果接收连接事件发生,则处理连接请求,遍历接收事件就绪的ServerSocketChannel,ServerSocketChannel创建一个关联processor的会话,初始化会话,添加会话到会话关联io处理器;检查是否有地址解绑请求,如果有解绑请求CancellationRequest,从绑定socket地址与ServerSocketChannle映射map中,移除绑定的socket地址,关闭ServerSocketChannle;最后检查监听器是否正在关闭,如果acceptor正在关闭,则关闭关联processor。Acceptor和AbstractPollingIoAcceptor的关系,与AbstractPollingIoProcessor和Processor的关系很像。地址解绑过程,首先根据解绑地址创建AcceptorOperationFuture,添加到解绑队列,启动Acceptor线程,完成实际解绑工作。
AbstractPollingIoAcceptor所有的工作(地址绑定,接收连接,创建会话,添加会话到IO处理器,解绑地址,释放监听器资源)都是在Acceptor线程里完成。
今天来看一下抽象polling监听器的实现NioSocketAcceptor
/**
 * {@link IoAcceptor} for socket transport (TCP/IP).  This class
 * handles incoming TCP/IP based socket connections.
 *socket监听器,接收socket连接诶。
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public final class NioSocketAcceptor extends AbstractPollingIoAcceptor<NioSession, ServerSocketChannel>
implements SocketAcceptor {
    private volatile Selector selector;//选择器
    private volatile SelectorProvider selectorProvider = null;//选择器提供者
}

从上来看socket监听,有两个内部变量为选择器selector和选择器提供selectorProvider。
再来看构造:
 /**
  * Constructor for {@link NioSocketAcceptor} using default parameters 
  (multiple thread model).
  默认参数,多线程模型
  */
 public NioSocketAcceptor() {
     super(new DefaultSocketSessionConfig(), NioProcessor.class);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
 }

 /**
  * Constructor for {@link NioSocketAcceptor} using default parameters, and
  * given number of {@link NioProcessor} for multithreading I/O operations.
  * 使用默认参数构造,创建processorCount个NioProcessor处理器线程,多线程处理IO操作
  * @param processorCount the number of processor to create and place in a
  * {@link SimpleIoProcessorPool}
  */
 public NioSocketAcceptor(int processorCount) {
     super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
 }

 /**
  *  Constructor for {@link NioSocketAcceptor} with default configuration but a
  *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
  *  {@link IoService} of the same type.
  多服务共享同一个处理器processor,
  * @param processor the processor to use for managing I/O events
  */
 public NioSocketAcceptor(IoProcessor<NioSession> processor) {
     super(new DefaultSocketSessionConfig(), processor);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
 }

 /**
  *  Constructor for {@link NioSocketAcceptor} with a given {@link Executor} for handling
  *  connection events and a given {@link IoProcessor} for handling I/O events, useful for
  *  sharing the same processor and executor over multiple {@link IoService} of the same type.
  多服务共享同一个处理器processor,用给定执行器处理连接事件和,给定的处理器处理IO事件。
  * @param executor the executor for connection
  * @param processor the processor for I/O operations
  */
 public NioSocketAcceptor(Executor executor, IoProcessor<NioSession> processor) {
     super(new DefaultSocketSessionConfig(), executor, processor);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
 }

 /**
  * Constructor for {@link NioSocketAcceptor} using default parameters, and
  * given number of {@link NioProcessor} for multithreading I/O operations, and
  * a custom SelectorProvider for NIO
  *多处理器处理多线程的IO操作
  * @param processorCount the number of processor to create and place in a
  * @param selectorProvider teh SelectorProvider to use
  * {@link SimpleIoProcessorPool}
  */
 public NioSocketAcceptor(int processorCount, SelectorProvider selectorProvider) {
     super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount, selectorProvider);
     ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
     this.selectorProvider = selectorProvider;
 }

上面几个构造方法与AbstractPollingIoAcceptor基本相同。
再来看其他操作:
   /**
     * {@inheritDoc}
     */
    @Override
    protected void init() throws Exception {
        selector = Selector.open();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    protected void init(SelectorProvider selectorProvider) throws Exception {
        this.selectorProvider = selectorProvider;

        if (selectorProvider == null) {
            selector = Selector.open();
        } else {
            selector = selectorProvider.openSelector();
        }
    }

从上面两个方法来看,init方法主要工作为打开一个选择器selector。
  
 /**
     * {@inheritDoc}
     */
    @Override
    protected void destroy() throws Exception {
        if (selector != null) {
            selector.close();
        }
    }
    /**
     * {@inheritDoc}
     */
    public TransportMetadata getTransportMetadata() {
        return NioSocketSession.METADATA;
    }
   /**
     * {@inheritDoc}
     */
    @Override
    public InetSocketAddress getLocalAddress() {
        return (InetSocketAddress) super.getLocalAddress();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public InetSocketAddress getDefaultLocalAddress() {
        return (InetSocketAddress) super.getDefaultLocalAddress();
    }

    /**
     * {@inheritDoc}
     */
    public void setDefaultLocalAddress(InetSocketAddress localAddress) {
        setDefaultLocalAddress((SocketAddress) localAddress);
    }

来看打开一个socket地址:
   /**
     * {@inheritDoc}
     */
    @Override
    protected ServerSocketChannel open(SocketAddress localAddress) throws Exception {
        // Creates the listening ServerSocket
        ServerSocketChannel channel = null;
	//如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则
	//通过ServerSocketChannel打开一个socket通道服务者。
        if (selectorProvider != null) {
            channel = selectorProvider.openServerSocketChannel();
        } else {
            channel = ServerSocketChannel.open();
        }
        boolean success = false;
        try {
	    //配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址
            // This is a non blocking socket channel
            channel.configureBlocking(false);
            // Configure the server socket,
            ServerSocket socket = channel.socket();
            // Set the reuseAddress flag accordingly with the setting
            socket.setReuseAddress(isReuseAddress());
            // and bind.
            try {
                socket.bind(localAddress, getBacklog());
            } catch (IOException ioe) {
                // Add some info regarding the address we try to bind to the
                // message
                String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                        + ioe.getMessage();
                Exception e = new IOException(newMessage);
                e.initCause(ioe.getCause());

                // And close the channel
                channel.close();

                throw e;
            }
            // Register the channel within the selector for ACCEPT event
            channel.register(selector, SelectionKey.OP_ACCEPT);
            success = true;
        } finally {
            if (!success) {
                close(channel);
            }
        }
        return channel;
    }

从上来看打开一个socket地址,如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则通过ServerSocketChannel打开一个socket通道服务者;配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址。
来看接收连接:
   /**
     * {@inheritDoc}
     */
    @Override
    protected NioSession accept(IoProcessor<NioSession> processor, ServerSocketChannel handle) throws Exception {
        SelectionKey key = null;
	//处理serversocketHandler待处理的选择key
        if (handle != null) {
            key = handle.keyFor(selector);
        }
        if ((key == null) || (!key.isValid()) || (!key.isAcceptable())) {
            return null;
        }
        // accept the connection from the client
	//接收连接
        SocketChannel ch = handle.accept();
        if (ch == null) {
            return null;
        }
        return new NioSocketSession(this, processor, ch);
    }

从上面来看监听器接受连接,实际上是委托给绑定地址的ServerSocketChannel,接受客户端的连接,产生一个SocketChannel,再根据SocketChannel和Io处理器创建会话。
再来看选择操作:
 
  /**
     * Check if we have at least one key whose corresponding channels is
     * ready for I/O operations.
     *检查通道是否有选择key已就绪读写操作
     * This method performs a blocking selection operation.
     * It returns only after at least one channel is selected,
     * this selector's wakeup method is invoked, or the current thread
     * is interrupted, whichever comes first.
     * 
     * @return The number of keys having their ready-operation set updated
     * @throws IOException If an I/O error occurs
     * @throws ClosedSelectorException If this selector is closed
     */
    @Override
    protected int select() throws Exception {
        return selector.select();
    }

从上来看选择操作实际委托给内部选择器。
再来看其他方法
/**
 * {@inheritDoc}
 获取serversocket通道本地地址
 */
@Override
protected SocketAddress localAddress(ServerSocketChannel handle) throws Exception {
    return handle.socket().getLocalSocketAddress();
}
*
 * {@inheritDoc}
 关闭serversocket通道
 */
@Override
protected void close(ServerSocketChannel handle) throws Exception {
    SelectionKey key = handle.keyFor(selector);
    if (key != null) {
        key.cancel();
    }
    handle.close();
}

/**
 * {@inheritDoc}
 唤醒选择器
 */
@Override
protected void wakeup() {
    selector.wakeup();
}
 /**
  * {@inheritDoc}
  获取注册到选择器的ServerSocketChannel
  */
 @Override
 protected Iterator<ServerSocketChannel> selectedHandles() {
     return new ServerSocketChannelIterator(selector.selectedKeys());
 }

 /**
  * Defines an iterator for the selected-key Set returned by the
  * selector.selectedKeys(). It replaces the SelectionKey operator.
  */
 private static class ServerSocketChannelIterator implements Iterator<ServerSocketChannel> {
     /** The selected-key iterator */
     private final Iterator<SelectionKey> iterator;

     /**
      * Build a SocketChannel iterator which will return a SocketChannel instead of
      * a SelectionKey.
      * 
      * @param selectedKeys The selector selected-key set
      */
     private ServerSocketChannelIterator(Collection<SelectionKey> selectedKeys) {
         iterator = selectedKeys.iterator();
     }

     /**
      * Tells if there are more SockectChannel left in the iterator
      * @return <tt>true</tt> if there is at least one more
      * SockectChannel object to read
      */
     public boolean hasNext() {
         return iterator.hasNext();
     }

     /**
      * Get the next SocketChannel in the operator we have built from
      * the selected-key et for this selector.
      * 
      * @return The next SocketChannel in the iterator
      */
     public ServerSocketChannel next() {
         SelectionKey key = iterator.next();

         if (key.isValid() && key.isAcceptable()) {
             return (ServerSocketChannel) key.channel();
         }

         return null;
     }

     /**
      * Remove the current SocketChannel from the iterator
      */
     public void remove() {
         iterator.remove();
     }
}

总结:
socket监听NioSocketAcceptor,有两个内部变量为选择器selector和选择器提供者selectorProvider。init方法主要工作为打开一个选择器selector。打开一个socket地址,如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则
通过ServerSocketChannel打开一个socket通道服务者;配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址。监听器接受连接,实际上是委托给绑定地址的ServerSocketChannel,接受客户端的连接,产生一个SocketChannel,再根据SocketChannel和Io处理器创建会话。选择唤醒操作实际委托给内部选择器。


附:
//SocketAcceptor
/**
 * {@link IoAcceptor} for socket transport (TCP/IP).  This class
 * handles incoming TCP/IP based socket connections.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public interface SocketAcceptor extends IoAcceptor {
    /**
     * @return the local InetSocketAddress which is bound currently.  If more than one
     * address are bound, only one of them will be returned, but it's not
     * necessarily the firstly bound address.
     * This method overrides the {@link IoAcceptor#getLocalAddress()} method.
     */
    @Override
    InetSocketAddress getLocalAddress();

    /**
     * @return a {@link Set} of the local InetSocketAddress which are bound currently.
     * This method overrides the {@link IoAcceptor#getDefaultLocalAddress()} method.
     */
    @Override
    InetSocketAddress getDefaultLocalAddress();

    /**
     * Sets the default local InetSocketAddress to bind when no argument is specified in
     * {@link #bind()} method. Please note that the default will not be used
     * if any local InetSocketAddress is specified.
     * This method overrides the {@link IoAcceptor#setDefaultLocalAddress(java.net.SocketAddress)} method.
     * 
     * @param localAddress The local address
     */
    void setDefaultLocalAddress(InetSocketAddress localAddress);

    /**
     * @see ServerSocket#getReuseAddress()
     * 
     * @return <tt>true</tt> if the <tt>SO_REUSEADDR</tt> is enabled
     */
    boolean isReuseAddress();

    /**
     * @see ServerSocket#setReuseAddress(boolean)
     * 
     * @param reuseAddress tells if the <tt>SO_REUSEADDR</tt> is to be enabled
     */
    void setReuseAddress(boolean reuseAddress);

    /**
     * @return the size of the backlog.
     */
    int getBacklog();

    /**
     * Sets the size of the backlog.  This can only be done when this
     * class is not bound
     * 
     * @param backlog The backlog's size
     */
    void setBacklog(int backlog);

    /**
     * @return the default configuration of the new SocketSessions created by 
     * this acceptor service.
     */
    @Override
    SocketSessionConfig getSessionConfig();
}
相关标签: mina