欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mina 连接器接口定义及抽象实现(IoConnector )

程序员文章站 2022-03-11 09:30:40
...
Mina IoService接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378271
Mina socket监听器(NioSocketAcceptor):http://donald-draper.iteye.com/blog/2378668
引言:
前面一篇文章我们可以socket监听,先来回顾一下:
    socket监听NioSocketAcceptor,有两个内部变量为选择器selector和选择器提供者selectorProvider。init方法主要工作为打开一个选择器selector。打开一个socket地址,如果选择器提供者不为空,则通过选择器提供者打开一个ServerSocketChannel,否则通过ServerSocketChannel打开一个socket通道服务者;配置通道阻塞模式,及通道关联的SeverSocket的地址重用配置,然后通过SeverSocket绑定地址。监听器接受连接,实际上是委托给绑定地址的ServerSocketChannel,接受客户端的连接,产生一个SocketChannel,再根据SocketChannel和Io处理器创建会话。选择唤醒操作实际委托给内部选择器。
从这篇文章开始我们来讲socket连接器NioSocketConnector,先从Io连接器接口定义开始:
/**
 * Connects to endpoint, communicates with the server, and fires events to
 * {@link IoHandler}s.
 连接器IoConnector,可以连接终端与服务通信,触发IoHandler的相关事件。
 * <p>
 * Please refer to
 * [url=../../../../../xref-examples/org/apache/mina/examples/netcat/Main.html]NetCat[/url]
 * example.
 * <p>
 * You should connect to the desired socket address to start communication,
 * and then events for incoming connections will be sent to the specified
 * default {@link IoHandler}.
 在开始通信之前,你应该连接一个Socket地址,建立连接的相关事件将会发送到IoHandler。
 * <p>
 * Threads connect to endpoint start automatically when
 * {@link #connect(SocketAddress)} is invoked, and stop when all
 * connection attempts are finished.
 *当连接远端地址时,线程自动连接远端socket地址,当连接尝试完成时,线程关闭
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public interface IoConnector extends IoService {
    /**
     * @return the connect timeout in seconds.  The default value is 1 minute.
     * 获取连接超时时间(s),默认为1分钟
     * @deprecated
     */
    @Deprecated
    int getConnectTimeout();
    /**获取连接超时时间(ms),默认为1分钟
     * @return the connect timeout in milliseconds.  The default value is 1 minute.
     */
    long getConnectTimeoutMillis();
    /**
     * Sets the connect timeout in seconds.  The default value is 1 minute.
     * 设置连接超时时间,单位s
     * @deprecated
     * @param connectTimeout The time out for the connection
     */
    @Deprecated
    void setConnectTimeout(int connectTimeout);
    /**
     * Sets the connect timeout in milliseconds.  The default value is 1 minute.
     * 设置连接超时时间,单位ms
     * @param connectTimeoutInMillis The time out for the connection
     */
    void setConnectTimeoutMillis(long connectTimeoutInMillis);

    /**
     * @return the default remote address to connect to when no argument
     * is specified in {@link #connect()} method.
     获取默认的远程socket地址
     */
    SocketAddress getDefaultRemoteAddress();

    /**
     * Sets the default remote address to connect to when no argument is
     * specified in {@link #connect()} method.
     * 设置默认的远端socket地址
     * @param defaultRemoteAddress The default remote address
     */
    void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress);
    //设置获取本地默认socket地址
    /**
     * @return the default local address
     */
    SocketAddress getDefaultLocalAddress();
    /**
     * Sets the default local address
     * 
     * @param defaultLocalAddress The default local address
     */
    void setDefaultLocalAddress(SocketAddress defaultLocalAddress);

    /**
     * Connects to the {@link #setDefaultRemoteAddress(SocketAddress) default
     * remote address}.
     * 连接到默认远端地址
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     * @throws IllegalStateException
     *             if no default remoted address is set.
     */
    ConnectFuture connect();

    /**
     * Connects to the {@link #setDefaultRemoteAddress(SocketAddress) default
     * remote address} and invokes the <code>ioSessionInitializer</code> when
     * the IoSession is created but before {@link IoHandler#sessionCreated(IoSession)}
     * is invoked.  There is [i]no[/i] guarantee that the <code>ioSessionInitializer</code>
     * will be invoked before this method returns.
     * 在连接默认远端地址,当会话创建时,在IoHandler#sessionCreated调用前,调用ioSessionInitializer
     ,初始化会话
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     * 
     * @throws IllegalStateException if no default remote address is set.
     */
    ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

    /**
     * Connects to the specified remote address.
     * 连接到远端socket地址
     * @param remoteAddress The remote address to connect to
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress);

    /**
     * Connects to the specified remote address and invokes
     * the <code>ioSessionInitializer</code> when the IoSession is created but before
     * {@link IoHandler#sessionCreated(IoSession)} is invoked.  There is [i]no[/i]
     * guarantee that the <code>ioSessionInitializer</code> will be invoked before
     * this method returns.
     * 连接远端地址,在会话创建时,在IoHandler#sessionCreated调用前,调用ioSessionInitializer
     ,初始化会话。
     * @param remoteAddress  the remote address to connect to
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     * 
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress, IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

    /**
     * Connects to the specified remote address binding to the specified local address.
     *连接远端地址,绑定本地地址
     * @param remoteAddress The remote address to connect
     * @param localAddress The local address to bind
     * 
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress);

    /**
     * Connects to the specified remote address binding to the specified local
     * address and and invokes the <code>ioSessionInitializer</code> when the
     * IoSession is created but before {@link IoHandler#sessionCreated(IoSession)}
     * is invoked.  There is [i]no[/i] guarantee that the <code>ioSessionInitializer</code>
     * will be invoked before this method returns.
     * 连接远端地址,绑定本地地址,在IoHandler#sessionCreated调用前,调用ioSessionInitializer
     ,初始化会话。
     * @param remoteAddress  the remote address to connect to
     * @param localAddress  the local interface to bind to
     * @param sessionInitializer  the callback to invoke when the {@link IoSession} object is created
     *
     * @return the {@link ConnectFuture} instance which is completed when the
     *         connection attempt initiated by this call succeeds or fails.
     */
    ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
            IoSessionInitializer<? extends ConnectFuture> sessionInitializer);
}

从IoConnector接口定义来看,与Ioservice相比增加了连接功能。
再来看抽象连接器AbstractIoConnector定义:
/**
 * A base implementation of {@link IoConnector}.
 *
 * @author [url=http://mina.apache.org]Apache MINA Project[/url]
 */
public abstract class AbstractIoConnector extends AbstractIoService implements IoConnector {
    /**
     * The minimum timeout value that is supported (in milliseconds).
     */
    private long connectTimeoutCheckInterval = 50L;//连接超时检查间隔
    private long connectTimeoutInMillis = 60 * 1000L; // 1 minute by default,默认连接超时时间
    /** The remote address we are connected to 连接的远端地址*/
    private SocketAddress defaultRemoteAddress;
    /** The local address 本地socket地址*/
    private SocketAddress defaultLocalAddress;
}

再看看构造:
/**
 * Constructor for {@link AbstractIoConnector}. You need to provide a
 * default session configuration and an {@link Executor} for handling I/O
 * events. If null {@link Executor} is provided, a default one will be
 * created using {@link Executors#newCachedThreadPool()}.
 * 构造AbstractIoConnector,需要提供一个会话配置,一个执行器用于处理IO相关事件,
 如果执行器为空,默认为Executors#newCachedThreadPool
 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor)
 * 
 * @param sessionConfig
 *            the default configuration for the managed {@link IoSession}
 * @param executor
 *            the {@link Executor} used for handling execution of I/O
 *            events. Can be <code>null</code>.
 */
protected AbstractIoConnector(IoSessionConfig sessionConfig, Executor executor) {
    super(sessionConfig, executor);
}

再来看连接操作:
/**
 * {@inheritDoc}
 */
@Override
public final ConnectFuture connect() {
    SocketAddress remoteAddress = getDefaultRemoteAddress();
    if (remoteAddress == null) {
        throw new IllegalStateException("defaultRemoteAddress is not set.");
    }
    return connect(remoteAddress, null, null);
}
/**
 * {@inheritDoc}
 */
@Override
public ConnectFuture connect(IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    SocketAddress remoteAddress = getDefaultRemoteAddress();
    if (remoteAddress == null) {
        throw new IllegalStateException("defaultRemoteAddress is not set.");
    }
    return connect(remoteAddress, null, sessionInitializer);
}
/**
 * {@inheritDoc}
 */
@Override
public final ConnectFuture connect(SocketAddress remoteAddress) {
    return connect(remoteAddress, null, null);
}
/**
 * {@inheritDoc}
 */
@Override
public ConnectFuture connect(SocketAddress remoteAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    return connect(remoteAddress, null, sessionInitializer);
}
/**
 * {@inheritDoc}
 */
@Override
public ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress) {
    return connect(remoteAddress, localAddress, null);
}

上面所有的连接操作都是委托给方法connect(SocketAddress remoteAddress, SocketAddress localAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer)
我们来看这个方法:
/**
 * {@inheritDoc}
 */
@Override
public final ConnectFuture connect(SocketAddress remoteAddress, SocketAddress localAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer) {
    //检查连接器状态,检查本地地址与远程地址是否为空已经与传输元数据地址类型是否匹配
    if (isDisposing()) {
        throw new IllegalStateException("The connector is being disposed.");
    }
    if (remoteAddress == null) {
        throw new IllegalArgumentException("remoteAddress");
    }
    if (!getTransportMetadata().getAddressType().isAssignableFrom(remoteAddress.getClass())) {
        throw new IllegalArgumentException("remoteAddress type: " + remoteAddress.getClass() + " (expected: "
                + getTransportMetadata().getAddressType() + ")");
    }
    if (localAddress != null && !getTransportMetadata().getAddressType().isAssignableFrom(localAddress.getClass())) {
        throw new IllegalArgumentException("localAddress type: " + localAddress.getClass() + " (expected: "
                + getTransportMetadata().getAddressType() + ")");
    }
    //如果连接器Iohandler为null,创建一个对会话操作事件不处理的IoHandler
    if (getHandler() == null) {
        if (getSessionConfig().isUseReadOperation()) {
            setHandler(new IoHandler() {
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void exceptionCaught(IoSession session, Throwable cause) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void messageReceived(IoSession session, Object message) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void messageSent(IoSession session, Object message) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void sessionClosed(IoSession session) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void sessionCreated(IoSession session) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void sessionIdle(IoSession session, IdleStatus status) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void sessionOpened(IoSession session) throws Exception {
                    // Empty handler
                }
                /**
                 * {@inheritDoc}
                 */
                @Override
                public void inputClosed(IoSession session) throws Exception {
                    // Empty handler
                }
            });
        } else {
            throw new IllegalStateException("handler is not set.");
        }
    }
    return connect0(remoteAddress, localAddress, sessionInitializer);
}


/**
 * Implement this method to perform the actual connect operation.
 *实现具体的connect0
 * @param remoteAddress The remote address to connect from
 * @param localAddress <tt>null</tt> if no local address is specified
 * @param sessionInitializer The IoSessionInitializer to use when the connection s successful
 * @return The ConnectFuture associated with this asynchronous operation
 * 
 */
protected abstract ConnectFuture connect0(SocketAddress remoteAddress, SocketAddress localAddress,
        IoSessionInitializer<? extends ConnectFuture> sessionInitializer);

从上面可以看出,连接操作,首先检查连接器状态,本地地址与远程地址是否为空已经与传输元数据地址类型是否匹配,如果连接器Iohandler为null,创建一个对会话操作事件不处理的IoHandler,最后将实际连接操作委托给connect0,待子类实现。
再来看其他方法:
 /**
     * Adds required internal attributes and {@link IoFutureListener}s
     * related with event notifications to the specified {@code session}
     * and {@code future}.  Do not call this method directly;
     添加必要的内部属性和结果监听器到会话和future,不要直接调用此方法。
     */
    @Override
    protected final void finishSessionInitialization0(final IoSession session, IoFuture future) {
        // In case that ConnectFuture.cancel() is invoked before
        // setSession() is invoked, add a listener that closes the
        // connection immediately on cancellation.
	//防止在设置会话时,连接请求被取消,添加一个监听器,当连接取消时,
	//关闭会话。
        future.addListener(new IoFutureListener<ConnectFuture>() {
            /**
             * {@inheritDoc}
             */
            @Override
            public void operationComplete(ConnectFuture future) {
                if (future.isCanceled()) {
                    session.closeNow();
                }
            }
        });
    }
    /**
    * @return
     *  The minimum time that this connector can have for a connection
     *  timeout in milliseconds.
     */
    public long getConnectTimeoutCheckInterval() {
        return connectTimeoutCheckInterval;
    }
    /**
     * Sets the timeout for the connection check
     *  
     * @param minimumConnectTimeout The delay we wait before checking the connection
     */
    public void setConnectTimeoutCheckInterval(long minimumConnectTimeout) {
        if (getConnectTimeoutMillis() < minimumConnectTimeout) {
            this.connectTimeoutInMillis = minimumConnectTimeout;
        }
        this.connectTimeoutCheckInterval = minimumConnectTimeout;
    }
    /**
     * @deprecated Take a look at <tt>getConnectTimeoutMillis()</tt>
     */
    @Deprecated
    @Override
    public final int getConnectTimeout() {
        return (int) connectTimeoutInMillis / 1000;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final long getConnectTimeoutMillis() {
        return connectTimeoutInMillis;
    }
    /**
     * @deprecated
     *  Take a look at <tt>setConnectTimeoutMillis(long)</tt>
     */
    @Deprecated
    @Override
    public final void setConnectTimeout(int connectTimeout) {

        setConnectTimeoutMillis(connectTimeout * 1000L);
    }
    /**
     * Sets the connect timeout value in milliseconds.
     * 
     */
    @Override
    public final void setConnectTimeoutMillis(long connectTimeoutInMillis) {
        if (connectTimeoutInMillis <= connectTimeoutCheckInterval) {
            this.connectTimeoutCheckInterval = connectTimeoutInMillis;
        }
        this.connectTimeoutInMillis = connectTimeoutInMillis;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public SocketAddress getDefaultRemoteAddress() {
        return defaultRemoteAddress;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void setDefaultLocalAddress(SocketAddress localAddress) {
        defaultLocalAddress = localAddress;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final SocketAddress getDefaultLocalAddress() {
        return defaultLocalAddress;
    }
    /**
     * {@inheritDoc}
     */
    @Override
    public final void setDefaultRemoteAddress(SocketAddress defaultRemoteAddress) {
        if (defaultRemoteAddress == null) {
            throw new IllegalArgumentException("defaultRemoteAddress");
        }

        if (!getTransportMetadata().getAddressType().isAssignableFrom(defaultRemoteAddress.getClass())) {
            throw new IllegalArgumentException("defaultRemoteAddress type: " + defaultRemoteAddress.getClass()
                    + " (expected: " + getTransportMetadata().getAddressType() + ")");
        }
        this.defaultRemoteAddress = defaultRemoteAddress;
    }

总结:
IoConnector接口给Ioservice增加了连接功能,可以连接服务端。连接操作,首先检查连接器状态,本地地址与远程地址是否为空已经与传输元数据地址类型是否匹配,如果连接器Iohandler为null,创建一个对会话操作事件不处理的IoHandler,最后将实际连接操作委托给connect0,待子类实现。
附:
//IoSessionInitializer
public interface IoSessionInitializer
{
    public abstract void initializeSession(IoSession iosession, IoFuture iofuture);
}
相关标签: mina