欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

关于Netty的ReplayingDecoder

程序员文章站 2022-04-22 19:41:52
...

        Netty用户手册介绍Decoder的章节重点介绍了ByteToMessageDecoder,顺道提到了一个陌生的ReplayingDecoder,学习并记录之。下文的例子均来自于ReplayingDecoder源码注释。
        先看ReplayingDecoder的类定义:

public abstract class ReplayingDecoder<S> extends ByteToMessageDecoder {}

        可见其是ByteToMessageDecoder的子类。类定义中的泛型 S 是一个用于记录解码状态的状态机枚举类,在state(S s)、checkpoint(S s)等方法中会用到。在简单解码时也可以用java.lang.Void来占位。

        与ByteToMessageDecoder不同,该类可以在接收到所需要长度的字节之后再调用decode方法,而不用一遍又一遍的手动检查流中的字节长度。下面给出官方的例子:
        首先是作对比的ByteToMessageDecoder风格实现,也是我之前的习惯做法 :-(

public class IntegerHeaderFrameDecoder extends ByteToMessageDecoder{
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        if (buf.readableBytes() < 4) {
            return;
        }
        buf.markReaderIndex();
        int length = buf.readInt();
        if (buf.readableBytes() < length) {
            buf.resetReaderIndex();
            return;
        }
        out.add(buf.readBytes(length));
    }
}

        可见上面解码时略微繁琐,必须两次判断buf可读字节长度,并手动恢复readerIndex。下面来看简洁的ReplayingDecoder实现:

public class IntegerHeaderFrameDecoder extends ReplayingDecoder<Void> {
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf) throws Exception {
        out.add(buf.readBytes(buf.readInt()));
    }
}

原理

        究其原理是该decoder中使用了ByteBuf的一个特殊实现ReplayingDecoderByteBuf (代理模式),ReplayingDecoderByteBuf的关键部分如下:

package io.netty.handler.codec;
// ...
final class ReplayingDecoderByteBuf extends ByteBuf {
    private static final Signal REPLAY = ReplayingDecoder.REPLAY;
    private ByteBuf buffer; // proxy pattern here : )
    
    private void checkIndex(int index, int length) {
        if (index + length > buffer.writerIndex()) {
            throw REPLAY;
        }
    }

    private void checkReadableBytes(int readableBytes) {
        if (buffer.readableBytes() < readableBytes) {
            throw REPLAY;
        }
    }

    @Override
    public int readInt() {
        checkReadableBytes(4);
        return buffer.readInt();
    }

    // omitted...
}

        ReplayingDecoderByteBuf重写了ByteBuf的readXxx()等方法,在调用真正的buf做相关操作时,会先检查可读字节长度,一旦检测到不满足要求就直接抛出REPLAY(REPLAY继承ERROR)。

        而ReplayingDecoder重写了ByteToMessageDecoder的callDecode()方法:

@Override
protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    // catches REPLAY here and reset readerIndex
}

        此方法会捕获Signal并在catch块中重置ByteBuf的readerIndex。

        值得注意的是ReplayingDecoderByteBuf和ReplayingDecoder中的Signal都使用了static final修饰符,且始终引用同一个对象以节省内存开销。如ReplayingDecoder中的Signal定义为:

static final Signal REPLAY = Signal.valueOf(ReplayingDecoder.class, "REPLAY");

限制

        ReplayingDecoder虽然提供了不少便利,但是也存在使用限制。包括:buffer的部分操作(readBytes(ByteBuffer dst)、retain()、release()等方法会直接抛出异常);在某些情况下会影响性能(如多次对同一段消息解码)。
        TCP是基于流的,只保证接收到数据包分片顺序,而不保证接收到的数据包每个分片大小。因此在使用ReplayingDecoder时,即使不存在多线程,同一个线程也可能多次调用decode()方法。在decode中修改ReplayingDecoder的类变量时必须小心谨慎。这里顺便提醒一下除非是特殊设计,如组合使用LineBasedFrameDecoder和StringDecoder,否则独立使用的Decoder都是非共享的
        错误的例子:

public class MyDecoder extends ReplayingDecoder<Void> {
    private final Queue<Integer> values = new LinkedList<Integer>();
    @Override
    public void decode(ByteBuf buf, List<Object> out) throws Exception {
        // A message contains 2 integers.
        values.offer(buf.readInt());
        values.offer(buf.readInt());
        assert values.size() == 2;
        out.add(values.poll() + values.poll());
    }
}

        当buf中有两个int但分为两个包传过来时,上面代码中decode方法会被调用两次,此时队列size为3,这段代码达不到期望结果。正确的做法是每次在decode中先清空队列:

public class MyDecoder extends ReplayingDecoder<Void> {
    private final Queue<Integer> values = new LinkedList<Integer>();
    @Override
    public void decode(ByteBuf buf, List<Object> out) throws Exception {
        // Revert the state of the variable that might have been changed
        // since the last partial decode.
        values.clear();

        // A message contains 2 integers.
        values.offer(buf.readInt());
        values.offer(buf.readInt());
        // Now we know this assertion will never fail.
        assert values.size() == 2;
        out.add(values.poll() + values.poll());
    }
}

提高运行效率

        为了提高处理复杂消息的性能,ReplayingDecoder提供了checkpoint机制。此方法会将下次decode对buffer开始解码的位置置为当前读指针位置。当需要解码的消息很复杂时,推荐使用枚举泛型来创建多个解码保存点:

public enum MyDecoderState {
    READ_LENGTH,
    READ_CONTENT;
}
public class IntegerHeaderFrameDecoder extends ReplayingDecoder<MyDecoderState> {
    private int length;
    public IntegerHeaderFrameDecoder() {
        // Set the initial state.
        super(MyDecoderState.READ_LENGTH);
    }

    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) throws Exception {
        switch (state()) {
            case READ_LENGTH:
                length = buf.readInt();
                checkpoint(MyDecoderState.READ_CONTENT);
            case READ_CONTENT:
                ByteBuf frame = buf.readBytes(length);
                checkpoint(MyDecoderState.READ_LENGTH);
                out.add(frame);
                break;
            default:
                throw new Error("Shouldn't reach here.");
        }
    }
}

运行时替换decoder

        实际工作中往往需要使用一个decoder来处理多种协议。此时可以用一个decoder来检查协议类别,然后用具体的协议处理器来处理。下面例子中的FirstDecoder就可以看做一个协议探测器:

public class FirstDecoder extends ReplayingDecoder<Void> {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf buf, List<Object> out) {
        // Decode the first message
        Object firstMessage = ...;
        
        // Add the second decoder
        ctx.pipeline().addLast("second", new SecondDecoder());

        if (buf.isReadable()) {
            // Hand off the remaining data to the second decoder
            out.add(firstMessage);
            out.add(buf.readBytes(super.actualReadableBytes()));
        } else {
            // Nothing to hand off
            out.add(firstMessage);
        }
        // Remove the first decoder (me)
        ctx.pipeline().remove(this);
    }
}

 

转载于:https://my.oschina.net/landas/blog/893915