欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Mina 协议编解码过滤器三(会话write与消息接收过滤)

程序员文章站 2024-01-12 21:58:28
...
Mina 协议编解码过滤器一(协议编解码工厂、协议编码器):
http://donald-draper.iteye.com/blog/2376663
Mina 协议编解码过滤器二(协议解码器):
http://donald-draper.iteye.com/blog/2376679
引言:
    前面我们看了协议编解码过滤器的所涉及到的相关概念,先回顾一下:
协议编解码过滤器关联一个协议编解码工厂,协议编解码工厂用于创建协议编码和解码器;协议编码器将上层消息,编码成二级制或特定协议格式的数据,写到协议编码器输出的字节队列中,flush字节队列中的数据(filterWrite)给下一个过滤器。协议解码器将接收到的二级制或特定协议格式的数据,解码成上层消息,存放到协议解码器输出的消息队列,flush将消息队列中的消息传给后继过滤器的messageReceived方法。协议编解码过滤器ProtocolCodecFilter默认的协议编码输出为ProtocolEncoderOutputImpl,协议解码输出为SimpleProtocolDecoderOutput。
结构如下:
ProtocolCodecFilter extends IoFilterAdapter
   --ProtocolCodecFactory
      --ProtocolEncoder
         --ProtocolEncoderOutput(ProtocolEncoderOutputImpl)
      --ProtocolDecoder
         --ProtocolDecoderOutput(SimpleProtocolDecoderOutput)

今天我们正式进入来分析协议编解码过滤器的实现。
//ProtocolCodecFilter
/**
 * An {@link IoFilter} which translates binary or protocol specific data into
 * message object and vice versa using {@link ProtocolCodecFactory},
 * {@link ProtocolEncoder}, or {@link ProtocolDecoder}.
 *
 * @author The Apache Directory Project (mina-dev@directory.apache.org)
 * @version $Rev$, $Date$
 */
public class ProtocolCodecFilter extends IoFilterAdapter
{
    private static final Logger LOGGER = LoggerFactory.getLogger(org/apache/mina/filter/codec/ProtocolCodecFilter);
    private static final Class EMPTY_PARAMS[] = new Class[0];
    private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]);
    //编码器属性key
    private static final AttributeKey ENCODER = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoder");
    //编码器输出属性key
    private static final AttributeKey ENCODER_OUT = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoderOut");
    //解码器属性key
    private static final AttributeKey DECODER = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoder");  
    //解码器输出属性key
    private static final AttributeKey DECODER_OUT = 
         new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoderOut");
    private final ProtocolCodecFactory factory;//协议编解码器工厂
}

先来看构造:
//根据协议编解码器构造协议编解码过滤器
 public ProtocolCodecFilter(ProtocolCodecFactory factory) {
        if (factory == null) {
            throw new NullPointerException("factory");
        }
        this.factory = factory;
    }
//根据协议编码器和解码器构造协议编解码过滤器
    public ProtocolCodecFilter(final ProtocolEncoder encoder,
            final ProtocolDecoder decoder) {
        if (encoder == null) {
            throw new NullPointerException("encoder");
        }
        if (decoder == null) {
            throw new NullPointerException("decoder");
        }
        factory = new ProtocolCodecFactory() {
            public ProtocolEncoder getEncoder() {
                return encoder;
            }
            public ProtocolDecoder getDecoder() {
                return decoder;
            }
        };
    }
//根据协议编解码类构造协议编解码过滤器
public ProtocolCodecFilter(final Class encoderClass,
            final Class decoderClass) {
        if (encoderClass == null) {
            throw new NullPointerException("encoderClass");
        }
        if (decoderClass == null) {
            throw new NullPointerException("decoderClass");
        }
	//如果协议编解码类型参数非ProtocolEncoder,ProtocolDecoder
	//抛出非法参数异常
        if (!ProtocolEncoder.class.isAssignableFrom(encoderClass)) {
            throw new IllegalArgumentException("encoderClass: "
                    + encoderClass.getName());
        }
        if (!ProtocolDecoder.class.isAssignableFrom(decoderClass)) {
            throw new IllegalArgumentException("decoderClass: "
                    + decoderClass.getName());
        }
	//获取协议编解码器无参构造
        try {
            encoderClass.getConstructor(EMPTY_PARAMS);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "encoderClass doesn't have a public default constructor.");
        }
        try {
            decoderClass.getConstructor(EMPTY_PARAMS);
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException(
                    "decoderClass doesn't have a public default constructor.");
        }
       //根据协议编码器和解码器类型创建协议编解码实例,构造协议编解码工厂
        factory = new ProtocolCodecFactory() {
            public ProtocolEncoder getEncoder() throws Exception {
                return (ProtocolEncoder) encoderClass.newInstance();
            }

            public ProtocolDecoder getDecoder() throws Exception {
                return (ProtocolDecoder) decoderClass.newInstance();
            }
        };
}

从上面可以看出,协议编解码过滤器构造,说到底就是初始化协议编码器工厂。
再来看过滤器的相关方法
 public void onPreAdd(IoFilterChain parent, String name,
            NextFilter nextFilter) throws Exception {
	    //如果过滤器链已经包含协议编解码过滤器,则抛出非法状态异常
        if (parent.contains(ProtocolCodecFilter.class)) {
            throw new IllegalStateException(
                    "A filter chain cannot contain more than one ProtocolCodecFilter.");
        }
    }

从onPreAdd方法来看,一个过滤链上不能存在两个协议编解码器,即唯一。

再来看会话发送消息

public void filterWrite(NextFilter nextFilter, IoSession session,
            WriteRequest writeRequest) throws Exception {
	//从写请求获取消息
        Object message = writeRequest.getMessage();
	//如果为字节buffer,传给后继过滤器
        if (message instanceof ByteBuffer) {
            nextFilter.filterWrite(session, writeRequest);
            return;
        }
        //从会话获取协议编码器,及协议编码输出
        ProtocolEncoder encoder = getEncoder(session);
        ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
                nextFilter, writeRequest);

        try {
	    //编码器编码消息
            encoder.encode(session, message, encoderOut);
	    //编码输出flush消息队列
            encoderOut.flush();
	    //传递消息到下一个过滤
            nextFilter.filterWrite(session, new WriteRequest(
                    new MessageByteBuffer(writeRequest.getMessage()),
                    writeRequest.getFuture(), writeRequest.getDestination()));
        } catch (Throwable t) {
            ProtocolEncoderException pee;
            if (t instanceof ProtocolEncoderException) {
                pee = (ProtocolEncoderException) t;
            } else {
                pee = new ProtocolEncoderException(t);
            }
            throw pee;
        }
    }

来看这几点
1.
 //从会话获取协议编码器,及协议编码输出
  ProtocolEncoder encoder = getEncoder(session);
  ProtocolEncoderOutputImpl encoderOut = getEncoderOut(session,
          nextFilter, writeRequest);

//获取协议编码器,这个很简单不讲了
private ProtocolEncoder getEncoder(IoSession session) throws Exception {
        ProtocolEncoder encoder = (ProtocolEncoder) session
                .getAttribute(ENCODER);
        if (encoder == null) {
            encoder = factory.getEncoder();
            session.setAttribute(ENCODER, encoder);
        }
        return encoder;
    }
    //获取协议编码器输出
    private ProtocolEncoderOutputImpl getEncoderOut(IoSession session,
            NextFilter nextFilter, WriteRequest writeRequest) {
        return new ProtocolEncoderOutputImpl(session, nextFilter, writeRequest);
    }

2.
//传递消息到下一个过滤
nextFilter.filterWrite(session, new WriteRequest(
        new MessageByteBuffer(writeRequest.getMessage()),
        writeRequest.getFuture(), writeRequest.getDestination()));

//继承字节buffer代理,这个在前面已看过,不在讲
private static class MessageByteBuffer extends ByteBufferProxy {
    private final Object message;

    private MessageByteBuffer(Object message) {
        super(EMPTY_BUFFER);
        this.message = message;
    }

    public void acquire() {
        // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
    }

    public void release() {
        // no-op since we are wraping a zero-byte buffer, this instance is to just curry the message
    }
}

从会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,
协议编码器encode编码消息,写到协议编码器输出字节buffer队列,
然后协议编码器输出flush字节buffer队列。
再来看messageSent,已一看就明白:
public void messageSent(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
        if (message instanceof HiddenByteBuffer) {
            return;
        }
        if (!(message instanceof MessageByteBuffer)) {
            nextFilter.messageSent(session, message);
            return;
        }
        nextFilter.messageSent(session, ((MessageByteBuffer) message).message);
    }

再来看接收消息
public void messageReceived(NextFilter nextFilter, IoSession session,
            Object message) throws Exception {
	//如果消息非字节buffer,则直接传给后继过滤器
        if (!(message instanceof ByteBuffer)) {
            nextFilter.messageReceived(session, message);
            return;
        }
        ByteBuffer in = (ByteBuffer) message;
	//如果字节buffer为空,直接返回
        if (!in.hasRemaining()) {
            in.release();
            return;
        }
        //获取解码器,及解码器输出
        ProtocolDecoder decoder = getDecoder(session);
        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
        int oldPos = in.position();
        try {
            synchronized (decoderOut) {
	        //解码字节buffer
                decoder.decode(session, in, decoderOut);
            }
        } catch (Throwable t) {
            ProtocolDecoderException pde;
            if (t instanceof ProtocolDecoderException) {
                pde = (ProtocolDecoderException) t;
            } else {
                pde = new ProtocolDecoderException(t);
            }

            if (pde.getHexdump() == null) {
                int curPos = in.position();
                in.position(oldPos);
                pde.setHexdump(in.getHexDump());
                in.position(curPos);
            }
            throw pde;
        } finally {
            try {
                // Release the read buffer.
		//释放字节buffer
                in.release();
            } finally {
	        //flush解码器输出消息队列
                decoderOut.flush();
            }
        }
    }

messageReceived方法有1点要关注,
//获取解码器,及解码器输出
ProtocolDecoder decoder = getDecoder(session);
ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);

//这个与协议编码器和编码器输出相似
  private ProtocolDecoder getDecoder(IoSession session) throws Exception {
        ProtocolDecoder decoder = (ProtocolDecoder) session
                .getAttribute(DECODER);
        if (decoder == null) {
            decoder = factory.getDecoder();
            session.setAttribute(DECODER, decoder);
        }
        return decoder;
    }

    private ProtocolDecoderOutput getDecoderOut(IoSession session,
            NextFilter nextFilter) {
        ProtocolDecoderOutput out = (ProtocolDecoderOutput) session.getAttribute(DECODER_OUT);
        if (out == null) {
            out = new SimpleProtocolDecoderOutput(session, nextFilter);
            session.setAttribute(DECODER_OUT, out);
        }

        return out;
    }
 

从上面可以看出,会话接收消息messageReceived,如果消息非字节buffer,则直接传给
后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。
再来看会话关闭
 public void sessionClosed(NextFilter nextFilter, IoSession session)
            throws Exception {
        // Call finishDecode() first when a connection is closed.
	//获取解码器及解码器输出
        ProtocolDecoder decoder = getDecoder(session);
        ProtocolDecoderOutput decoderOut = getDecoderOut(session, nextFilter);
        try {
	    //解码器,解码会话未解码数据
            decoder.finishDecode(session, decoderOut);
        } catch (Throwable t) {
            ProtocolDecoderException pde;
            if (t instanceof ProtocolDecoderException) {
                pde = (ProtocolDecoderException) t;
            } else {
                pde = new ProtocolDecoderException(t);
            }
            throw pde;
        } finally {
            // Dispose all.
	    //释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列
            disposeEncoder(session);
            disposeDecoder(session);
            disposeDecoderOut(session);
            decoderOut.flush();
        }
        nextFilter.sessionClosed(session);
}

来看方法的最后资源释放:
//释放解码器资源
private void disposeEncoder(IoSession session) {
        //移除会将编码器属性
        ProtocolEncoder encoder = (ProtocolEncoder) session
                .removeAttribute(ENCODER);
        if (encoder == null) {
            return;
        }

        try {
	   //释放会话编码器相关资源
            encoder.dispose(session);
        } catch (Throwable t) {
            SessionLog.warn(session, "Failed to dispose: "
                    + encoder.getClass().getName() + " (" + encoder + ')');
        }
    }
   //从会话移除解码器属性,释放会话解码器资源
    private void disposeDecoder(IoSession session) {
        ProtocolDecoder decoder = (ProtocolDecoder) session
                .removeAttribute(DECODER);
        if (decoder == null) {
            return;
        }

        try {
            decoder.dispose(session);
        } catch (Throwable t) {
            SessionLog.warn(session, "Falied to dispose: "
                    + decoder.getClass().getName() + " (" + decoder + ')');
        }
    }
//从会话移除解码输出属性
 private void disposeDecoderOut(IoSession session) {
        session.removeAttribute(DECODER_OUT);
    }

从上面来看会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,
最后释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。

我们来看移除协议编解码过滤器onPostRemove
public void onPostRemove(IoFilterChain parent, String name,
            NextFilter nextFilter) throws Exception {
        disposeEncoder(parent.getSession());//从会话移除编码器属性,释放会话编码器相关资源
        disposeDecoder(parent.getSession());//从会话移除解码器属性,释放会话解码器资源
        disposeDecoderOut(parent.getSession());//从会话移除解码输出属性
    }

从上面可以看出,从过滤链移除协议编解码过滤器后,要释放会话编解码器,
及解码输出属性,并释放相关的资源。

总结:
协议编解码过滤器构造,主要是初始化协议编码器工厂。一个过滤链上不能存在两个协议编解码器,即唯一。会话写操作来看session#write(filterWrite),首先从写请求获取消息,如果消息为字节buffer,则直接传给后继过滤器,否则从协议编解码工厂获取协议编码器和协议编码器输出,协议编码器encode编码消息,写到协议编码器输出字节buffer队列,
然后协议编码器输出flush字节buffer队列。会话接收消息messageReceived,如果消息非字节buffer,则直接传给后继过滤器,否则获取协议解码器,及协议解码器输出,协议解码器解码字节buffer为上传消息对象,写到协议解码器输出消息队列,最后解码器输出flush消息队列。会话关闭,主要是解码器解码会话未解码数据,写到解码器输出消息队列,最后从会话移除编码器,解码器及解码器输出属性,释放编码器,解码器及解码器输出相关资源,flush解码器输出消息队列。协议编解码器过滤器从过滤链移除后,要释放会话编解码器,及解码输出属性,并释放相关的资源。


相关标签: mina