Mina 协议编解码过滤器一(协议编解码工厂、协议编码器)
程序员文章站
2024-01-13 13:05:28
...
MINA TCP简单通信实例:http://donald-draper.iteye.com/blog/2375297
MINA 编解码器实例:http://donald-draper.iteye.com/blog/2375317
MINA 多路分离解码器实例:http://donald-draper.iteye.com/blog/2375324
Mina Socket会话配置:http://donald-draper.iteye.com/blog/2375529
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
引言:
上面几篇文章我们简单看了一下Socket会话配置,过滤器及过滤链;在TCP简单通信实例这篇文章中,有这么一段代码:
//配置过滤器
前面我们看过日志过滤器,今天我们来看一下协议编解码过滤器ProtocolCodecFilter。
//属性key
在往下看之前,我们先来看一下协议编解码工厂ProtocolCodecFactory
从上可以看出,协议编解码工厂ProtocolCodecFactory提供协议编码器ProtocolEncoder和解码器ProtocolDecoder;编码器将消息对象编码成二进制或协议数据,解码器将二进制或协议数据解码成消息对象。
再来看编码器与解码器的定义,先来看编码器:
//ProtocolEncoderAdapter
再来看协议编码输出ProtocolEncoderOutput
来看一下ProtocolEncoderOutput的简单实现
从上面可以看出编码器ProtocolEncoder主要有两个方法,encode和dispose;
encode用于,编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode
方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write方法将编码后的消息放在ByteBuffer;dispose方法释放编码器资源。ProtocolEncoderAdapter为编码器抽象实现,默认实现了dispose,不做任何事情,对于不需要释放资源的编码器继承ProtocolEncoderAdapter。ProtocolEncoderOutput主要的工作是将协议编码器编码后的
字节buffer,缓存起来,等待flush方法调用时,则将数据发送出去。SimpleProtocolEncoderOutput为ProtocolEncoderOutput的简单实现内部有一个buffer队列bufferQueue(Queue),用于存放write(ByteBuffer)方法,传入的字节buffer;mergeAll方法为合并buffer队列的所有buffer数据到一个buffer;flush方法为发送buffer队列中的所有buffer,实际发送工作委托给doFlush方法待子类实现。
来看协议编解码过滤器ProtocolCodecFilter的ProtocolEncoderOutput的内部实现:
ProtocolEncoderOutputImpl的doFlush,首先将会话包装成DefaultWriteFuture,
将会话,写请求信息传递给NextFilter
//ProtocolCodecFilter
这一篇,我们先看到这,自此我们了协议编解码过滤器的内部协议编解码工厂ProtocolCodecFactory,及内部的ProtocolEncoderOutputImpl,先来总结一下,至于协议编解码工厂在协议编解码过滤器ProtocolCodecFilter中的作用我们在后面的文章再说。
总结:
协议编解码过滤器ProtocolCodecFilter关联一个协议编解码工厂ProtocolCodecFactory,协议编解码工厂提供协议编码器ProtocolEncoder和解码器ProtocolDecoder;编码器将消息对象编码成二进制或协议数据,解码器将二进制或协议数据解码成消息对象。编码器ProtocolEncoder主要有两个方法,encode和dispose;encode用于,编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write方法将编码后的消息放在ByteBuffer;dispose方法释放编码器资源。ProtocolEncoderAdapter为编码器抽象实现,默认实现了dispose,不做任何事情,对于不需要释放资源的编码器继承ProtocolEncoderAdapter。ProtocolEncoderOutput主要的工作是将协议编码器编码后的字节buffer,缓存起来,等待flush方法调用时,则将数据发送出去。SimpleProtocolEncoderOutput为ProtocolEncoderOutput的简单实现内部有一个buffer队列bufferQueue(Queue),用于存放write(ByteBuffer)方法,传入的字节buffer;
mergeAll方法为合并buffer队列的所有buffer数据到一个buffer;flush方法为发送buffer队列中的所有buffer,实际发送工作委托给doFlush方法待子类实现。ProtocolEncoderOutputImpl为协议编解码过滤器的内部类,ProtocolEncoderOutputImpl的doFlush,首先将会话包装成DefaultWriteFuture,将会话,写请求信息传递给NextFilter。
附:
DefaultIoFuture:
MINA 编解码器实例:http://donald-draper.iteye.com/blog/2375317
MINA 多路分离解码器实例:http://donald-draper.iteye.com/blog/2375324
Mina Socket会话配置:http://donald-draper.iteye.com/blog/2375529
Mina 过滤链默认构建器:http://donald-draper.iteye.com/blog/2375985
Mina 过滤器定义:http://donald-draper.iteye.com/blog/2376161
Mina 日志过滤器与引用计数过滤器:http://donald-draper.iteye.com/blog/2376226
Mina 过滤链抽象实现:http://donald-draper.iteye.com/blog/2376335
Mina Socket与报文过滤链:http://donald-draper.iteye.com/blog/2376440
引言:
上面几篇文章我们简单看了一下Socket会话配置,过滤器及过滤链;在TCP简单通信实例这篇文章中,有这么一段代码:
//配置过滤器
DefaultIoFilterChainBuilder defaultIoFilterChainBuilder = acceptor.getFilterChain(); LoggingFilter loggingFilter = new LoggingFilter(); defaultIoFilterChainBuilder.addLast("loggingFilter", loggingFilter); TextLineCodecFactory textLineCodecFactory = new TextLineCodecFactory(charset,LineDelimiter.WINDOWS.getValue(), LineDelimiter.WINDOWS.getValue()); ProtocolCodecFilter protocolCodecFilter = new ProtocolCodecFilter(textLineCodecFactory); defaultIoFilterChainBuilder.addLast("protocolCodecFilter",protocolCodecFilter);
前面我们看过日志过滤器,今天我们来看一下协议编解码过滤器ProtocolCodecFilter。
/** * An {@link IoFilter} which translates binary or protocol specific data into * message object and vice versa using {@link ProtocolCodecFactory}, * {@link ProtocolEncoder}, or {@link ProtocolDecoder}. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public class ProtocolCodecFilter extends IoFilterAdapter { private static final Logger LOGGER = LoggerFactory.getLogger(org/apache/mina/filter/codec/ProtocolCodecFilter); private static final Class EMPTY_PARAMS[] = new Class[0]; private static final IoBuffer EMPTY_BUFFER = IoBuffer.wrap(new byte[0]); //编码器属性key private static final AttributeKey ENCODER = new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoder"); //编码器输出属性key private static final AttributeKey ENCODER_OUT = new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "encoderOut"); //解码器属性key private static final AttributeKey DECODER = new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoder"); //解码器输出属性key private static final AttributeKey DECODER_OUT = new AttributeKey(org/apache/mina/filter/codec/ProtocolCodecFilter, "decoderOut"); private final ProtocolCodecFactory factory;//编解码器工厂协议 }
//属性key
public final class AttributeKey implements Serializable { private static final long serialVersionUID = -583377473376683096L; private final String name; public AttributeKey(Class source, String name) { this.name = (new StringBuilder()).append(source.getName()).append('.').append(name).append('@').append(Integer.toHexString(hashCode())).toString(); } public String toString() { return name; } public int hashCode() { int h = 629 + (name != null ? name.hashCode() : 0); return h; } public boolean equals(Object obj) { if(this == obj) return true; if(!(obj instanceof AttributeKey)) { return false; } else { AttributeKey other = (AttributeKey)obj; return name.equals(other.name); } } }
在往下看之前,我们先来看一下协议编解码工厂ProtocolCodecFactory
/** * Provides {@link ProtocolEncoder} and {@link ProtocolDecoder} which translates * binary or protocol specific data into message object and vice versa. * <p>协议编解码工厂ProtocolCodecFactory提供协议编码器和解码器,解码器二进制数据或 协议数据到消息对象;编码器反之。 * Please refer to * [url=../../../../../xref-examples/org/apache/mina/examples/reverser/ReverseProtocolProvider.html]<code>ReverserProtocolProvider</code>[/url] * example. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public interface ProtocolCodecFactory { /** * Returns a new (or reusable) instance of {@link ProtocolEncoder} which * encodes message objects into binary or protocol-specific data. 返回一个编码器实例,用于将消息对象编码成二进制或协议数据 */ ProtocolEncoder getEncoder() throws Exception; /** * Returns a new (or reusable) instance of {@link ProtocolDecoder} which * decodes binary or protocol-specific data into message objects. 返回一个解码器实例,用于将二进制或协议数据解码成消息对象 */ ProtocolDecoder getDecoder() throws Exception; }
从上可以看出,协议编解码工厂ProtocolCodecFactory提供协议编码器ProtocolEncoder和解码器ProtocolDecoder;编码器将消息对象编码成二进制或协议数据,解码器将二进制或协议数据解码成消息对象。
再来看编码器与解码器的定义,先来看编码器:
/** * Encodes higher-level message objects into binary or protocol-specific data. * MINA invokes {@link #encode(IoSession, Object, ProtocolEncoderOutput)} * method with message which is popped from the session write queue, and then * the encoder implementation puts encoded {@link ByteBuffer}s into * {@link ProtocolEncoderOutput} by calling * {@link ProtocolEncoderOutput#write(ByteBuffer)}. * <p>协议编码器用于编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode 方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write 方法将编码后的消息放在ByteBuffer。 * Please refer to * [url=../../../../../xref-examples/org/apache/mina/examples/reverser/TextLineEncoder.html]<code>TextLineEncoder</code>[/url] * example. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public interface ProtocolEncoder { /** * Encodes higher-level message objects into binary or protocol-specific data. * MINA invokes {@link #encode(IoSession, Object, ProtocolEncoderOutput)} * method with message which is popped from the session write queue, and then * the encoder implementation puts encoded {@link ByteBuffer}s into * {@link ProtocolEncoderOutput}. * 协议编码器用于编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode 方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write 方法将编码后的消息放在ByteBuffer。 * @throws Exception if the message violated protocol specification */ void encode(IoSession session, Object message, ProtocolEncoderOutput out) throws Exception; /** * Releases all resources related with this encoder. * 释放编码器关联的所有资源 * @throws Exception if failed to dispose all resources */ void dispose(IoSession session) throws Exception; }
//ProtocolEncoderAdapter
/** * An abstract {@link ProtocolEncoder} implementation for those who don't have any * resources to dispose. * ProtocolEncoderAdapter为协议编码的抽象实现,主要是对应没有任何资源要释放的协议编码。 * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public abstract class ProtocolEncoderAdapter implements ProtocolEncoder { /** * Override this method dispose all resources related with this encoder. * The default implementation does nothing. 重写dispose,释放相关资源,默认为does nothing */ public void dispose(IoSession session) throws Exception { } }
再来看协议编码输出ProtocolEncoderOutput
/** * Callback for {@link ProtocolEncoder} to generate encoded {@link ByteBuffer}s. * {@link ProtocolEncoder} must call {@link #write(ByteBuffer)} for each encoded * message. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public interface ProtocolEncoderOutput { /** * Callback for {@link ProtocolEncoder} to generate encoded * {@link ByteBuffer}s. {@link ProtocolEncoder} must call * {@link #write(ByteBuffer)} for each encoded message. * 协议编码器回调此方法,编码消息到ByteBuffer * @param buf the buffer which contains encoded data */ void write(ByteBuffer buf); /** * Merges all buffers you wrote via {@link #write(ByteBuffer)} into * one {@link ByteBuffer} and replaces the old fragmented ones with it. * This method is useful when you want to control the way MINA generates * network packets. 合并所有通过#write(ByteBuffer)产生的字节buffer,replaces the old fragmented ones with it 当你需要控制mina产生网络包的时候,此方非常有用。 */ void mergeAll(); /** * Flushes all buffers you wrote via {@link #write(ByteBuffer)} to * the session. This operation is asynchronous; please wait for * the returned {@link WriteFuture} if you want to wait for * the buffers flushed. * 刷新所有通过write(ByteBuffer)写到会话的字节buffer。此操作为异步; 如果想等待buffer刷新完成,可以等待返回的结果WriteFuture * @return <tt>null</tt> if there is nothing to flush at all. */ WriteFuture flush(); }
来看一下ProtocolEncoderOutput的简单实现
/** * A {@link ProtocolEncoderOutput} based on queue. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public abstract class SimpleProtocolEncoderOutput implements ProtocolEncoderOutput { private final Queue bufferQueue = new Queue();//buffer队列 public SimpleProtocolEncoderOutput() { } public Queue getBufferQueue() { return bufferQueue; } //将buffer添加到buffer队列 public void write(ByteBuffer buf) { bufferQueue.push(buf); } //合并 public void mergeAll() { int sum = 0; final int size = bufferQueue.size(); if (size < 2) { // no need to merge!,长度小2,不许要合并 return; } // Get the size of merged BB,计算buffer队列中,所有buffer的实际数据容量 for (int i = size - 1; i >= 0; i--) { sum += ((ByteBuffer) bufferQueue.get(i)).remaining(); } // Allocate a new BB that will contain all fragments,创建一个容量为sum的字节buffer ByteBuffer newBuf = ByteBuffer.allocate(sum); // and merge all. //遍历buffer队列所有buffer,放在newBuf中,并释放原始buffer for (;;) { ByteBuffer buf = (ByteBuffer) bufferQueue.pop(); if (buf == null) { break; } newBuf.put(buf); buf.release(); } // Push the new buffer finally.读写模式切换 newBuf.flip(); bufferQueue.push(newBuf);//将合并后的buffer添加到buffer队列 } //刷新buffer队列数据 public WriteFuture flush() { Queue bufferQueue = this.bufferQueue; WriteFuture future = null; if (bufferQueue.isEmpty()) { return null; } else { 遍历buffer队列所有buffer,发送buffer数据 for (;;) { ByteBuffer buf = (ByteBuffer) bufferQueue.pop(); if (buf == null) { break; } // Flush only when the buffer has remaining. if (buf.hasRemaining()) { //委托给doFlush future = doFlush(buf); } } } return future; } //待子类扩展 protected abstract WriteFuture doFlush(ByteBuffer buf); }
从上面可以看出编码器ProtocolEncoder主要有两个方法,encode和dispose;
encode用于,编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode
方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write方法将编码后的消息放在ByteBuffer;dispose方法释放编码器资源。ProtocolEncoderAdapter为编码器抽象实现,默认实现了dispose,不做任何事情,对于不需要释放资源的编码器继承ProtocolEncoderAdapter。ProtocolEncoderOutput主要的工作是将协议编码器编码后的
字节buffer,缓存起来,等待flush方法调用时,则将数据发送出去。SimpleProtocolEncoderOutput为ProtocolEncoderOutput的简单实现内部有一个buffer队列bufferQueue(Queue),用于存放write(ByteBuffer)方法,传入的字节buffer;mergeAll方法为合并buffer队列的所有buffer数据到一个buffer;flush方法为发送buffer队列中的所有buffer,实际发送工作委托给doFlush方法待子类实现。
来看协议编解码过滤器ProtocolCodecFilter的ProtocolEncoderOutput的内部实现:
private static class ProtocolEncoderOutputImpl extends SimpleProtocolEncoderOutput { private final IoSession session; private final NextFilter nextFilter; private final WriteRequest writeRequest; public ProtocolEncoderOutputImpl(IoSession session, NextFilter nextFilter, WriteRequest writeRequest) { this.session = session; this.nextFilter = nextFilter; this.writeRequest = writeRequest; } //doFlush protected WriteFuture doFlush(ByteBuffer buf) { //创建写操作返回结果 WriteFuture future = new DefaultWriteFuture(session); //将会话写事件filterWrite传递给下一个过滤器 nextFilter.filterWrite(session, new WriteRequest( new HiddenByteBuffer(buf), future, writeRequest .getDestination())); return future; } }
ProtocolEncoderOutputImpl的doFlush,首先将会话包装成DefaultWriteFuture,
将会话,写请求信息传递给NextFilter
//ProtocolCodecFilter
private static class HiddenByteBuffer extends ByteBufferProxy { private HiddenByteBuffer(ByteBuffer buf) { super(buf); } }
/** * A {@link ByteBuffer} that wraps a buffer and proxies any operations to it. * <p>ByteBufferProxy可以理解为字节buffer的静态代理,所有的方法都是委托给内部的字节buf。 * You can think this class like a {@link FilterOutputStream}. All operations * are proxied by default so that you can extend this class and override existing * operations selectively. You can introduce new operations, too. * 这个有点像FilterOutputStream,所有的代理操作默认都是通过内部buf完成, 亦可以选择重新一些方法 * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public class ByteBufferProxy extends ByteBuffer { /** * The buffer proxied by this proxy. */ protected ByteBuffer buf; /** * Create a new instance. * @param buf the buffer to be proxied */ protected ByteBufferProxy(ByteBuffer buf) { if (buf == null) { throw new NullPointerException("buf"); } this.buf = buf; } public void acquire() { buf.acquire(); } public void release() { buf.release(); } public ByteBuffer flip() { buf.flip(); return this; } ... 从上面看所有方法都是委托给内部buf }
这一篇,我们先看到这,自此我们了协议编解码过滤器的内部协议编解码工厂ProtocolCodecFactory,及内部的ProtocolEncoderOutputImpl,先来总结一下,至于协议编解码工厂在协议编解码过滤器ProtocolCodecFilter中的作用我们在后面的文章再说。
总结:
协议编解码过滤器ProtocolCodecFilter关联一个协议编解码工厂ProtocolCodecFactory,协议编解码工厂提供协议编码器ProtocolEncoder和解码器ProtocolDecoder;编码器将消息对象编码成二进制或协议数据,解码器将二进制或协议数据解码成消息对象。编码器ProtocolEncoder主要有两个方法,encode和dispose;encode用于,编码上层的消息对象为二进制或协议数据。mina调用编码器的#encode方法将从会话写请求队列中pop的消息,然后调用ProtocolEncoderOutput#write方法将编码后的消息放在ByteBuffer;dispose方法释放编码器资源。ProtocolEncoderAdapter为编码器抽象实现,默认实现了dispose,不做任何事情,对于不需要释放资源的编码器继承ProtocolEncoderAdapter。ProtocolEncoderOutput主要的工作是将协议编码器编码后的字节buffer,缓存起来,等待flush方法调用时,则将数据发送出去。SimpleProtocolEncoderOutput为ProtocolEncoderOutput的简单实现内部有一个buffer队列bufferQueue(Queue),用于存放write(ByteBuffer)方法,传入的字节buffer;
mergeAll方法为合并buffer队列的所有buffer数据到一个buffer;flush方法为发送buffer队列中的所有buffer,实际发送工作委托给doFlush方法待子类实现。ProtocolEncoderOutputImpl为协议编解码过滤器的内部类,ProtocolEncoderOutputImpl的doFlush,首先将会话包装成DefaultWriteFuture,将会话,写请求信息传递给NextFilter。
附:
/** * A default implementation of {@link WriteFuture}. * * @author The Apache Directory Project (mina-dev@directory.apache.org) * @version $Rev$, $Date$ */ public class DefaultWriteFuture extends DefaultIoFuture implements WriteFuture
DefaultIoFuture: