Mina 协议编解码过滤器三(会话write与消息接收过滤)
程序员文章站
2024-01-12 21:58:28
...
Mina 协议编解码过滤器一(协议编解码工厂、协议编码器):
http://donald-draper.iteye.com/blog/2376663
Mina 协议编解码过滤器二(协议解码器):
http://donald-draper.iteye.com/blog/2376679
引言:
前面我们看了协议编解码过滤器的所涉及到的相关概念,先回顾一下:
协议编解码过滤器关联一个协议编解码工厂,协议编解码工厂用于创建协议编码和解码器;协议编码器将上层消息,编码成二级制或特定协议格式的数据,写到协议编码器输出的字节队列中,flush字节队列中的数据(filterWrite)给下一个过滤器。协议解码器将接收到的二级制或特定协议格式的数据,解码成上层消息,存放到协议解码器输出的消息队列,flush将消息队列中的消息传给后继过滤器的messageReceived方法。协议编解码过滤器ProtocolCodecFilter默认的协议编码输出为ProtocolEncoderOutputImpl,协议解码输出为SimpleProtocolDecoderOutput。
结构如下:
今天我们正式进入来分析协议编解码过滤器的实现。
先来看构造:
从上面可以看出,协议编解码过滤器构造,说到底就是初始化协议编码器工厂。
再来看过滤器的相关方法
从onPreAdd方法来看,一个过滤链上不能存在两个协议编解码器,即唯一。
再来看会话发送消息
来看这几点
1.
2.
//传递消息到下一个过滤
//继承字节buffer代理,这个在前面已看过,不在讲
从会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,
协议编码器encode编码消息,写到协议编码器输出字节buffer队列,
然后协议编码器输出flush字节buffer队列。
再来看messageSent,已一看就明白:
再来看接收消息
messageReceived方法有1点要关注,
//这个与协议编码器和编码器输出相似
从上面可以看出,会话接收消息messageReceived,如果消息非字节buffer,则直接传给
后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。
再来看会话关闭
来看方法的最后资源释放:
从上面来看会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,
最后释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。
我们来看移除协议编解码过滤器onPostRemove
从上面可以看出,从过滤链移除协议编解码过滤器后,要释放会话编解码器,
及解码输出属性,并释放相关的资源。
总结:
协议编解码过滤器构造,主要是初始化协议编码器工厂。一个过滤链上不能存在两个协议编解码器,即唯一。会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,协议编码器encode编码消息,写到协议编码器输出字节buffer队列,
然后协议编码器输出flush字节buffer队列。会话接收消息messageReceived,如果消息非字节buffer,则直接传给后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,最后从会话移除编码器,解码器及解码器输出属性,释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。协议编解码器过滤器从过滤链移除后,要释放会话编解码器,及解码输出属性,并释放相关的资源。
http://donald-draper.iteye.com/blog/2376663
Mina 协议编解码过滤器二(协议解码器):
http://donald-draper.iteye.com/blog/2376679
引言:
前面我们看了协议编解码过滤器的所涉及到的相关概念,先回顾一下:
协议编解码过滤器关联一个协议编解码工厂,协议编解码工厂用于创建协议编码和解码器;协议编码器将上层消息,编码成二级制或特定协议格式的数据,写到协议编码器输出的字节队列中,flush字节队列中的数据(filterWrite)给下一个过滤器。协议解码器将接收到的二级制或特定协议格式的数据,解码成上层消息,存放到协议解码器输出的消息队列,flush将消息队列中的消息传给后继过滤器的messageReceived方法。协议编解码过滤器ProtocolCodecFilter默认的协议编码输出为ProtocolEncoderOutputImpl,协议解码输出为SimpleProtocolDecoderOutput。
结构如下:
ProtocolCodecFilter extends IoFilterAdapter --ProtocolCodecFactory --ProtocolEncoder --ProtocolEncoderOutput(ProtocolEncoderOutputImpl) --ProtocolDecoder --ProtocolDecoderOutput(SimpleProtocolDecoderOutput)
今天我们正式进入来分析协议编解码过滤器的实现。
//ProtocolCodecFilter /** * An {@link IoFilter} which translates binary or protocol specific data into * message object and vice versa using {@link ProtocolCodecFactory}, * {@link ProtocolEncoder}, or {@link ProtocolDecoder}. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public class ProtocolCodecFilter extends IoFilterAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(org/apache/mina/filter/codec/ProtocolCodecFilter); private static final Class EMPTY_PARAMS[] = new Class[0]; private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); //编码器属性key private static final AttributeKey ENCODER = new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoder"); //编码器输出属性key private static final AttributeKey ENCODER_OUT = new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoderOut"); //解码器属性key private static final AttributeKey DECODER = new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoder"); //解码器输出属性key private static final AttributeKey DECODER_OUT = new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoderOut"); private final ProtocolCodecFactory factory;//协议编解码器工厂 }
先来看构造:
//根据协议编解码器构造协议编解码过滤器 public ProtocolCodecFilter(ProtocolCodecFactory factory) { if (factory == null) { throw new NullPointerException("factory"); } this.factory = factory; } //根据协议编码器和解码器构造协议编解码过滤器 public ProtocolCodecFilter(final ProtocolEncoder encoder, final ProtocolDecoder decoder) { if (encoder == null) { throw new NullPointerException("encoder"); } if (decoder == null) { throw new NullPointerException("decoder"); } factory = new ProtocolCodecFactory() { public ProtocolEncoder getEncoder() { return encoder; } public ProtocolDecoder getDecoder() { return decoder; } }; } //根据协议编解码类构造协议编解码过滤器 public ProtocolCodecFilter(final Class encoderClass, final Class decoderClass) { if (encoderClass == null) { throw new NullPointerException("encoderClass"); } if (decoderClass == null) { throw new NullPointerException("decoderClass"); } //如果协议编解码类型参数非ProtocolEncoder,ProtocolDecoder //抛出非法参数异常 if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) { throw new IllegalArgumentException("encoderClass: " + encoderClass.getName()); } if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) { throw new IllegalArgumentException("decoderClass: " + decoderClass.getName()); } //获取协议编解码器无参构造 try { encoderClass.getConstructor(EMPTY_PARAMS); } catch (NoSuchMethodException e) { throw new IllegalArgumentException( "encoderClass doesn't have a public default constructor."); } try { decoderClass.getConstructor(EMPTY_PARAMS); } catch (NoSuchMethodException e) { throw new IllegalArgumentException( "decoderClass doesn't have a public default constructor."); } //根据协议编码器和解码器类型创建协议编解码实例,构造协议编解码工厂 factory = new ProtocolCodecFactory() { public ProtocolEncoder getEncoder() throws Exception { return (ProtocolEncoder) encoderClass.newInstance(); } public ProtocolDecoder getDecoder() throws Exception { return (ProtocolDecoder) decoderClass.newInstance(); } }; }
从上面可以看出,协议编解码过滤器构造,说到底就是初始化协议编码器工厂。
再来看过滤器的相关方法
public void onPreAdd(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { //如果过滤器链已经包含协议编解码过滤器,则抛出非法状态异常 if (parent.contains(ProtocolCodecFilter.class)) { throw new IllegalStateException( "A filter chain cannot contain more than one ProtocolCodecFilter."); } }
从onPreAdd方法来看,一个过滤链上不能存在两个协议编解码器,即唯一。
再来看会话发送消息
public void filterWrite(NextFilter nextFilter, IoSession session, WriteRequest writeRequest) throws Exception { //从写请求获取消息 Object message = writeRequest.getMessage(); //如果为字节buffer,传给后继过滤器 if (message instanceof ByteBuffer) { nextFilter.filterWrite(session, writeRequest); return; } //从会话获取协议编码器,及协议编码输出 ProtocolEncoder encoder = getEncoder(session); ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session, nextFilter, writeRequest); try { //编码器编码消息 encoder.encode(session, message, encoderOut); //编码输出flush消息队列 encoderOut.flush(); //传递消息到下一个过滤 nextFilter.filterWrite(session, new WriteRequest( new MessageByteBuffer(writeRequest.getMessage()), writeRequest.getFuture(), writeRequest.getDestination())); } catch (Throwable t) { ProtocolEncoderException pee; if (t instanceof ProtocolEncoderException) { pee = (ProtocolEncoderException) t; } else { pee = new ProtocolEncoderException(t); } throw pee; } }
来看这几点
1.
//从会话获取协议编码器,及协议编码输出 ProtocolEncoder encoder = getEncoder(session); ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session, nextFilter, writeRequest);
//获取协议编码器,这个很简单不讲了 private ProtocolEncoder getEncoder(IoSession session) throws Exception { ProtocolEncoder encoder = (ProtocolEncoder) session .getAttribute(ENCODER); if (encoder == null) { encoder = factory.getEncoder(); session.setAttribute(ENCODER, encoder); } return encoder; } //获取协议编码器输出 private ProtocolEncoderOutputImpl getEncoderOut(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest); }
2.
//传递消息到下一个过滤
nextFilter.filterWrite(session, new WriteRequest( new MessageByteBuffer(writeRequest.getMessage()), writeRequest.getFuture(), writeRequest.getDestination()));
//继承字节buffer代理,这个在前面已看过,不在讲
private static class MessageByteBuffer extends ByteBufferProxy { private final Object message; private MessageByteBuffer(Object message) { super(EMPTY_BUFFER); this.message = message; } public void acquire() { // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message } public void release() { // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message } }
从会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,
协议编码器encode编码消息,写到协议编码器输出字节buffer队列,
然后协议编码器输出flush字节buffer队列。
再来看messageSent,已一看就明白:
public void messageSent(NextFilter nextFilter, IoSession session, Object message) throws Exception { if (message instanceof HiddenByteBuffer) { return; } if (!(message instanceof MessageByteBuffer)) { nextFilter.messageSent(session, message); return; } nextFilter.messageSent(session, ((MessageByteBuffer) message).message); }
再来看接收消息
public void messageReceived(NextFilter nextFilter, IoSession session, Object message) throws Exception { //如果消息非字节buffer,则直接传给后继过滤器 if (!(message instanceof ByteBuffer)) { nextFilter.messageReceived(session, message); return; } ByteBuffer in = (ByteBuffer) message; //如果字节buffer为空,直接返回 if (!in.hasRemaining()) { in.release(); return; } //获取解码器,及解码器输出 ProtocolDecoder decoder = getDecoder(session); ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); int oldPos = in.position(); try { synchronized (decoderOut) { //解码字节buffer decoder.decode(session, in, decoderOut); } } catch (Throwable t) { ProtocolDecoderException pde; if (t instanceof ProtocolDecoderException) { pde = (ProtocolDecoderException) t; } else { pde = new ProtocolDecoderException(t); } if (pde.getHexdump() == null) { int curPos = in.position(); in.position(oldPos); pde.setHexdump(in.getHexDump()); in.position(curPos); } throw pde; } finally { try { // Release the read buffer. //释放字节buffer in.release(); } finally { //flush解码器输出消息队列 decoderOut.flush(); } } }
messageReceived方法有1点要关注,
//获取解码器,及解码器输出 ProtocolDecoder decoder = getDecoder(session); ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
//这个与协议编码器和编码器输出相似
private ProtocolDecoder getDecoder(IoSession session) throws Exception { ProtocolDecoder decoder = (ProtocolDecoder) session .getAttribute(DECODER); if (decoder == null) { decoder = factory.getDecoder(); session.setAttribute(DECODER, decoder); } return decoder; } private ProtocolDecoderOutput getDecoderOut(IoSession session, NextFilter nextFilter) { ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT); if (out == null) { out = new SimpleProtocolDecoderOutput(session, nextFilter); session.setAttribute(DECODER_OUT, out); } return out; }
从上面可以看出,会话接收消息messageReceived,如果消息非字节buffer,则直接传给
后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。
再来看会话关闭
public void sessionClosed(NextFilter nextFilter, IoSession session) throws Exception { // Call finishDecode() first when a connection is closed. //获取解码器及解码器输出 ProtocolDecoder decoder = getDecoder(session); ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter); try { //解码器,解码会话未解码数据 decoder.finishDecode(session, decoderOut); } catch (Throwable t) { ProtocolDecoderException pde; if (t instanceof ProtocolDecoderException) { pde = (ProtocolDecoderException) t; } else { pde = new ProtocolDecoderException(t); } throw pde; } finally { // Dispose all. //释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列 disposeEncoder(session); disposeDecoder(session); disposeDecoderOut(session); decoderOut.flush(); } nextFilter.sessionClosed(session); }
来看方法的最后资源释放:
//释放解码器资源 private void disposeEncoder(IoSession session) { //移除会将编码器属性 ProtocolEncoder encoder = (ProtocolEncoder) session .removeAttribute(ENCODER); if (encoder == null) { return; } try { //释放会话编码器相关资源 encoder.dispose(session); } catch (Throwable t) { SessionLog.warn(session, "Failed to dispose: " + encoder.getClass().getName() + " (" + encoder + ')'); } } //从会话移除解码器属性,释放会话解码器资源 private void disposeDecoder(IoSession session) { ProtocolDecoder decoder = (ProtocolDecoder) session .removeAttribute(DECODER); if (decoder == null) { return; } try { decoder.dispose(session); } catch (Throwable t) { SessionLog.warn(session, "Falied to dispose: " + decoder.getClass().getName() + " (" + decoder + ')'); } } //从会话移除解码输出属性 private void disposeDecoderOut(IoSession session) { session.removeAttribute(DECODER_OUT); }
从上面来看会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,
最后释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。
我们来看移除协议编解码过滤器onPostRemove
public void onPostRemove(IoFilterChain parent, String name, NextFilter nextFilter) throws Exception { disposeEncoder(parent.getSession());//从会话移除编码器属性,释放会话编码器相关资源 disposeDecoder(parent.getSession());//从会话移除解码器属性,释放会话解码器资源 disposeDecoderOut(parent.getSession());//从会话移除解码输出属性 }
从上面可以看出,从过滤链移除协议编解码过滤器后,要释放会话编解码器,
及解码输出属性,并释放相关的资源。
总结:
协议编解码过滤器构造,主要是初始化协议编码器工厂。一个过滤链上不能存在两个协议编解码器,即唯一。会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,协议编码器encode编码消息,写到协议编码器输出字节buffer队列,
然后协议编码器输出flush字节buffer队列。会话接收消息messageReceived,如果消息非字节buffer,则直接传给后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,最后从会话移除编码器,解码器及解码器输出属性,释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。协议编解码器过滤器从过滤链移除后,要释放会话编解码器,及解码输出属性,并释放相关的资源。