欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty源码学习-ReplayingDecoder

程序员文章站 2022-04-22 23:50:09
...
ReplayingDecoder是FrameDecoder的子类,不熟悉FrameDecoder的,可以先看看
[url]http://bylijinnan.iteye.com/blog/1982618[/url]

API说,ReplayingDecoder简化了操作,比如:

FrameDecoder在decode时,需要判断数据是否接收完全:


public class IntegerHeaderFrameDecoder extends FrameDecoder {

protected Object decode(ChannelHandlerContext ctx,
Channel channel,
ChannelBuffer buf) throws Exception {

if (buf.readableBytes() < 4) {
return null;
}

buf.markReaderIndex();
int length = buf.readInt();

if (buf.readableBytes() < length) {
buf.resetReaderIndex();
return null;
}

return buf.readBytes(length);
}
}




而ReplayingDecoder则隐藏了这些判断,好像数据已经接收完全了一样:



public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<VoidEnum> {

protected Object decode(ChannelHandlerContext ctx,
Channel channel,
ChannelBuffer buf,
VoidEnum state) throws Exception {
int length = buf.readInt();
return buf.readBytes(length);
}
}


可见,ReplayingDecoder使用起来要简单、直观

除此之外,
很多时候frame的结构不会是“length + content”这么简单,如果结构复杂了,
FrameDecoder的if语句就会非常多,且先到达的sub-frame就会被多次decode
ReplayingDecoder允许用户自定义state,表示decode到哪一部分了,从而下
一次数据到达时,直接从nextState开始decode

那ReplayingDecoder这是怎么做到“好像数据已经接收完全了一样”?
事实上它跟FrameDecoder的思路是一样的,只是它在数据接收不完全时,
“悄悄地失败”。这主要是通过ReplayingDecoderBuffer来实现的:


class ReplayingDecoderBuffer implements ChannelBuffer {

private static final Error REPLAY = new ReplayError();

private final ReplayingDecoder<?> parent;
private boolean terminated;

//ReplayingDecoder作为参数
ReplayingDecoderBuffer(ReplayingDecoder<?> parent) {
this.parent = parent;
}

/*ReplayingDecoderBuffer实际上是FrameDecoder里面的cumulation(类型为ChannelBuffer)
FrameDecoder里面,对数据的累积接收,以及数据的读取,都是在cumulation,
而ReplayingDecoderBuffer对cumulation进行了封装,在数据不完全时,读取操作都会抛出Error
*/
private ChannelBuffer buf() {
return parent.internalBuffer();
}

public byte getByte(int index) {
checkIndex(index);
return buf().getByte(index);
}

//数据接收不完全时,抛出Error,在ReplayingDecoder里捕获
private void checkIndex(int index) {
if (index > buf().writerIndex()) {
throw REPLAY;
}
}





public abstract class ReplayingDecoder<T extends Enum<T>>
extends FrameDecoder {

//创建了ReplayingDecoderBuffer
private final ReplayingDecoderBuffer replayable = new ReplayingDecoderBuffer(this);
private T state;

//标记着下一次数据读取的开始位置,也代表着已经decode到哪一部分了
private int checkpoint;

protected void checkpoint() {
ChannelBuffer cumulation = this.cumulation;
if (cumulation != null) {
checkpoint = cumulation.readerIndex();
} else {
checkpoint = -1; // buffer not available (already cleaned up)
}
}

protected void checkpoint(T state) {
checkpoint();
setState(state);
}

protected T setState(T newState) {
T oldState = state;
state = newState;
return oldState;
}

//这个方法交由子类实现,子类可以定义更详细、语义更丰富的state
protected abstract Object decode(ChannelHandlerContext ctx,
Channel channel, ChannelBuffer buffer, T state) throws Exception;

//重点在这个方法,注意对ReplayError的处理
private void callDecode(
ChannelHandlerContext context, Channel channel,
ChannelBuffer input, ChannelBuffer replayableInput, SocketAddress remoteAddress) throws Exception {
while (input.readable()) {
int oldReaderIndex = checkpoint = input.readerIndex();
Object result = null;
T oldState = state;
try {

//如果数据接收还不完全,decode方法就会抛出ReplayError
//因为decode方法会调用replayableInput(类型为ReplayingDecoderBuffer)的getXXX方法
result = decode(context, channel, replayableInput, state);
if (result == null) {
//省略
}
} catch (ReplayError replay) {

//数据不充足,回退到checkpoint,下一次messageReceived再试
// Return to the checkpoint (or oldPosition) and retry.
int checkpoint = this.checkpoint;
if (checkpoint >= 0) {
input.readerIndex(checkpoint);
} else {
// Called by cleanup() - no need to maintain the readerIndex
// anymore because the buffer has been released already.
}
}

if (result == null) {
// Seems like more data is required.
// Let us wait for the next notification.
break;
}

// A successful decode
unfoldAndFireMessageReceived(context, remoteAddress, result);
}
}



ReplayingDecoder的使用,API给出了一个简单的例子:


public class IntegerHeaderFrameDecoder
extends ReplayingDecoder<MyDecoderState> {

private int length;

public IntegerHeaderFrameDecoder() {
// Set the initial state.
super(MyDecoderState.READ_LENGTH);
}

protected Object decode(ChannelHandlerContext ctx,
Channel channel,
ChannelBuffer buf,
MyDecoderState state) throws Exception {
switch (state) {
case READ_LENGTH:
length = buf.readInt();

//设置下一次的decode从哪一个state开始
checkpoint(MyDecoderState.READ_CONTENT);
case READ_CONTENT:
ChannelBuffer frame = buf.readBytes(length);
checkpoint(MyDecoderState.READ_LENGTH);
return frame;
default:
throw new Error("Shouldn't reach here.");
}
}
}



注意到上面的switch语句是不需要break的,因为case READ_LENGTH后:
1.如果数据不充足,那就会在buf.readBytes(length)里面抛出ReplayError,相当于有了break语句
2.如果数据充足,那就接着decode,最终成功并 return frame

除了API以外,下面这两篇文章写得非常好:
[url]http://biasedbit.com/netty-tutorial-replaying-decoder/[/url]
[url]http://biasedbit.com/an-enhanced-version-of-replayingdecoder-for-netty/[/url]

其中第二个例子,文章中给的代码不是很完整,我补全并做了一个简单的测试:
[url]https://github.com/bylijinnan/nettyLearn/tree/master/ljn-netty3-learn/src/main/java/com/ljn/handler/replay[/url]
相关标签: java netty