Mina 过滤链抽象实现
程序员文章站
2024-01-13 12:45:28
...
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
引言:
在过滤链默认构建器文章我们看了一下过滤链默认构建器和过滤链的定义,先来回顾一下:
过滤器链IoFilterChain用Entry存放过滤器对,即每个过滤器IoFilter关联一个后继过滤器NextFilter。我们可以通过滤器名name或过滤器实例ioFilter或过滤器类型获取相应的过滤器或过滤器对应的Entry。fireMessage*/exceptionCaught相关方法为触发IoHandler的相关事件,fireFilterWrite/Close触发的是,会话的相关事件IoSession#write/close。
DefaultIoFilterChainBuilder用entries列表(CopyOnWriteArrayList<DefaultIoFilterChainBuilder.EntryImpl>)来管理过滤器;添加过滤器,移除过滤器,及判断是否包含过滤器都是依赖于CopyOnWriteArrayList的相关功能。buildFilterChain方法是将默认过滤器链构建器的过滤器集合中的过滤器添加到指定的过滤链IoFilterChain上。DefaultIoFilterChainBuilder的过滤器EntryImpl中的getNextFilter并没有实际作用,即无效,这就说明了DefaultIoFilterChainBuilder只用于在创建会话时,构建过滤器链。创建完毕后,对过滤器链构建器的修改不会影响到会话实际的过滤器链IoFilterChain(SocketFilterChain,DatagramFilterChain...)。
今天我们来看一下过滤链的抽象实现AbstractIoFilterChain:
从上来看AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中
用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry;
过滤链头为HeadFilter,链尾为TailFilter。
先来看一下过滤器Entry的实现
我们来看一个EntryImpl的后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,我们挑一个来看:
//AbstractIoFilterChain-EntryImpl
//AbstractIoFilterChain
其他相关的IoHandler和IoSession事件处理与此方法相似。
//AbstractIoFilterChain-EntryImpl
//AbstractIoFilterChain
从callPreviousFilterClose方法来看虽然从方法面为向前转递FilterClose事件,实际上是
传给当前过滤器,并转发给其后继过滤器,这个是因为过滤链HashMap<String,EntryImpl>是双向。
这里我们只贴处理其他事件的代码,不多解释
//AbstractIoFilterChain
EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。
再来看一下过滤链头HeadFilter
//HeadFilter
从HeadFilter的定义来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;
有两个方法有所不同:
//HeadFilter
//AbstractIoFilterChain,待子类扩展
//HeadFilter
//AbstractIoFilterChain,待子类扩展
从上面来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
再来看一下过滤链尾为TailFilter
来看一下释放buffer:
从上面来看,TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
再来看AbstractIoFilterChain的添加过滤器相关操作:
添加过滤器到链头
从上可以看出添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同。
再来看AbstractIoFilterChain移除过滤器操作:
从上可以移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,
从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
再来看其他操作:
在来看触发事件的相关方法:
从上面可以看出,IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler有参数处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,
handler并不处理会话事件)
总结:
AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry。
EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。过滤链头为HeadFilter,链尾为TailFilter。
HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同;添加前触发过滤器onPreAdd事件,添加后触发过滤器onPostAdd事件;移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
过滤链处理相关事件策略为:与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件)
附:
//IoFilterChain
过滤链IoFilterChain的fireMessage*/exceptionCaught相关方法为触发IoHandler的相关事件,fireFilterWrite/Close触发的是,会话的相关事件IoSession#write/close。过滤链IoFilterChain用Entry存放过滤器对,即每个过滤器IoFilter关联一个后继过滤器NextFilter。
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
引言:
在过滤链默认构建器文章我们看了一下过滤链默认构建器和过滤链的定义,先来回顾一下:
过滤器链IoFilterChain用Entry存放过滤器对,即每个过滤器IoFilter关联一个后继过滤器NextFilter。我们可以通过滤器名name或过滤器实例ioFilter或过滤器类型获取相应的过滤器或过滤器对应的Entry。fireMessage*/exceptionCaught相关方法为触发IoHandler的相关事件,fireFilterWrite/Close触发的是,会话的相关事件IoSession#write/close。
DefaultIoFilterChainBuilder用entries列表(CopyOnWriteArrayList<DefaultIoFilterChainBuilder.EntryImpl>)来管理过滤器;添加过滤器,移除过滤器,及判断是否包含过滤器都是依赖于CopyOnWriteArrayList的相关功能。buildFilterChain方法是将默认过滤器链构建器的过滤器集合中的过滤器添加到指定的过滤链IoFilterChain上。DefaultIoFilterChainBuilder的过滤器EntryImpl中的getNextFilter并没有实际作用,即无效,这就说明了DefaultIoFilterChainBuilder只用于在创建会话时,构建过滤器链。创建完毕后,对过滤器链构建器的修改不会影响到会话实际的过滤器链IoFilterChain(SocketFilterChain,DatagramFilterChain...)。
今天我们来看一下过滤链的抽象实现AbstractIoFilterChain:
import java.util.ArrayList; import java.util.HashMap; import java.util.Iterator; import java.util.List; import java.util.Map; import org.apache.mina.common.ConnectFuture; import org.apache.mina.common.IdleStatus; import org.apache.mina.common.IoFilter; import org.apache.mina.common.IoFilterAdapter; import org.apache.mina.common.IoFilterChain; import org.apache.mina.common.IoFilterLifeCycleException; import org.apache.mina.common.IoSession; import org.apache.mina.common.IoFilter.NextFilter; import org.apache.mina.common.IoFilter.WriteRequest; import org.apache.mina.util.ByteBufferUtil; import org.apache.mina.util.SessionLog; /** * An abstract implementation of {@link IoFilterChain} that provides * common operations for developers to implement their own transport layer. * <p> AbstractIoFilterChain未过滤链的抽象实现,为开发者提供了传输层的一般操作。 开发者仅需要实现#doWrite方法,当过滤器拦截到会话发送消息,则调用此方法。 * The only method a developer should implement is * {@link #doWrite(IoSession, IoFilter.WriteRequest)}. This method is invoked * when filter chain is evaluated for * {@link IoFilter#filterWrite(NextFilter, IoSession, IoFilter.WriteRequest)} and * finally to be written out. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public abstract class AbstractIoFilterChain implements IoFilterChain { /** * A session attribute that stores a {@link ConnectFuture} related with * the {@link IoSession}. {@link AbstractIoFilterChain} clears this * attribute and notifies the future when {@link #fireSessionOpened(IoSession)} * or {@link #fireExceptionCaught(IoSession, Throwable)} is invoked */ public static final String CONNECT_FUTURE = AbstractIoFilterChain.class .getName() + ".connectFuture"; private final IoSession session;//过滤链关联会话 //HashMap<String,EntryImpl>,key为过滤器名,value为过滤器Entry private final Map name2entry = new HashMap(); private final EntryImpl head;//过滤链头 private final EntryImpl tail;//过滤链尾 protected AbstractIoFilterChain(IoSession session) { if (session == null) { throw new NullPointerException("session"); } //初始化Session this.session = session; //初始化过滤链头为HeadFilter EntryImpl head = new EntryImpl(null, null, "head", new HeadFilter()); //初始化过滤链尾为TailFilter EntryImpl tail = new EntryImpl(head, null, "tail", new TailFilter()); head.nextEntry = tail; } }
从上来看AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中
用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry;
过滤链头为HeadFilter,链尾为TailFilter。
先来看一下过滤器Entry的实现
private class EntryImpl implements Entry { private EntryImpl prevEntry;//前驱 private EntryImpl nextEntry;//后继 private final String name;//过滤器名 private final IoFilter filter;//关联过滤器 private final NextFilter nextFilter;//过滤器后继 private EntryImpl(EntryImpl prevEntry, EntryImpl nextEntry, String name, IoFilter filter) { if (filter == null) { throw new NullPointerException("filter"); } if (name == null) { throw new NullPointerException("name"); } this.prevEntry = prevEntry; this.nextEntry = nextEntry; this.name = name; this.filter = filter; this.nextFilter = new NextFilter() { public void sessionCreated(IoSession session) { Entry nextEntry = EntryImpl.this.nextEntry; callNextSessionCreated(nextEntry, session); } public void sessionOpened(IoSession session) { Entry nextEntry = EntryImpl.this.nextEntry; callNextSessionOpened(nextEntry, session); } public void sessionClosed(IoSession session) { Entry nextEntry = EntryImpl.this.nextEntry; callNextSessionClosed(nextEntry, session); } public void sessionIdle(IoSession session, IdleStatus status) { Entry nextEntry = EntryImpl.this.nextEntry; callNextSessionIdle(nextEntry, session, status); } public void exceptionCaught(IoSession session, Throwable cause) { Entry nextEntry = EntryImpl.this.nextEntry; callNextExceptionCaught(nextEntry, session, cause); } public void messageReceived(IoSession session, Object message) { Entry nextEntry = EntryImpl.this.nextEntry; callNextMessageReceived(nextEntry, session, message); } public void messageSent(IoSession session, Object message) { Entry nextEntry = EntryImpl.this.nextEntry; callNextMessageSent(nextEntry, session, message); } public void filterWrite(IoSession session, WriteRequest writeRequest) { Entry nextEntry = EntryImpl.this.prevEntry; callPreviousFilterWrite(nextEntry, session, writeRequest); } public void filterClose(IoSession session) { Entry nextEntry = EntryImpl.this.prevEntry; callPreviousFilterClose(nextEntry, session); } }; } public String getName() { return name; } public IoFilter getFilter() { return filter; } public NextFilter getNextFilter() { return nextFilter; } public String toString() { return "(" + getName() + ':' + filter + ')'; } }
我们来看一个EntryImpl的后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,我们挑一个来看:
//AbstractIoFilterChain-EntryImpl
//会话创建事件 public void sessionCreated(IoSession session) { //后去当前Entry的后继Entry Entry nextEntry = EntryImpl.this.nextEntry; //将会话创建时间传递给Entry对应的过滤器 callNextSessionCreated(nextEntry, session); }
//AbstractIoFilterChain
//将会话创建时间传递给Entry对应的过滤器 private void callNextSessionCreated(Entry entry, IoSession session) { try { entry.getFilter().sessionCreated(entry.getNextFilter(), session); } catch (Throwable e) { fireExceptionCaught(session, e); } }
其他相关的IoHandler和IoSession事件处理与此方法相似。
//AbstractIoFilterChain-EntryImpl
public void filterClose(IoSession session) { Entry nextEntry = EntryImpl.this.prevEntry; callPreviousFilterClose(nextEntry, session); }
//AbstractIoFilterChain
private void callPreviousFilterClose(Entry entry, IoSession session) { try { entry.getFilter().filterClose(entry.getNextFilter(), session); } catch (Throwable e) { fireExceptionCaught(session, e); } }
从callPreviousFilterClose方法来看虽然从方法面为向前转递FilterClose事件,实际上是
传给当前过滤器,并转发给其后继过滤器,这个是因为过滤链HashMap<String,EntryImpl>是双向。
这里我们只贴处理其他事件的代码,不多解释
//AbstractIoFilterChain
private void callNextSessionOpened(Entry entry, IoSession session) { try { entry.getFilter().sessionOpened(entry.getNextFilter(), session); } catch (Throwable e) { fireExceptionCaught(session, e); } } private void callNextSessionClosed(Entry entry, IoSession session) { try { entry.getFilter().sessionClosed(entry.getNextFilter(), session); } catch (Throwable e) { fireExceptionCaught(session, e); } } private void callNextSessionIdle(Entry entry, IoSession session, IdleStatus status) { try { entry.getFilter().sessionIdle(entry.getNextFilter(), session, status); } catch (Throwable e) { fireExceptionCaught(session, e); } } private void callNextMessageReceived(Entry entry, IoSession session, Object message) { try { entry.getFilter().messageReceived(entry.getNextFilter(), session, message); } catch (Throwable e) { fireExceptionCaught(session, e); } } private void callNextMessageSent(Entry entry, IoSession session, Object message) { try { entry.getFilter().messageSent(entry.getNextFilter(), session, message); } catch (Throwable e) { fireExceptionCaught(session, e); } } private void callNextExceptionCaught(Entry entry, IoSession session, Throwable cause) { try { entry.getFilter().exceptionCaught(entry.getNextFilter(), session, cause); } catch (Throwable e) { SessionLog.warn(session, "Unexpected exception from exceptionCaught handler.", e); } } private void callPreviousFilterWrite(Entry entry, IoSession session, WriteRequest writeRequest) { try { entry.getFilter().filterWrite(entry.getNextFilter(), session, writeRequest); } catch (Throwable e) { writeRequest.getFuture().setWritten(false); fireExceptionCaught(session, e); } }
EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。
再来看一下过滤链头HeadFilter
//HeadFilter
private class HeadFilter extends IoFilterAdapter { public void sessionCreated(NextFilter nextFilter, IoSession session) { nextFilter.sessionCreated(session); } public void sessionOpened(NextFilter nextFilter, IoSession session) { nextFilter.sessionOpened(session); } public void sessionClosed(NextFilter nextFilter, IoSession session) { nextFilter.sessionClosed(session); } public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) { nextFilter.sessionIdle(session, status); } public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) { nextFilter.exceptionCaught(session, cause); } public void messageReceived(NextFilter nextFilter, IoSession session, Object message) { nextFilter.messageReceived(session, message); } public void messageSent(NextFilter nextFilter, IoSession session, Object message) { nextFilter.messageSent(session, message); } public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { if (session.getTransportType().getEnvelopeType().isAssignableFrom( writeRequest.getMessage().getClass())) { doWrite(session, writeRequest); } else { throw new IllegalStateException( "Write requests must be transformed to " + session.getTransportType().getEnvelopeType() + ": " + writeRequest); } } public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { doClose(session); } }
从HeadFilter的定义来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;
有两个方法有所不同:
//HeadFilter
//会话写操作 public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { if (session.getTransportType().getEnvelopeType().isAssignableFrom( writeRequest.getMessage().getClass())) { doWrite(session, writeRequest); } else { throw new IllegalStateException( "Write requests must be transformed to " + session.getTransportType().getEnvelopeType() + ": " + writeRequest); } }
//AbstractIoFilterChain,待子类扩展
protected abstract void doWrite(IoSession session, WriteRequest writeRequest) throws Exception;
//HeadFilter
//会话关闭 public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { doClose(session); }
//AbstractIoFilterChain,待子类扩展
protected abstract void doClose(IoSession session) throws Exception;
从上面来看,HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
再来看一下过滤链尾为TailFilter
private static class TailFilter extends IoFilterAdapter { public void sessionCreated(NextFilter nextFilter, IoSession session) throws Exception { //直接调用会话处理器IoHandler的sessionCreated session.getHandler().sessionCreated(session); } public void sessionOpened(NextFilter nextFilter, IoSession session) throws Exception { try { session.getHandler().sessionOpened(session); } finally { // Notify the related ConnectFuture // if the session is created from SocketConnector. //如果是SocketConnector创建的会话,则通知相关ConnectFuture ConnectFuture future = (ConnectFuture) session .removeAttribute(CONNECT_FUTURE); if (future != null) { future.setSession(session); } } } public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { try { session.getHandler().sessionClosed(session); } finally { // Remove all filters.会话关闭,清除过滤链 session.getFilterChain().clear(); } } public void sessionIdle(NextFilter nextFilter, IoSession session, IdleStatus status) throws Exception { session.getHandler().sessionIdle(session, status); } public void exceptionCaught(NextFilter nextFilter, IoSession session, Throwable cause) throws Exception { session.getHandler().exceptionCaught(session, cause); } public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { try { session.getHandler().messageReceived(session, message); } finally { //如果发送消息对象为ByteBuffer,则释放buffer ByteBufferUtil.releaseIfPossible(message); } } public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception { try { session.getHandler().messageSent(session, message); } finally { //如果发送消息对象为ByteBuffer,则释放buffer ByteBufferUtil.releaseIfPossible(message); } } public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { nextFilter.filterWrite(session, writeRequest); } public void filterClose(NextFilter nextFilter, IoSession session) throws Exception { nextFilter.filterClose(session); } }
来看一下释放buffer:
package org.apache.mina.util; import org.apache.mina.common.ByteBuffer; /** * ByteBuffer utility. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public class ByteBufferUtil { /** * Increases the internal reference count of this buffer to defer * automatic release. You have to invoke {@link #release()} as many * as you invoked this method to release this buffer. 记录buffer内部引用的计数器自增 * */ public static void acquireIfPossible(Object message) { if (message instanceof ByteBuffer) { ((ByteBuffer) message).acquire(); } } //如果消息对象为ByteBuffer,则释放空间 public static void releaseIfPossible(Object message) { if (message instanceof ByteBuffer) { ((ByteBuffer) message).release(); } } private ByteBufferUtil() { } }
从上面来看,TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
再来看AbstractIoFilterChain的添加过滤器相关操作:
添加过滤器到链头
public synchronized void addFirst(String name, IoFilter filter) { checkAddable(name);//检查过滤器在过滤链上是否存在 register(head, name, filter); } /** * Checks the specified filter name is already taken and throws an exception if already taken. //检查过滤器在过滤链上是否存在 */ private void checkAddable(String name) { if (name2entry.containsKey(name)) { throw new IllegalArgumentException( "Other filter is using the same name '" + name + "'"); } } private void register(EntryImpl prevEntry, String name, IoFilter filter) { //根据filter的前驱prevEntry和后继prevEntry.nextEntry,构造EntryImpl EntryImpl newEntry = new EntryImpl(prevEntry, prevEntry.nextEntry, name, filter); try { //触发过滤器onPreAdd事件 filter.onPreAdd(this, name, newEntry.getNextFilter()); } catch (Exception e) { throw new IoFilterLifeCycleException("onPreAdd(): " + name + ':' + filter + " in " + getSession(), e); } prevEntry.nextEntry.prevEntry = newEntry; prevEntry.nextEntry = newEntry; //添加过滤器name与Entry映射到过滤链 name2entry.put(name, newEntry); try { //触发过滤器onPostAdd事件 filter.onPostAdd(this, name, newEntry.getNextFilter()); } catch (Exception e) { deregister0(newEntry); throw new IoFilterLifeCycleException("onPostAdd(): " + name + ':' + filter + " in " + getSession(), e); } } //添加过滤器到链尾 public synchronized void addLast(String name, IoFilter filter) { checkAddable(name); register(tail.prevEntry, name, filter); } //添加过滤器到baseName过滤器的前面 public synchronized void addBefore(String baseName, String name, IoFilter filter) { EntryImpl baseEntry = checkOldName(baseName);//获取baseName对应的过滤器 checkAddable(name);//检查过滤器是否存在 register(baseEntry.prevEntry, name, filter);//注册到过滤链上 } /** * Throws an exception when the specified filter name is not registered in this chain. *获取baseName对应的过滤器 * @return An filter entry with the specified name. */ private EntryImpl checkOldName(String baseName) { EntryImpl e = (EntryImpl) name2entry.get(baseName); if (e == null) { throw new IllegalArgumentException("Unknown filter name:" + baseName); } return e; } //添加过滤器到baseName过滤器的后面 public synchronized void addAfter(String baseName, String name, IoFilter filter) { EntryImpl baseEntry = checkOldName(baseName); checkAddable(name); register(baseEntry, name, filter); }
从上可以看出添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同。
再来看AbstractIoFilterChain移除过滤器操作:
public synchronized IoFilter remove(String name) { EntryImpl entry = checkOldName(name); deregister(entry);//反注册name过滤器对应的Entry return entry.getFilter(); } //反注册过滤器Entry private void deregister(EntryImpl entry) { IoFilter filter = entry.getFilter(); try { //触发过滤器onPreRemove事件 filter.onPreRemove(this, entry.getName(), entry.getNextFilter()); } catch (Exception e) { throw new IoFilterLifeCycleException("onPreRemove(): " + entry.getName() + ':' + filter + " in " + getSession(), e); } //委托给deregister0完成实际的移除工作 deregister0(entry); try { //触发过滤器onPostRemove事件 filter.onPostRemove(this, entry.getName(), entry.getNextFilter()); } catch (Exception e) { throw new IoFilterLifeCycleException("onPostRemove(): " + entry.getName() + ':' + filter + " in " + getSession(), e); } } private void deregister0(EntryImpl entry) { EntryImpl prevEntry = entry.prevEntry; EntryImpl nextEntry = entry.nextEntry; prevEntry.nextEntry = nextEntry; nextEntry.prevEntry = prevEntry; name2entry.remove(entry.name); }
从上可以移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,
从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
再来看其他操作:
//清空过滤链 public synchronized void clear() throws Exception { //遍历过滤链,移除过滤器 Iterator it = new ArrayList(name2entry.keySet()).iterator(); while (it.hasNext()) { this.remove((String) it.next()); } } //获取过滤链依附Io会话 public IoSession getSession() { return session; } //获取name对应的过滤器Entry public Entry getEntry(String name) { Entry e = (Entry) name2entry.get(name); if (e == null) { return null; } return e; } //获取name对应的过滤器 public IoFilter get(String name) { Entry e = getEntry(name); if (e == null) { return null; } return e.getFilter(); } //获取name对应的过滤器后继NextFilter public NextFilter getNextFilter(String name) { Entry e = getEntry(name); if (e == null) { return null; } return e.getNextFilter(); } //获取过滤链上的所有过滤器(正序,添加顺序),从头到尾 public List getAll() { List list = new ArrayList(); EntryImpl e = head.nextEntry; while (e != tail) { list.add(e); e = e.nextEntry; } return list; } //获取过滤链上的所有过滤器(逆序),从尾到头 public List getAllReversed() { List list = new ArrayList(); EntryImpl e = tail.prevEntry; while (e != head) { list.add(e); e = e.prevEntry; } return list; } //判断是否包含name过滤器 public boolean contains(String name) { return getEntry(name) != null; } //判断是否包含过滤器filter public boolean contains(IoFilter filter) { EntryImpl e = head.nextEntry; while (e != tail) { if (e.getFilter() == filter) { return true; } e = e.nextEntry; } return false; } //判断是否包含指定类型filterType的过滤器 public boolean contains(Class filterType) { EntryImpl e = head.nextEntry; while (e != tail) { if (filterType.isAssignableFrom(e.getFilter().getClass())) { return true; } e = e.nextEntry; } return false; }
在来看触发事件的相关方法:
//从链头传递SessionCreated public void fireSessionCreated(IoSession session) { Entry head = this.head; callNextSessionCreated(head, session); } public void fireSessionOpened(IoSession session) { Entry head = this.head; callNextSessionOpened(head, session); } public void fireSessionIdle(IoSession session, IdleStatus status) { Entry head = this.head; callNextSessionIdle(head, session, status); } //消息接收,从链头到链尾-》Iohanlder(这个过程handler有参数处理相关事件) public void fireMessageReceived(IoSession session, Object message) { Entry head = this.head; callNextMessageReceived(head, session, message); } public void fireMessageSent(IoSession session, WriteRequest request) { try { //写回写请求结果,及消息已发送 request.getFuture().setWritten(true); } catch (Throwable t) { fireExceptionCaught(session, t); } Entry head = this.head; callNextMessageSent(head, session, request.getMessage()); } //移除发生,如果是SocketConnector重建的会话,则从会话中移除CONNECT_FUTURE属性,并设置Future异常 public void fireExceptionCaught(IoSession session, Throwable cause) { // Notify the related ConnectFuture // if the session is created from SocketConnector. ConnectFuture future = (ConnectFuture) session .removeAttribute(CONNECT_FUTURE); if (future == null) { Entry head = this.head; callNextExceptionCaught(head, session, cause); } else { // Please note that this place is not the only place that // calls ConnectFuture.setException(). future.setException(cause); } } public void fireSessionClosed(IoSession session) { // Update future. try { //通过会话的closeFuture,会话已经关闭 session.getCloseFuture().setClosed(); } catch (Throwable t) { fireExceptionCaught(session, t); } // And start the chain. Entry head = this.head; callNextSessionClosed(head, session); } //消息发送,Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发 //送消息,handler并不处理会话事件) public void fireFilterWrite(IoSession session, WriteRequest writeRequest) { Entry tail = this.tail; callPreviousFilterWrite(tail, session, writeRequest); } //触发会话关闭事件从链尾到链头- public void fireFilterClose(IoSession session) { Entry tail = this.tail; callPreviousFilterClose(tail, session); }
从上面可以看出,IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler有参数处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,
handler并不处理会话事件)
总结:
AbstractIoFilterChain内部关联一个IoSession,用EntryImp来包装过滤器,过滤链中用HashMap<String,EntryImpl>来存放过滤器Entry,key为过滤器名,value为过滤器Entry。
EntryImpl是过滤器在过滤链上存在的形式,EntryImpl有一个前驱和一个后继,内部包裹一个过滤器 with name,及过滤器的后继过滤器NextFilter。后继过滤器NextFilter的传递IoHandler和IoSession事件的方法,主要是将事件转发给后继Entry对应的过滤器。过滤链头为HeadFilter,链尾为TailFilter。
HeadFilter触发IoHandler和IoSession事件时,将事件传递给后继过滤器;但对于IoSession write/close事件除了传递事件外,需要调用实际的事件操作doWrite/doClose,这两个方法需要子类扩展实现。
TailFilter触发IoHandler和IoSession事件时,直接调用会话处理器IoHandler的相关事件方法。在sessionOpened事件中,最后如果是SocketConnector创建的会话,则要通知相关ConnectFuture;在sessionClosed事件中,最后还要清空过滤链;messageSent和messageReceived事件,如果消息对象为ByteBuffer,则释放buffer。
添加过滤器到过滤链,首先检查过滤链上是否存在过滤器,不存在,才添加;
添加过滤器到头部即,插入过滤器到链头的后面,添加过滤器到尾部,即插入过滤器到链尾的前面;添加到指定过滤器前后,思路基本相同;添加前触发过滤器onPreAdd事件,添加后触发过滤器onPostAdd事件;移除过滤器,首先获取过滤器对应的Entry,然后触发过滤器onPreRemove事件,从过滤链name2entry移除Entry,然后触发过滤器onPostRemove事件。
过滤链处理相关事件策略为:与IoHanler的相关事件(Session*)处理的顺序为,从链头到链尾-》Iohanlder(这个过程handler处理相关事件);对于会话相关的事件(FilterWrite/close),处理顺序为Iohanlder-》从链尾到链头(这是会话事件,只是在handler的方法中使用会话发送消息,关闭会话,handler并不处理会话事件)
附:
//IoFilterChain
/** * A container of {@link IoFilter}s that forwards {@link IoHandler} events * to the consisting filters and terminal {@link IoHandler} sequentially. * Every {@link IoSession} has its own {@link IoFilterChain} (1-to-1 relationship). * IoFilterChain是IoFilter的容器,用于转发IoHandler的事件到包含过滤器和Io处理器的链。 (IoService->IoProcessor->IoFilter->IoFilter->...->IoHandler) */ public interface IoFilterChain { /** * Represents a name-filter pair that an {@link IoFilterChain} contains. * *IoFilterChain包含的IOFilter对 * @author The Apache Directory Project (mina-dev@directory.apache.org) */ public interface Entry { /** * Returns the name of the filter. 过滤器名 */ String getName(); /** * Returns the filter. 当前过滤器 */ IoFilter getFilter(); /** * Returns the {@link NextFilter} of the filter. 过滤器后继 * * @throws IllegalStateException if the {@link NextFilter} is not available */ NextFilter getNextFilter(); } public abstract IoSession getSession();// /** * Returns the parent {@link IoSession} of this chain. 返回过滤链依附的Io会话 * @return {@link IoSession} */ IoSession getSession(); /** * Returns the {@link Entry} with the specified <tt>name</tt> in this chain. * @return <tt>null</tt> if there's no such name in this chain 根据过滤器name获取Entry */ Entry getEntry(String name); /** * Returns the {@link IoFilter} with the specified <tt>name</tt> in this chain. * @return <tt>null</tt> if there's no such name in this chain 根据过滤器name获取IoFilter */ IoFilter get(String name); /** * Returns the {@link NextFilter} of the {@link IoFilter} with the * specified <tt>name</tt> in this chain. * @return <tt>null</tt> if there's no such name in this chain 根据过滤器name获取IoFilter的后继过滤器NextFilter */ NextFilter getNextFilter(String name); /** * Returns the list of all {@link Entry}s this chain contains. 返回所有Entry(添加顺序) */ List getAll(); /** * Returns the reversed list of all {@link Entry}s this chain contains. 获取所有Entry倒序(LIFO) */ List getAllReversed(); /** * Returns <tt>true</tt> if this chain contains an {@link IoFilter} with the * specified <tt>name</tt>. 判断是否包含name对应的IoFilter */ boolean contains(String name); /** * Returns <tt>true</tt> if this chain contains the specified <tt>filter</tt>. 判断是否包含IoFilter类型的实例 */ boolean contains(IoFilter filter); /** * Returns <tt>true</tt> if this chain contains an {@link IoFilter} of the * specified <tt>filterType</tt>. 判断是否包含Class类型的过滤器 */ boolean contains(Class filterType); /** * Adds the specified filter with the specified name at the beginning of this chain. * @throws IoFilterLifeCycleException * if {@link IoFilter#onPostAdd(IoFilterChain, String, NextFilter)} or * {@link IoFilter#init()} throws an exception. 添加过滤器到过滤链的头部 */ void addFirst(String name, IoFilter filter); /** * Adds the specified filter with the specified name at the end of this chain. * @throws IoFilterLifeCycleException * if {@link IoFilter#onPostAdd(IoFilterChain, String, NextFilter)} or * {@link IoFilter#init()} throws an exception. 添加过滤器到过滤链的尾部 */ void addLast(String name, IoFilter filter); /** * Adds the specified filter with the specified name just before the filter whose name is * <code>baseName</code> in this chain. * @throws IoFilterLifeCycleException * if {@link IoFilter#onPostAdd(IoFilterChain, String, NextFilter)} or * {@link IoFilter#init()} throws an exception. 添加过滤器到baseName过滤器的前面 */ void addBefore(String baseName, String name, IoFilter filter); /** * Adds the specified filter with the specified name just after the filter whose name is * <code>baseName</code> in this chain. * @throws IoFilterLifeCycleException * if {@link IoFilter#onPostAdd(IoFilterChain, String, NextFilter)} or * {@link IoFilter#init()} throws an exception. 添加过滤器到baseName过滤器的后面 */ void addAfter(String baseName, String name, IoFilter filter); /** * Removes the filter with the specified name from this chain. * @throws IoFilterLifeCycleException * if {@link IoFilter#onPostRemove(IoFilterChain, String, NextFilter)} or * {@link IoFilter#destroy()} throws an exception. 移除name对应的过滤器 */ IoFilter remove(String name); /** * Removes all filters added to this chain.清空过滤器链 * @throws Exception if {@link IoFilter#onPostRemove(IoFilterChain, String, NextFilter)} thrown an exception. */ void clear() throws Exception; /** * Fires a {@link IoHandler#sessionCreated(IoSession)} event. Most users don't need to * call this method at all. Please use this method only when you implement a new transport * or fire a virtual event. 通知IoHandler#sessionCreated创建事件。用户必须要调用这个方法。仅在实现一个新的transport或 通知一个虚拟事件时,才调用此方法。 */ public void fireSessionCreated(IoSession session); /** * Fires a {@link IoHandler#sessionOpened(IoSession)} event. Most users don't need to call * this method at all. Please use this method only when you implement a new transport or * fire a virtual event. */ public void fireSessionOpened(IoSession session); /** * Fires a {@link IoHandler#sessionClosed(IoSession)} event. Most users don't need to call * this method at all. Please use this method only when you implement a new transport or * fire a virtual event. */ public void fireSessionClosed(IoSession session); /** * Fires a {@link IoHandler#sessionIdle(IoSession, IdleStatus)} event. Most users don't * need to call this method at all. Please use this method only when you implement a new * transport or fire a virtual event. */ public void fireSessionIdle(IoSession session, IdleStatus status); /** * Fires a {@link #fireMessageReceived(IoSession, Object)} event. Most users don't need to * call this method at all. Please use this method only when you implement a new transport * or fire a virtual event. */ public void fireMessageReceived(IoSession session, Object message); /** * Fires a {@link IoHandler#sessionOpened(IoSession)} event. Most users don't need to call * this method at all. Please use this method only when you implement a new transport or * fire a virtual event. */ public void fireMessageSent(IoSession session, WriteRequest request); /** * Fires a {@link IoHandler#exceptionCaught(IoSession, Throwable)} event. Most users don't * need to call this method at all. Please use this method only when you implement a new * transport or fire a virtual event. */ public void fireExceptionCaught(IoSession session, Throwable cause); /** * Fires a {@link IoSession#write(Object)} event. Most users don't need to call this * method at all. Please use this method only when you implement a new transport or fire a * virtual event. 通知IoSession#write事件 */ public void fireFilterWrite(IoSession session, WriteRequest writeRequest); /** * Fires a {@link IoSession#close()} event. Most users don't need to call this method at * all. Please use this method only when you implement a new transport or fire a virtual * event. 通知IoSession#close事件 */ public void fireFilterClose(IoSession session); }
过滤链IoFilterChain的fireMessage*/exceptionCaught相关方法为触发IoHandler的相关事件,fireFilterWrite/Close触发的是,会话的相关事件IoSession#write/close。过滤链IoFilterChain用Entry存放过滤器对,即每个过滤器IoFilter关联一个后继过滤器NextFilter。