欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mina socket连接器(NioSocketConnector)

程序员文章站 2024-01-13 09:30:58
...
Mina 抽象Polling连接器(AbstractPollingIoConnector):http://donald-draper.iteye.com/blog/2378978
引言:
   上一盘文章我们看了抽象Polling连接器,先来回顾一下:
     抽象拉取连接器内部有一个连接请求队列connectQueue,连接请求取消队列cancelQueue,Io处理器和连接线程引用connectorRef。拉取连接器构造主要初始化会话配置,IO事件执行器和IO处理器。连接操作,首先根据本地socket地址创建SocketChannel,连接远端socket地址,根据IO处理器和SocketChannel构建Io会话,将会话添加到会话关联的IO处理器中,根据SocketChannel和会话初始化sessionInitializer构建连接请求,添加到连接请求队列,最后启动连接器线程。
     连接器线程首先计算选择超时时间,执行超时选择操作,注册连接请求SocketChannel连接事件到选择器;如果没有任何连接请求SocketChannel需要处理,置空连接器连接线程引用,清空连接请求队列,如果有连接请求已经连接完成,即触发SocketChannel兴趣连接事件,处理连接事件就绪的连接请求,这个过程首先调用finishConnect完成SocketChannel连接后续工作,根据Io处理器和SocketChannel创建会话,初始化会话,添加会话到会话关联的IO处理器;然后处理连接超时的连接请求,即设置连接结果超时异常,添加到连接请求到取消队列;处理取消连接的连接请求,即关闭连接请求关联的SocketChannel。
今天我们来看连接器的具体实现NioSocketConnector:
/**
 * {@link IoConnector} for socket transport (TCP/IP).
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public final class NioSocketConnector extends AbstractPollingIoConnector<NioSession, SocketChannel> implements
SocketConnector {
    private volatile Selector selector;//选择器
}

从socket连接器来看,选择器selector为内部唯一变量。
/**
 * Constructor for {@link NioSocketConnector} with default configuration (multiple thread model).
 默认构造多线程模型
 */
public NioSocketConnector() {
    super(new DefaultSocketSessionConfig(), NioProcessor.class);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}

/**
 * Constructor for {@link NioSocketConnector} with default configuration, and
 * given number of {@link NioProcessor} for multithreading I/O operations
 * @param processorCount the number of processor to create and place in a
 * {@link SimpleIoProcessorPool}
 多线程模型,指定Io处理器线程池大小
 */
public NioSocketConnector(int processorCount) {
    super(new DefaultSocketSessionConfig(), NioProcessor.class, processorCount);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}
/**
 *  Constructor for {@link NioSocketConnector} with default configuration but a
 *  specific {@link IoProcessor}, useful for sharing the same processor over multiple
 *  {@link IoService} of the same type.
 多Io服务共享Io处理器模式构造
 * @param processor the processor to use for managing I/O events
 */
public NioSocketConnector(IoProcessor<NioSession> processor) {
    super(new DefaultSocketSessionConfig(), processor);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}

/**
 *  Constructor for {@link NioSocketConnector} with a given {@link Executor} for handling
 *  connection events and a given {@link IoProcessor} for handling I/O events, useful for sharing
 *  the same processor and executor over multiple {@link IoService} of the same type.
 * @param executor the executor for connection
 * @param processor the processor for I/O operations
 与上一个方法不同的,添加了Io事件执行器参数executor
 */
public NioSocketConnector(Executor executor, IoProcessor<NioSession> processor) {
    super(new DefaultSocketSessionConfig(), executor, processor);
    ((DefaultSocketSessionConfig) getSessionConfig()).init(this);
}

/**
 * Constructor for {@link NioSocketConnector} with default configuration which will use a built-in
 * thread pool executor to manage the given number of processor instances. The processor class must have
 * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
 * no-arg constructor.
 * 使用内部IO处理器线程池SimpleIoProcessorPool,管理Io处理器实例
 * @param processorClass the processor class.
 * @param processorCount the number of processors to instantiate.
 * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
 * @since 2.0.0-M4
 */
public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass, int processorCount) {
    super(new DefaultSocketSessionConfig(), processorClass, processorCount);
}

/**
 * Constructor for {@link NioSocketConnector} with default configuration with default configuration which will use a built-in
 * thread pool executor to manage the default number of processor instances. The processor class must have
 * a constructor that accepts ExecutorService or Executor as its single argument, or, failing that, a
 * no-arg constructor. The default number of instances is equal to the number of processor cores
 * in the system, plus one.
 * 多线程IO处理器模式,IO处理器实例默认为系统核心处理器数量+1
 * @param processorClass the processor class.
 * @see SimpleIoProcessorPool#SimpleIoProcessorPool(Class, Executor, int, java.nio.channels.spi.SelectorProvider)
 * @since 2.0.0-M4
 */
public NioSocketConnector(Class<? extends IoProcessor<NioSession>> processorClass) {
    super(new DefaultSocketSessionConfig(), processorClass);
}

上面的构造函数和AbstractPollingIoConnector的构造基本相同。
再来看socket连接器的其他操作:
/**
 * {@inheritDoc}
 初始化,打开一个选择器
 */
@Override
protected void init() throws Exception {
    this.selector = Selector.open();
}
/**                                         
 * {@inheritDoc}     
 销毁socket连接器,关闭选择器
 */                                         
@Override                                   
protected void destroy() throws Exception { 
    if (selector != null) {                 
        selector.close();                   
    }                                       
}                                           
 /**
 * {@inheritDoc}
 连接远端地址,即委托SocketChannel的连接操作
 */
@Override
protected boolean connect(SocketChannel handle, SocketAddress remoteAddress) throws Exception {
    return handle.connect(remoteAddress);
}
/**
 * {@inheritDoc}
 获取socket通道的连接请求
 */
@Override
protected ConnectionRequest getConnectionRequest(SocketChannel handle) {
    //获取通道的选择key
    SelectionKey key = handle.keyFor(selector);
    if ((key == null) || (!key.isValid())) {
        return null;
    }
    //返回选择key附加物
    return (ConnectionRequest) key.attachment();
}
/**
 * {@inheritDoc}
 关闭socket通道
 */
@Override
protected void close(SocketChannel handle) throws Exception {
    //取消通道关联的选择key,关闭通道
    SelectionKey key = handle.keyFor(selector);
    if (key != null) {
        key.cancel();
    }
    handle.close();
}
/**
 * {@inheritDoc}
 完成连接
 */
@Override
protected boolean finishConnect(SocketChannel handle) throws Exception {
    //已完成连接,则取消通道关联的选择key(连接操作)
    if (handle.finishConnect()) {
        SelectionKey key = handle.keyFor(selector);
        if (key != null) {
            key.cancel();
        }
        return true;
    }
    return false;
}

来看创建socket通道
/**
 * {@inheritDoc}
 */
@Override
protected SocketChannel newHandle(SocketAddress localAddress) throws Exception {
   //打开一个socket通道,配置接收缓存区size
    SocketChannel ch = SocketChannel.open();
    int receiveBufferSize = (getSessionConfig()).getReceiveBufferSize();
    if (receiveBufferSize > 65535) {
        ch.socket().setReceiveBufferSize(receiveBufferSize);
    }
    if (localAddress != null) {
        try {
	    //绑定地址
            ch.socket().bind(localAddress);
        } catch (IOException ioe) {
            // Add some info regarding the address we try to bind to the
            // message
            String newMessage = "Error while binding on " + localAddress + "\n" + "original message : "
                    + ioe.getMessage();
            Exception e = new IOException(newMessage);
            e.initCause(ioe.getCause());

            // Preemptively close the channel
            ch.close();
            throw e;
        }
    }
    //配置通道为非阻塞模式
    ch.configureBlocking(false);
    return ch;
}

从创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式。
/**
 * {@inheritDoc}
 注册socket通道的连接事件到选择器
 */
@Override
protected void register(SocketChannel handle, ConnectionRequest request) throws Exception {
    handle.register(selector, SelectionKey.OP_CONNECT, request);
}
 /**
 * {@inheritDoc}
 创建socket会话
 */
@Override
protected NioSession newSession(IoProcessor<NioSession> processor, SocketChannel handle) {
    return new NioSocketSession(this, processor, handle);
}
 /**
 * {@inheritDoc}
 选择操作
 */
@Override
protected int select(int timeout) throws Exception {
    return selector.select(timeout);
}
/**
 * {@inheritDoc}
 唤醒操作
 */
@Override
protected void wakeup() {
    selector.wakeup();
}

从上面一个方法来看注册操作,即注册socket通道的连接事件到选择器。
选择操作和唤醒操作直接委托给内部选择器。
再来看剩下的操作,很简单,看一下明白,不再讲
/**
 * {@inheritDoc}
 */
@Override
public TransportMetadata getTransportMetadata() {
    return NioSocketSession.METADATA;
}
/**
 * {@inheritDoc}
 */
@Override
public SocketSessionConfig getSessionConfig() {
    return (SocketSessionConfig) sessionConfig;
}
/**
 * {@inheritDoc}
 */
@Override
public InetSocketAddress getDefaultRemoteAddress() {
    return (InetSocketAddress) super.getDefaultRemoteAddress();
}
/**
 * {@inheritDoc}
 */
@Override
public void setDefaultRemoteAddress(InetSocketAddress defaultRemoteAddress) {
    super.setDefaultRemoteAddress(defaultRemoteAddress);
}
/**
 * {@inheritDoc}
 连接操作事件就绪的Socket通道
 */
@Override
protected Iterator<SocketChannel> selectedHandles() {
    return new SocketChannelIterator(selector.selectedKeys());
}

/**
 * {@inheritDoc}
 选择器所有关联通道
 */
@Override
protected Iterator<SocketChannel> allHandles() {
    return new SocketChannelIterator(selector.keys());
}
private static class SocketChannelIterator implements Iterator<SocketChannel> {
    private final Iterator<SelectionKey> i;
    private SocketChannelIterator(Collection<SelectionKey> selectedKeys) {
        this.i = selectedKeys.iterator();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public boolean hasNext() {
        return i.hasNext();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public SocketChannel next() {
        SelectionKey key = i.next();
        return (SocketChannel) key.channel();
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public void remove() {
        i.remove();
    }
}

总结:
socket连接器NioSocketConnector内部关联一个选择器;初始化连接器,为打开一个选择器;连接远端地址,即委托SocketChannel的连接操作;创建socket通道来看,首先打开一个socket通道,配置接收缓存区size,绑定本地socket地址,配置通道为非阻塞模式;注册操作,即注册socket通道的连接事件到选择器。选择操作和唤醒操作直接委托给内部选择器。销毁socket连接器,即关闭选择器。
相关标签: mina