Mina Io监听器接口定义及抽象实现
程序员文章站
2022-03-11 09:30:28
...
Mina IoService接口定义及抽象实现:http://donald-draper.iteye.com/blog/2378271
引言:
上面我们看了IoService接口的定义及抽象实现,先来回顾一下:
抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,则将连接结果添加会话属性中。
今天我们我们来看一下IoService的一个分支IoAcceptor:
从IoAcceptor的接口定义可以看,IoAcceptor是将IoService添加了监听连接请求功能。
再来看抽象实现:
来看一下地址绑定操作:
绑定方法有以下几点要看
从上面可以看出,绑定地址首先要检查绑定的socket地址与传输元数据的地址类型是否相同,相同则通过bindInternal完成实际的绑定,然后通知Service监听器,Service已激活fireServiceActivated。
再来看地址解绑:
上面所有的解绑方法实际通过unbind(Iterable<? extends SocketAddress> localAddresses)方法完成:
从解绑地址方法来看,主要是委托unbind0方法完成实际解绑工作,清空绑定地址集合boundAddresses,触发Service监听器无效事件fireServiceDeactivated。
再来看其他方法,下面这些方法就不讲了,都是set和get操作看看就行:
总结;
IoAcceptor与IoService不同的是,添加了监听连接请求和地址绑定功能。抽象Io监听器AbstractIoAcceptor绑定地址首先要检查绑定的socket地址与传输元数据的地址类型是否相同,相同则通过bindInternal完成实际的绑定,然后通知Service监听器,Service已激活fireServiceActivated。解绑地址方法,主要是委托unbind0方法完成实际解绑工作,清空绑定地址集合boundAddresses,触发Service监听器无效事件fireServiceDeactivated。
引言:
上面我们看了IoService接口的定义及抽象实现,先来回顾一下:
抽象service关联一个IoHandler处理会话相关事件,关联一个执行器Executor,负责处理io事件的执行,一个会话配置IOsessionConfig,用于service创建会话时,配置会话,一个过滤链构建器IoFilterChainBuilder,用于构建会话的过滤链,会话数据结构工厂,用于创建会话的属性Map和写请求队列,还有service监听器和统计器。抽象service构造,首先检查会话配置和传输元数据,会话配置必须传输元数据的会话配置类型必须相同,即socket(TCP),会话配置为socketSessionConfig,报文通信(UDP),为DatagramSessionConfig;然后将会话创建监听器serviceActivationListener添加监听器管理器IoServiceListenerSupport;初始化会话配置,IO事件执行器executor和异常监视器。初始化会话就是将service会话数据结构工厂的会话属性添加到具体的会话中,将service会话数据结构工厂的写请求队列,设置到具体的会话中,如果是连接请求会话,则将连接结果添加会话属性中。
今天我们我们来看一下IoService的一个分支IoAcceptor:
/** * Accepts incoming connection, communicates with clients, and fires events to * {@link IoHandler}s. 接收连接请求,与客户端通信,触发IoHandler相关事件。 * <p> * Please refer to * [url=../../../../../xref-examples/org/apache/mina/examples/echoserver/Main.html]EchoServer[/url] * example. * <p> * You should bind to the desired socket address to accept incoming * connections, and then events for incoming connections will be sent to * the specified default {@link IoHandler}. * <p> 你应该绑定socket地址,以接收连接请求,连接请求事件将会发送个默认的IoHandler。 * Threads accept incoming connections start automatically when * {@link #bind()} is invoked, and stop when {@link #unbind()} is invoked. *当IoAcceptor绑定地址时,相关线程将自动开始接收连接请求,当调用#unbind方法,将停止 线程接收连接请求 * @author [url=http://mina.apache.org]Apache MINA Project[/url] */ public interface IoAcceptor extends IoService { /** * Returns the local address which is bound currently. If more than one * address are bound, only one of them will be returned, but it's not * necessarily the firstly bound address. * 返回当前绑定本地地址,当绑定多个地址时,返回其中一个,不需要非是第一个绑定的地址 * @return The bound LocalAddress */ SocketAddress getLocalAddress(); /** * Returns a {@link Set} of the local addresses which are bound currently. * 返回绑定的地址集 * @return The Set of bound LocalAddresses */ Set<SocketAddress> getLocalAddresses(); /** * Returns the default local address to bind when no argument is specified * in {@link #bind()} method. Please note that the default will not be * used if any local address is specified. If more than one address are * set, only one of them will be returned, but it's not necessarily the * firstly specified address in {@link #setDefaultLocalAddresses(List)}. * 当bind无参方法被调用时,返回一个默认的本地地址。 * @return The default bound LocalAddress */ SocketAddress getDefaultLocalAddress(); /** * Returns a {@link List} of the default local addresses to bind when no * argument is specified in {@link #bind()} method. Please note that the * default will not be used if any local address is specified. * 返回默认地址集 * @return The list of default bound LocalAddresses */ List<SocketAddress> getDefaultLocalAddresses(); /** * Sets the default local address to bind when no argument is specified in * {@link #bind()} method. Please note that the default will not be used * if any local address is specified. *设置默认socket地址 * @param localAddress The local addresses to bind the acceptor on */ void setDefaultLocalAddress(SocketAddress localAddress); /** * Sets the default local addresses to bind when no argument is specified * in {@link #bind()} method. Please note that the default will not be * used if any local address is specified. 设置默认地址集 * @param firstLocalAddress The first local address to bind the acceptor on * @param otherLocalAddresses The other local addresses to bind the acceptor on */ void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses); /** * Sets the default local addresses to bind when no argument is specified * in {@link #bind()} method. Please note that the default will not be * used if any local address is specified. * * @param localAddresses The local addresses to bind the acceptor on */ void setDefaultLocalAddresses(Iterable<? extends SocketAddress> localAddresses); /** * Sets the default local addresses to bind when no argument is specified * in {@link #bind()} method. Please note that the default will not be * used if any local address is specified. * * @param localAddresses The local addresses to bind the acceptor on */ void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses); /** * Returns <tt>true</tt> if and only if all clients are closed when this * acceptor unbinds from all the related local address (i.e. when the * service is deactivated). 当且仅当所有客户端关闭,acceptor解绑所有相关本地地址 * * @return <tt>true</tt> if the service sets the closeOnDeactivation flag */ boolean isCloseOnDeactivation(); /** * Sets whether all client sessions are closed when this acceptor unbinds * from all the related local addresses (i.e. when the service is * deactivated). The default value is <tt>true</tt>. * 无论所有客户端关闭是否,acceptor是否解绑所有相关本地地址,设置closeOnDeactivation * @param closeOnDeactivation <tt>true</tt> if we should close on deactivation */ void setCloseOnDeactivation(boolean closeOnDeactivation); /** * Binds to the default local address(es) and start to accept incoming * connections. *绑定默认的socket本地地址,接收连接请求 * @throws IOException if failed to bind */ void bind() throws IOException; /** * Binds to the specified local address and start to accept incoming * connections. *绑定socket本地地址,接收连接请求 * @param localAddress The SocketAddress to bind to * * @throws IOException if failed to bind */ void bind(SocketAddress localAddress) throws IOException; /** * Binds to the specified local addresses and start to accept incoming * connections. If no address is given, bind on the default local address. * 绑定socket本地地址,接收连接请求,如果没给给定参数,绑定默认地址 * @param firstLocalAddress The first address to bind to * @param addresses The SocketAddresses to bind to * * @throws IOException if failed to bind */ void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException; /** * Binds to the specified local addresses and start to accept incoming * connections. If no address is given, bind on the default local address. * 绑定socket本地地址,接收连接请求,如果没给给定参数,绑定默认地址 * @param addresses The SocketAddresses to bind to * * @throws IOException if failed to bind */ void bind(SocketAddress... addresses) throws IOException; /** * Binds to the specified local addresses and start to accept incoming * connections. *绑定socket本地地址,接收连接请求,如果没给给定参数,绑定默认地址 * @param localAddresses The local address we will be bound to * @throws IOException if failed to bind */ void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException; /** * Unbinds from all local addresses that this service is bound to and stops * to accept incoming connections. All managed connections will be closed * if {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property * is <tt>true</tt>. This method returns silently if no local address is * bound yet. 解绑service绑定的所有本地socket地址,停止接收,连接请求。如果CloseOnDeactivation参数为 解绑时,断开连接,则关闭service管理的所有会话。 */ void unbind(); /** * Unbinds from the specified local address and stop to accept incoming * connections. All managed connections will be closed if * {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is * <tt>true</tt>. This method returns silently if the default local * address is not bound yet. * 解绑service绑定的所有本地socket地址,停止接收,连接请求。如果CloseOnDeactivation参数为 解绑时,断开连接,则关闭service管理的所有会话。 * @param localAddress The local address we will be unbound from */ void unbind(SocketAddress localAddress); /** * Unbinds from the specified local addresses and stop to accept incoming * connections. All managed connections will be closed if * {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is * <tt>true</tt>. This method returns silently if the default local * addresses are not bound yet. * 解绑service绑定的所有本地socket地址,停止接收,连接请求。如果CloseOnDeactivation参数为 解绑时,断开连接,则关闭service管理的所有会话。 * @param firstLocalAddress The first local address to be unbound from * @param otherLocalAddresses The other local address to be unbound from */ void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses); /** * Unbinds from the specified local addresses and stop to accept incoming * connections. All managed connections will be closed if * {@link #setCloseOnDeactivation(boolean) disconnectOnUnbind} property is * <tt>true</tt>. This method returns silently if the default local * addresses are not bound yet. * 解绑service绑定的所有本地socket地址,停止接收,连接请求。如果CloseOnDeactivation参数为 解绑时,断开连接,则关闭service管理的所有会话。 * @param localAddresses The local address we will be unbound from */ void unbind(Iterable<? extends SocketAddress> localAddresses); /** * (Optional) Returns an {@link IoSession} that is bound to the specified * <tt>localAddress</tt> and the specified <tt>remoteAddress</tt> which * reuses the local address that is already bound by this service. 返回绑定本地地址和远端socket地址的会话,本地地址,service已绑定,并可以重用 * <p> * This operation is optional. Please throw {@link UnsupportedOperationException} * if the transport type doesn't support this operation. This operation is * usually implemented for connectionless transport types. *操作时可选的,如果transport不支持此操作,则可抛出UnsupportedOperationException, 此操作一般报文通信会实现 * @param remoteAddress The remote address bound to the service * @param localAddress The local address the session will be bound to * @throws UnsupportedOperationException if this operation is not supported * @throws IllegalStateException if this service is not running. * @throws IllegalArgumentException if this service is not bound to the * specified <tt>localAddress</tt>. * @return The session bound to the the given localAddress and remote address */ IoSession newSession(SocketAddress remoteAddress, SocketAddress localAddress); }
从IoAcceptor的接口定义可以看,IoAcceptor是将IoService添加了监听连接请求功能。
再来看抽象实现:
/** * A base implementation of {@link IoAcceptor}. * * @author [url=http://mina.apache.org]Apache MINA Project[/url] * @org.apache.xbean.XBean */ public abstract class AbstractIoAcceptor extends AbstractIoService implements IoAcceptor { private final List<SocketAddress> defaultLocalAddresses = new ArrayList<>();//默认绑定的socket地址集 private final List<SocketAddress> unmodifiableDefaultLocalAddresses = Collections .unmodifiableList(defaultLocalAddresses);//包装默认的地址集为不可修改集合 private final Set<SocketAddress> boundAddresses = new HashSet<>();//绑定地址集 private boolean disconnectOnUnbind = true; /** * The lock object which is acquired while bind or unbind operation is performed. * Acquire this lock in your property setters which shouldn't be changed while * the service is bound. */ protected final Object bindLock = new Object(); /** * Constructor for {@link AbstractIoAcceptor}. You need to provide a default * session configuration and an {@link Executor} for handling I/O events. If * null {@link Executor} is provided, a default one will be created using * {@link Executors#newCachedThreadPool()}. * 根据会话配置和执行器构造抽象Io接收器 * @see AbstractIoService#AbstractIoService(IoSessionConfig, Executor) * * @param sessionConfig * the default configuration for the managed {@link IoSession} * @param executor * the {@link Executor} used for handling execution of I/O * events. Can be <code>null</code>. */ protected AbstractIoAcceptor(IoSessionConfig sessionConfig, Executor executor) { super(sessionConfig, executor); defaultLocalAddresses.add(null); } }
来看一下地址绑定操作:
/** * {@inheritDoc} */ @Override public final void bind() throws IOException { bind(getDefaultLocalAddresses()); } /** * {@inheritDoc} */ @Override public final void bind(SocketAddress localAddress) throws IOException { if (localAddress == null) { throw new IllegalArgumentException("localAddress"); } List<SocketAddress> localAddresses = new ArrayList<>(1); localAddresses.add(localAddress); bind(localAddresses); } /** * {@inheritDoc} */ @Override public final void bind(SocketAddress... addresses) throws IOException { if ((addresses == null) || (addresses.length == 0)) { bind(getDefaultLocalAddresses()); return; } List<SocketAddress> localAddresses = new ArrayList<>(2); for (SocketAddress address : addresses) { localAddresses.add(address); } bind(localAddresses); } /** * {@inheritDoc} */ @Override public final void bind(SocketAddress firstLocalAddress, SocketAddress... addresses) throws IOException { if (firstLocalAddress == null) { bind(getDefaultLocalAddresses()); } if ((addresses == null) || (addresses.length == 0)) { bind(getDefaultLocalAddresses()); return; } List<SocketAddress> localAddresses = new ArrayList<>(2); localAddresses.add(firstLocalAddress); for (SocketAddress address : addresses) { localAddresses.add(address); } bind(localAddresses); } 以上的绑定方法,实际由bind(Iterable<? extends SocketAddress> localAddresses)完成。
/** * {@inheritDoc} */ @Override public final void bind(Iterable<? extends SocketAddress> localAddresses) throws IOException { if (isDisposing()) { throw new IllegalStateException("The Accpetor disposed is being disposed."); } if (localAddresses == null) { throw new IllegalArgumentException("localAddresses"); } List<SocketAddress> localAddressesCopy = new ArrayList<>(); for (SocketAddress a : localAddresses) { checkAddressType(a);//检查地址类 localAddressesCopy.add(a); } if (localAddressesCopy.isEmpty()) { throw new IllegalArgumentException("localAddresses is empty."); } boolean activate = false; //同步绑定锁和地址绑定集合 synchronized (bindLock) { synchronized (boundAddresses) { if (boundAddresses.isEmpty()) { activate = true; } } if (getHandler() == null) { throw new IllegalStateException("handler is not set."); } try { //完成实际绑定 Set<SocketAddress> addresses = bindInternal(localAddressesCopy); synchronized (boundAddresses) { //将绑定地址添加到绑定集合boundAddresses boundAddresses.addAll(addresses); } } catch (IOException | RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to bind to: " + getLocalAddresses(), e); } } if (activate) { //通知Service监听器,Service已激活 getListeners().fireServiceActivated(); } }
绑定方法有以下几点要看
1. checkAddressType(a);//检查地址类
private void checkAddressType(SocketAddress a) { //检查绑定的socket地址与传输元数据的地址类型是否相同 if (a != null && !getTransportMetadata().getAddressType().isAssignableFrom(a.getClass())) { throw new IllegalArgumentException("localAddress type: " + a.getClass().getSimpleName() + " (expected: " + getTransportMetadata().getAddressType().getSimpleName() + ")"); } }
2. //完成实际绑定 Set<SocketAddress> addresses = bindInternal(localAddressesCopy);
/** * Starts the acceptor, and register the given addresses * 待子类扩展,启动监听器,注册地址 * @param localAddresses The address to bind to * @return the {@link Set} of the local addresses which is bound actually * @throws Exception If the bind failed */ protected abstract Set<SocketAddress> bindInternal(List<? extends SocketAddress> localAddresses) throws Exception;
从上面可以看出,绑定地址首先要检查绑定的socket地址与传输元数据的地址类型是否相同,相同则通过bindInternal完成实际的绑定,然后通知Service监听器,Service已激活fireServiceActivated。
再来看地址解绑:
/** * {@inheritDoc} */ @Override public final void unbind() { unbind(getLocalAddresses()); } /** * {@inheritDoc} */ @Override public final void unbind(SocketAddress localAddress) { if (localAddress == null) { throw new IllegalArgumentException("localAddress"); } List<SocketAddress> localAddresses = new ArrayList<>(1); localAddresses.add(localAddress); unbind(localAddresses); } /** * {@inheritDoc} */ @Override public final void unbind(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses) { if (firstLocalAddress == null) { throw new IllegalArgumentException("firstLocalAddress"); } if (otherLocalAddresses == null) { throw new IllegalArgumentException("otherLocalAddresses"); } List<SocketAddress> localAddresses = new ArrayList<>(); localAddresses.add(firstLocalAddress); Collections.addAll(localAddresses, otherLocalAddresses); unbind(localAddresses); }
上面所有的解绑方法实际通过unbind(Iterable<? extends SocketAddress> localAddresses)方法完成:
/** * {@inheritDoc} */ @Override public final void unbind(Iterable<? extends SocketAddress> localAddresses) { if (localAddresses == null) { throw new IllegalArgumentException("localAddresses"); } boolean deactivate = false; //同步绑定锁和绑定地址集合 synchronized (bindLock) { synchronized (boundAddresses) { if (boundAddresses.isEmpty()) { return; } List<SocketAddress> localAddressesCopy = new ArrayList<>(); int specifiedAddressCount = 0; for (SocketAddress a : localAddresses) { specifiedAddressCount++; if ((a != null) && boundAddresses.contains(a)) { localAddressesCopy.add(a); } } if (specifiedAddressCount == 0) { throw new IllegalArgumentException("localAddresses is empty."); } if (!localAddressesCopy.isEmpty()) { try { unbind0(localAddressesCopy); } catch (RuntimeException e) { throw e; } catch (Exception e) { throw new RuntimeIoException("Failed to unbind from: " + getLocalAddresses(), e); } //清空绑定地址集合 boundAddresses.removeAll(localAddressesCopy); if (boundAddresses.isEmpty()) { deactivate = true; } } } } if (deactivate) { //触发Service监听器无效事件 getListeners().fireServiceDeactivated(); } }
/** * Implement this method to perform the actual unbind operation. * 待子类实现,具体解绑 * @param localAddresses The address to unbind from * @throws Exception If the unbind failed */ protected abstract void unbind0(List<? extends SocketAddress> localAddresses) throws Exception;
从解绑地址方法来看,主要是委托unbind0方法完成实际解绑工作,清空绑定地址集合boundAddresses,触发Service监听器无效事件fireServiceDeactivated。
再来看其他方法,下面这些方法就不讲了,都是set和get操作看看就行:
/** * {@inheritDoc} */ @Override public SocketAddress getLocalAddress() { Set<SocketAddress> localAddresses = getLocalAddresses(); if (localAddresses.isEmpty()) { return null; } return localAddresses.iterator().next(); } /** * {@inheritDoc} */ @Override public final Set<SocketAddress> getLocalAddresses() { Set<SocketAddress> localAddresses = new HashSet<>(); synchronized (boundAddresses) { localAddresses.addAll(boundAddresses); } return localAddresses; } /** * {@inheritDoc} */ @Override public SocketAddress getDefaultLocalAddress() { if (defaultLocalAddresses.isEmpty()) { return null; } return defaultLocalAddresses.iterator().next(); } /** * {@inheritDoc} */ @Override public final void setDefaultLocalAddress(SocketAddress localAddress) { setDefaultLocalAddresses(localAddress); } /** * {@inheritDoc} */ @Override public final List<SocketAddress> getDefaultLocalAddresses() { return unmodifiableDefaultLocalAddresses; } /** * {@inheritDoc} * @org.apache.xbean.Property nestedType="java.net.SocketAddress" */ @Override public final void setDefaultLocalAddresses(List<? extends SocketAddress> localAddresses) { if (localAddresses == null) { throw new IllegalArgumentException("localAddresses"); } setDefaultLocalAddresses((Iterable<? extends SocketAddress>) localAddresses); } /** * {@inheritDoc} */ @Override public final void setDefaultLocalAddresses(Iterable<? extends SocketAddress> localAddresses) { if (localAddresses == null) { throw new IllegalArgumentException("localAddresses"); } synchronized (bindLock) { synchronized (boundAddresses) { if (!boundAddresses.isEmpty()) { throw new IllegalStateException("localAddress can't be set while the acceptor is bound."); } Collection<SocketAddress> newLocalAddresses = new ArrayList<>(); for (SocketAddress a : localAddresses) { checkAddressType(a); newLocalAddresses.add(a); } if (newLocalAddresses.isEmpty()) { throw new IllegalArgumentException("empty localAddresses"); } this.defaultLocalAddresses.clear(); this.defaultLocalAddresses.addAll(newLocalAddresses); } } } /** * {@inheritDoc} * @org.apache.xbean.Property nestedType="java.net.SocketAddress" */ @Override public final void setDefaultLocalAddresses(SocketAddress firstLocalAddress, SocketAddress... otherLocalAddresses) { if (otherLocalAddresses == null) { otherLocalAddresses = new SocketAddress[0]; } Collection<SocketAddress> newLocalAddresses = new ArrayList<>(otherLocalAddresses.length + 1); newLocalAddresses.add(firstLocalAddress); for (SocketAddress a : otherLocalAddresses) { newLocalAddresses.add(a); } setDefaultLocalAddresses(newLocalAddresses); } /** * {@inheritDoc} */ @Override public final boolean isCloseOnDeactivation() { return disconnectOnUnbind; } /** * {@inheritDoc} */ @Override public final void setCloseOnDeactivation(boolean disconnectClientsOnUnbind) { this.disconnectOnUnbind = disconnectClientsOnUnbind; }
/** * A {@Link IoFuture} 监听操作结果 */ public static class AcceptorOperationFuture extends ServiceOperationFuture { private final List<SocketAddress> localAddresses; /** * Creates a new AcceptorOperationFuture instance * * @param localAddresses The list of local addresses to listen to */ public AcceptorOperationFuture(List<? extends SocketAddress> localAddresses) { this.localAddresses = new ArrayList<>(localAddresses); } /** * @return The list of local addresses we listen to */ public final List<SocketAddress> getLocalAddresses() { return Collections.unmodifiableList(localAddresses); } /** * @see Object#toString() */ @Override public String toString() { StringBuilder sb = new StringBuilder(); sb.append("Acceptor operation : "); if (localAddresses != null) { boolean isFirst = true; for (SocketAddress address : localAddresses) { if (isFirst) { isFirst = false; } else { sb.append(", "); } sb.append(address); } } return sb.toString(); } } }
总结;
IoAcceptor与IoService不同的是,添加了监听连接请求和地址绑定功能。抽象Io监听器AbstractIoAcceptor绑定地址首先要检查绑定的socket地址与传输元数据的地址类型是否相同,相同则通过bindInternal完成实际的绑定,然后通知Service监听器,Service已激活fireServiceActivated。解绑地址方法,主要是委托unbind0方法完成实际解绑工作,清空绑定地址集合boundAddresses,触发Service监听器无效事件fireServiceDeactivated。