欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Netty源码学习-FrameDecoder

程序员文章站 2022-04-22 22:21:40
...
Netty 3.x的user guide里FrameDecoder的例子,有几个疑问:
1.文档说:FrameDecoder calls decode method with an internally maintained cumulative buffer whenever new data is received.
为什么每次有新数据到达时,都会调用decode方法?
2.Decoder与其他handler在pipeline的顺序是怎样的?谁先谁后?
3.所要接收的数据可能要经过多次才能接收完全,那这之前数据是如何保留?

先说结论:
1.因为每一次消息到达时都会触发pipeline的Upstream处理流程,最终会调用handler的messageReceived方法,而FrameDecoder的messageRecieved方法会调用decode方法
2.Decoder在前
3.FrameDecoder维护了一个ChannelBuffer(作为它的field)

文档中TimeDecoder的decode方法:


protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer)2 {

if (buffer.readableBytes() < 4) {
return null; 3
}

return buffer.readBytes(4);
}




查看一下FrameDecoder 源码:

FrameDecoder继承自SimpleChannelUpstreamHandler,而SimpleChannelUpstreamHandler实现了ChannelUpstreamHandler接口
ChannelUpstreamHandler很简单,只定义了一个handleUpstream方法

要理解FrameDecoder,首先要理解Netty的event在pipeline里面是怎么流转处理的,可以参看这篇博文[url]http://bylijinnan.iteye.com/blog/1981763[/url]:
其次,理解Upstream和Downstream
我是这样理解的,对于Handler来说,Handler接收的消息是Upstream;从Handler发出的消息,是Downstream
然后,我们就可以理解FrameDecoder的流程了:

-->ChannlPipeline 开始处理Upstream,会调用sendUpstream方法,
-->调用SimpleChannelUpstreamHandler(也就是FrameDecoder)的handleUpstream方法
而SimpleChannelUpstreamHandler的handleUpstream会触发messageReceived方法:


public void handleUpstream(
ChannelHandlerContext ctx, ChannelEvent e) throws Exception {

if (e instanceof MessageEvent) {
messageReceived(ctx, (MessageEvent) e);
}
/*omit others*/


-->FrameDecoder重写了messageReceived方法,在messageReceived里面,就会调用到文章开头提到的decode方法:


public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {

/*只保留关键代码*/
ChannelBuffer input = (ChannelBuffer) e.getMessage();
callDecode(ctx, e.getChannel(), input, e.getRemoteAddress());
updateCumulation(ctx, input);
}



而callDecode方法的关键代码是这样的:


private void callDecode(
ChannelHandlerContext context, Channel channel,
ChannelBuffer cumulation, SocketAddress remoteAddress) throws Exception {

while (cumulation.readable()) {
Object frame = decode(context, channel, cumulation);
unfoldAndFireMessageReceived(context, remoteAddress, frame);
}
}


在unfoldAndFireMessageReceived方法里面,会调用Channels.fireMessageReceived,最后会调用ctx.sendUpstream,事件会交给下一个Handler来处理
在示例中,下一个Handler就是TimeClientHandler
因此,如果TimeDecoder(extends FrameDecoder)的decode方法返回了UnixTime对象,那TimeClientHandler就可以直接拿到并强制转换成UnixTime对象:
TimeDecoder:


protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) {
if (buffer.readableBytes() < 4) {
return null;
}
return new UnixTime(buffer.readInt());1
}


TimeClientHandler:


public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
UnixTime m = (UnixTime) e.getMessage();
}



pipeline里面对Upstream的处理,在handler链表里面,是从head到tail,因此TimeDecoder应在TimeClientHandler之前:


bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() {
return Channels.pipeline(
new TimeDecoder(),
new TimeClientHandler());
}
});



开发中,只需要重写FrameDecoder.decode方法就可以了
由于Netty接收到数据后,会不断触发整个Upstream的处理流程,像上面分析的那样,因此,decode方法就会不断被调用


最后一个问题,FrameDecoder是怎样保留之前接收到的数据呢?
原来FrameDecoder维护了一个ChannelBuffer(作为它的field),源码里命名为cumulation
cumulation会一直保留数据,并按需扩容,直到所需要的数据全部接收到达为止

看看源码:


//decode得到的结果可能是一个数组或者集合,如果unfold=true,则让每一个元素都触发一个fireMessageReceived事件
private boolean unfold;
protected ChannelBuffer cumulation; //将每次接收到的数据累积起来,直到数据接收完全
private volatile ChannelHandlerContext ctx;

//cumulation使用了CompositeChannelBuffer来避免“memory copy”(ChannelBuffers.wrappedBuffer)
//cumulation是否需要开辟新内存空间并复制数据,取决于两个值:
//1.copyThreshold是从cumulation的总大小来限制
//2.maxCumulationBufferComponents是从cumulation的component的个数来限制

//如果cumulation的大小超过此值,就把cumulation复制到一个新创建的ChannelBuffer里面
private int copyThreshold;

//如果cumulation中component的个数超过此值,则像上面那样进行复制
private int maxCumulationBufferComponents;

//这个方法体现了上面据说的第2个限制:maxCumulationBufferComponents
protected ChannelBuffer appendToCumulation(ChannelBuffer input) {
ChannelBuffer cumulation = this.cumulation;
assert cumulation.readable();
if (cumulation instanceof CompositeChannelBuffer) {
// Make sure the resulting cumulation buffer has no more than the configured components.
CompositeChannelBuffer composite = (CompositeChannelBuffer) cumulation;
if (composite.numComponents() >= maxCumulationBufferComponents) {
cumulation = composite.copy();
}
}

this.cumulation = input = ChannelBuffers.wrappedBuffer(cumulation, input);
return input;
}

//这个方法体现了上面据说的第1个限制:copyThreshold
protected ChannelBuffer updateCumulation(ChannelHandlerContext ctx, ChannelBuffer input) {
ChannelBuffer newCumulation;
int readableBytes = input.readableBytes();
if (readableBytes > 0) {
int inputCapacity = input.capacity();

//超限了
if (readableBytes < inputCapacity && inputCapacity > copyThreshold) {

//newCumulationBuffer方法新建一个指定大小的ChannelBuffer
cumulation = newCumulation = newCumulationBuffer(ctx, input.readableBytes());
cumulation.writeBytes(input);
} else {
if (input.readerIndex() != 0) {
cumulation = newCumulation = input.slice();
} else {
cumulation = newCumulation = input;
}
}
} else {
cumulation = newCumulation = null;
}
return newCumulation;
}

//unfold的用途
protected final void unfoldAndFireMessageReceived(
ChannelHandlerContext context, SocketAddress remoteAddress, Object result) {
if (unfold) {
if (result instanceof Object[]) {
for (Object r: (Object[]) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else if (result instanceof Iterable<?>) {
for (Object r: (Iterable<?>) result) {
Channels.fireMessageReceived(context, r, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
} else {
Channels.fireMessageReceived(context, result, remoteAddress);
}
}
相关标签: java netty