关于Netty的ReplayingDecoder
Netty用户手册介绍Decoder的章节重点介绍了ByteToMessageDecoder,顺道提到了一个陌生的ReplayingDecoder,学习并记录之。下文的例子均来自于ReplayingDecoder源码注释。
先看ReplayingDecoder的类定义:
public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {}
可见其是ByteToMessageDecoder的子类。类定义中的泛型 S 是一个用于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等方法中会用到。在简单解码时也可以用java.lang.Void来占位。
与ByteToMessageDecoder不同,该类可以在接收到所需要长度的字节之后再调用decode方法,而不用一遍又一遍的手动检查流中的字节长度。下面给出官方的例子:
首先是作对比的ByteToMessageDecoder风格实现,也是我之前的习惯做法 :-(
public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder{
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
if (buf.readableBytes() < 4) {
return;
}
buf.markReaderIndex();
int length = buf.readInt();
if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return;
}
out.add(buf.readBytes(length));
}
}
可见上面解码时略微繁琐,必须两次判断buf可读字节长度,并手动恢复readerIndex。下面来看简洁的ReplayingDecoder实现:
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
out.add(buf.readBytes(buf.readInt()));
}
}
原理
究其原理是该decoder中使用了ByteBuf的一个特殊实现ReplayingDecoderByteBuf (代理模式),ReplayingDecoderByteBuf的关键部分如下:
package io.netty.handler.codec;
// ...
final class ReplayingDecoderByteBuf extends ByteBuf {
private static final Signal REPLAY = ReplayingDecoder.REPLAY;
private ByteBuf buffer; // proxy pattern here : )
private void checkIndex(int index, int length) {
if (index + length > buffer.writerIndex()) {
throw REPLAY;
}
}
private void checkReadableBytes(int readableBytes) {
if (buffer.readableBytes() < readableBytes) {
throw REPLAY;
}
}
@Override
public int readInt() {
checkReadableBytes(4);
return buffer.readInt();
}
// omitted...
}
ReplayingDecoderByteBuf重写了ByteBuf的readXxx()等方法,在调用真正的buf做相关操作时,会先检查可读字节长度,一旦检测到不满足要求就直接抛出REPLAY(REPLAY继承ERROR)。
而ReplayingDecoder重写了ByteToMessageDecoder的callDecode()方法:
@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
// catches REPLAY here and reset readerIndex
}
此方法会捕获Signal并在catch块中重置ByteBuf的readerIndex。
值得注意的是ReplayingDecoderByteBuf和ReplayingDecoder中的Signal都使用了static final修饰符,且始终引用同一个对象以节省内存开销。如ReplayingDecoder中的Signal定义为:
static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class, "REPLAY");
限制
ReplayingDecoder虽然提供了不少便利,但是也存在使用限制。包括:buffer的部分操作(readBytes(ByteBuffer dst)、retain()、release()等方法会直接抛出异常);在某些情况下会影响性能(如多次对同一段消息解码)。
TCP是基于流的,只保证接收到数据包分片顺序,而不保证接收到的数据包每个分片大小。因此在使用ReplayingDecoder时,即使不存在多线程,同一个线程也可能多次调用decode()方法。在decode中修改ReplayingDecoder的类变量时必须小心谨慎。这里顺便提醒一下除非是特殊设计,如组合使用LineBasedFrameDecoder和StringDecoder,否则独立使用的Decoder都是非共享的。
错误的例子:
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(ByteBuf buf, List<Object> out) throws Exception {
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
当buf中有两个int但分为两个包传过来时,上面代码中decode方法会被调用两次,此时队列size为3,这段代码达不到期望结果。正确的做法是每次在decode中先清空队列:
public class MyDecoder extends ReplayingDecoder<Void> {
private final Queue<Integer> values = new LinkedList<Integer>();
@Override
public void decode(ByteBuf buf, List<Object> out) throws Exception {
// Revert the state of the variable that might have been changed
// since the last partial decode.
values.clear();
// A message contains 2 integers.
values.offer(buf.readInt());
values.offer(buf.readInt());
// Now we know this assertion will never fail.
assert values.size() == 2;
out.add(values.poll() + values.poll());
}
}
提高运行效率
为了提高处理复杂消息的性能,ReplayingDecoder提供了checkpoint机制。此方法会将下次decode对buffer开始解码的位置置为当前读指针位置。当需要解码的消息很复杂时,推荐使用枚举泛型来创建多个解码保存点:
public enum MyDecoderState {
READ_LENGTH,
READ_CONTENT;
}
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
private int length;
public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
switch (state()) {
case READ_LENGTH:
length = buf.readInt();
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ByteBuf frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
out.add(frame);
break;
default:
throw new Error("Shouldn't reach here.");
}
}
}
运行时替换decoder
实际工作中往往需要使用一个decoder来处理多种协议。此时可以用一个decoder来检查协议类别,然后用具体的协议处理器来处理。下面例子中的FirstDecoder就可以看做一个协议探测器:
public class FirstDecoder extends ReplayingDecoder<Void> {
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
// Decode the first message
Object firstMessage = ...;
// Add the second decoder
ctx.pipeline().addLast("second", new SecondDecoder());
if (buf.isReadable()) {
// Hand off the remaining data to the second decoder
out.add(firstMessage);
out.add(buf.readBytes(super.actualReadableBytes()));
} else {
// Nothing to hand off
out.add(firstMessage);
}
// Remove the first decoder (me)
ctx.pipeline().remove(this);
}
}
转载于:https://my.oschina.net/landas/blog/893915