Netty源码分析 (十一)----- 拆包器之LengthFieldBasedFrameDecoder
本篇文章主要是介绍使用lengthfieldbasedframedecoder解码器自定义协议。通常,协议的格式如下:
lengthfieldbasedframedecoder是netty解决拆包粘包问题的一个重要的类,主要结构就是header+body结构。我们只需要传入正确的参数就可以发送和接收正确的数据,那么重点就在于这几个参数的意义。下面我们就具体了解一下这几个参数的意义。先来看一下lengthfieldbasedframedecoder主要的构造方法:
public lengthfieldbasedframedecoder( int maxframelength, int lengthfieldoffset, int lengthfieldlength, int lengthadjustment, int initialbytestostrip)
那么这几个重要的参数如下:
- maxframelength:最大帧长度。也就是可以接收的数据的最大长度。如果超过,此次数据会被丢弃。
- lengthfieldoffset:长度域偏移。就是说数据开始的几个字节可能不是表示数据长度,需要后移几个字节才是长度域。
- lengthfieldlength:长度域字节数。用几个字节来表示数据长度。
- lengthadjustment:数据长度修正。因为长度域指定的长度可以使header+body的整个长度,也可以只是body的长度。如果表示header+body的整个长度,那么我们需要修正数据长度。
- initialbytestostrip:跳过的字节数。如果你需要接收header+body的所有数据,此值就是0,如果你只想接收body数据,那么需要跳过header所占用的字节数。
下面我们根据几个例子的使用来具体说明这几个参数的使用。
lengthfieldbasedframedecoder 的用法
需求1
长度域为2个字节,我们要求发送和接收的数据如下所示:
发送的数据 (14 bytes) 接收到数据 (14 bytes) +--------+----------------+ +--------+----------------+ | length | actual content |----->| length | actual content | | 12 | "hello, world" | | 12 | "hello, world" | +--------+----------------+ +--------+----------------+
留心的你肯定发现了,长度域只是实际内容的长度,不包括长度域的长度。下面是参数的值:
- lengthfieldoffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
- lengthfieldlength=2:长度域2个字节。
- lengthadjustment=0:数据长度修正为0,因为长度域只包含数据的长度,所以不需要修正。
- initialbytestostrip=0:发送和接收的数据完全一致,所以不需要跳过任何字节。
需求2
长度域为2个字节,我们要求发送和接收的数据如下所示:
发送的数据 (14 bytes) 接收到数据 (12 bytes) +--------+----------------+ +----------------+ | length | actual content |----->| actual content | | 12 | "hello, world" | | "hello, world" | +--------+----------------+ +----------------+
参数值如下:
- lengthfieldoffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
- lengthfieldlength=2:长度域2个字节。
- lengthadjustment=0:数据长度修正为0,因为长度域只包含数据的长度,所以不需要修正。
- initialbytestostrip=2:我们发现接收的数据没有长度域的数据,所以要跳过长度域的2个字节。
需求3
长度域为2个字节,我们要求发送和接收的数据如下所示:
before decode (14 bytes) after decode (14 bytes) +--------+----------------+ +--------+----------------+ | length | actual content |----->| length | actual content | | 14 | "hello, world" | | 14 | "hello, world" | +--------+----------------+ +--------+----------------+
留心的你肯定又发现了,长度域表示的长度是总长度 也就是header+body的总长度。参数如下:
- lengthfieldoffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
- lengthfieldlength=2:长度域2个字节。
- lengthadjustment=-2:因为长度域为总长度,所以我们需要修正数据长度,也就是减去2。
- initialbytestostrip=0:我们发现接收的数据没有长度域的数据,所以要跳过长度域的2个字节。
需求4
长度域为2个字节,我们要求发送和接收的数据如下所示:
before decode (17 bytes) after decode (17 bytes) +----------+----------+----------------+ +----------+----------+----------------+ | meta | length | actual content |----->| meta | length | actual content | | 0xcafe | 12 | "hello, world" | | 0xcafe | 12 | "hello, world" | +----------+----------+----------------+ +----------+----------+----------------+
我们发现,数据的结构有点变化,变成了 meta+header+body的结构。meta一般表示元数据,魔数等。我们定义这里meta有三个字节。参数如下:
- lengthfieldoffset=3:开始的3个字节是meta,然后才是长度域,所以长度域偏移为3。
- lengthfieldlength=2:长度域2个字节。
- lengthadjustment=0:长度域指定的长度位数据长度,所以数据长度不需要修正。
- initialbytestostrip=0:发送和接收数据相同,不需要跳过数据。
需求5
长度域为2个字节,我们要求发送和接收的数据如下所示:
before decode (17 bytes) after decode (17 bytes) +----------+----------+----------------+ +----------+----------+----------------+ | length | meta | actual content |----->| length | meta | actual content | | 12 | 0xcafe | "hello, world" | | 12 | 0xcafe | "hello, world" | +----------+----------+----------------+ +----------+----------+----------------+
我们发现,数据的结构有点变化,变成了 header+meta+body的结构。meta一般表示元数据,魔数等。我们定义这里meta有三个字节。参数如下:
- lengthfieldoffset=0:开始的2个字节就是长度域,所以不需要长度域偏移。
- lengthfieldlength=2:长度域2个字节。
- lengthadjustment=3:我们需要把meta+body当做body处理,所以数据长度需要加3。
- initialbytestostrip=0:发送和接收数据相同,不需要跳过数据。
需求6
长度域为2个字节,我们要求发送和接收的数据如下所示:
before decode (16 bytes) after decode (13 bytes) +------+--------+------+----------------+ +------+----------------+ | hdr1 | length | hdr2 | actual content |----->| hdr2 | actual content | | 0xca | 0x000c | 0xfe | "hello, world" | | 0xfe | "hello, world" | +------+--------+------+----------------+ +------+----------------+
我们发现,数据的结构有点变化,变成了 hdr1+header+hdr2+body的结构。我们定义这里hdr1和hdr2都只有1个字节。参数如下:
- lengthfieldoffset=1:开始的1个字节是长度域,所以需要设置长度域偏移为1。
- lengthfieldlength=2:长度域2个字节。
- lengthadjustment=1:我们需要把hdr2+body当做body处理,所以数据长度需要加1。
- initialbytestostrip=3:接收数据不包括hdr1和长度域相同,所以需要跳过3个字节。
lengthfieldbasedframedecoder 源码剖析
实现拆包抽象
在前面的文章中我们知道,具体的拆包协议只需要实现
void decode(channelhandlercontext ctx, bytebuf in, list<object> out)
其中 in 表示目前为止还未拆的数据,拆完之后的包添加到 out这个list中即可实现包向下传递,第一层实现比较简单
@override protected final void decode(channelhandlercontext ctx, bytebuf in, list<object> out) throws exception { object decoded = decode(ctx, in); if (decoded != null) { out.add(decoded); } }
重载的protected函数decode做真正的拆包动作
protected object decode(channelhandlercontext ctx, bytebuf in) throws exception { if (this.discardingtoolongframe) { long bytestodiscard = this.bytestodiscard; int localbytestodiscard = (int)math.min(bytestodiscard, (long)in.readablebytes()); in.skipbytes(localbytestodiscard); bytestodiscard -= (long)localbytestodiscard; this.bytestodiscard = bytestodiscard; this.failifnecessary(false); } // 如果当前可读字节还未达到长度长度域的偏移,那说明肯定是读不到长度域的,直接不读 if (in.readablebytes() < this.lengthfieldendoffset) { return null; } else { // 拿到长度域的实际字节偏移,就是长度域的开始下标 // 这里就是需求4,开始的几个字节并不是长度域 int actuallengthfieldoffset = in.readerindex() + this.lengthfieldoffset; // 拿到实际的未调整过的包长度 // 就是读取长度域的十进制值,最原始传过来的包的长度 long framelength = this.getunadjustedframelength(in, actuallengthfieldoffset, this.lengthfieldlength, this.byteorder); // 如果拿到的长度为负数,直接跳过长度域并抛出异常 if (framelength < 0l) { in.skipbytes(this.lengthfieldendoffset); throw new corruptedframeexception("negative pre-adjustment length field: " + framelength); } else { // 调整包的长度 framelength += (long)(this.lengthadjustment + this.lengthfieldendoffset); // 整个数据包的长度还没有长度域长,直接抛出异常 if (framelength < (long)this.lengthfieldendoffset) { in.skipbytes(this.lengthfieldendoffset); throw new corruptedframeexception("adjusted frame length (" + framelength + ") is less " + "than lengthfieldendoffset: " + this.lengthfieldendoffset); // 数据包长度超出最大包长度,进入丢弃模式 } else if (framelength > (long)this.maxframelength) { long discard = framelength - (long)in.readablebytes(); this.toolongframelength = framelength; if (discard < 0l) { in.skipbytes((int)framelength); } else { this.discardingtoolongframe = true; this.bytestodiscard = discard; in.skipbytes(in.readablebytes()); } this.failifnecessary(true); return null; } else { int framelengthint = (int)framelength; //当前可读的字节数小于包中的length,什么都不做,等待下一次解码 if (in.readablebytes() < framelengthint) { return null; //跳过的字节不能大于数据包的长度,否则就抛出 corruptedframeexception 的异常 } else if (this.initialbytestostrip > framelengthint) { in.skipbytes(framelengthint); throw new corruptedframeexception("adjusted frame length (" + framelength + ") is less " + "than initialbytestostrip: " + this.initialbytestostrip); } else { //根据initialbytestostrip的设置来跳过某些字节 in.skipbytes(this.initialbytestostrip); //拿到当前累积数据的读指针 int readerindex = in.readerindex(); //拿到待抽取数据包的实际长度 int actualframelength = framelengthint - this.initialbytestostrip; //进行抽取 bytebuf frame = this.extractframe(ctx, in, readerindex, actualframelength); //移动读指针 in.readerindex(readerindex + actualframelength); return frame; } } } } }
下面分几个部分来分析一下这个重量级函数
获取frame长度
获取需要待拆包的包大小
// 拿到长度域的实际字节偏移,就是长度域的开始下标 // 这里就是需求4,开始的几个字节并不是长度域 int actuallengthfieldoffset = in.readerindex() + this.lengthfieldoffset; // 拿到实际的未调整过的包长度 // 就是读取长度域的十进制值,最原始传过来的包的长度 long framelength = this.getunadjustedframelength(in, actuallengthfieldoffset, this.lengthfieldlength, this.byteorder); // 调整包的长度 framelength += (long)(this.lengthadjustment + this.lengthfieldendoffset);
上面这一段内容有个扩展点 getunadjustedframelength,如果你的长度域代表的值表达的含义不是正常的int,short等基本类型,你可以重写这个函数
protected long getunadjustedframelength(bytebuf buf, int offset, int length, byteorder order) { buf = buf.order(order); long framelength; switch (length) { case 1: framelength = buf.getunsignedbyte(offset); break; case 2: framelength = buf.getunsignedshort(offset); break; case 3: framelength = buf.getunsignedmedium(offset); break; case 4: framelength = buf.getunsignedint(offset); break; case 8: framelength = buf.getlong(offset); break; default: throw new decoderexception( "unsupported lengthfieldlength: " + lengthfieldlength + " (expected: 1, 2, 3, 4, or 8)"); } return framelength; }
跳过指定字节长度
int framelengthint = (int)framelength; //当前可读的字节数小于包中的length,什么都不做,等待下一次解码 if (in.readablebytes() < framelengthint) { return null; //跳过的字节不能大于数据包的长度,否则就抛出 corruptedframeexception 的异常 } else if (this.initialbytestostrip > framelengthint) { in.skipbytes(framelengthint); throw new corruptedframeexception("adjusted frame length (" + framelength + ") is less " + "than initialbytestostrip: " + this.initialbytestostrip); } //根据initialbytestostrip的设置来跳过某些字节 in.skipbytes(this.initialbytestostrip);
先验证当前是否已经读到足够的字节,如果读到了,在下一步抽取一个完整的数据包之前,需要根据initialbytestostrip的设置来跳过某些字节(见文章开篇),当然,跳过的字节不能大于数据包的长度,否则就抛出 corruptedframeexception 的异常
抽取frame
//根据initialbytestostrip的设置来跳过某些字节 in.skipbytes(this.initialbytestostrip); //拿到当前累积数据的读指针 int readerindex = in.readerindex(); //拿到待抽取数据包的实际长度 int actualframelength = framelengthint - this.initialbytestostrip; //进行抽取 bytebuf frame = this.extractframe(ctx, in, readerindex, actualframelength); //移动读指针 in.readerindex(readerindex + actualframelength); return frame;
到了最后抽取数据包其实就很简单了,拿到当前累积数据的读指针,然后拿到待抽取数据包的实际长度进行抽取,抽取之后,移动读指针
protected bytebuf extractframe(channelhandlercontext ctx, bytebuf buffer, int index, int length) { return buffer.retainedslice(index, length); }
抽取的过程是简单的调用了一下 bytebuf 的retainedsliceapi,该api无内存copy开销
自定义解码器
协议实体的定义
public class myprotocolbean { //类型 系统编号 0xa 表示a系统,0xb 表示b系统 private byte type; //信息标志 0xa 表示心跳包 0xb 表示超时包 0xc 业务信息包 private byte flag; //内容长度 private int length; //内容 private string content; //省略get/set }
服务器端
服务端的实现
public class server { private static final int max_frame_length = 1024 * 1024; //最大长度 private static final int length_field_length = 4; //长度字段所占的字节数 private static final int length_field_offset = 2; //长度偏移 private static final int length_adjustment = 0; private static final int initial_bytes_to_strip = 0; private int port; public server(int port) { this.port = port; } public void start(){ eventloopgroup bossgroup = new nioeventloopgroup(1); eventloopgroup workergroup = new nioeventloopgroup(); try { serverbootstrap sbs = new serverbootstrap().group(bossgroup,workergroup).channel(nioserversocketchannel.class).localaddress(new inetsocketaddress(port)) .childhandler(new channelinitializer<socketchannel>() { protected void initchannel(socketchannel ch) throws exception { ch.pipeline().addlast(new myprotocoldecoder(max_frame_length,length_field_offset,length_field_length,length_adjustment,initial_bytes_to_strip,false)); ch.pipeline().addlast(new serverhandler()); }; }).option(channeloption.so_backlog, 128) .childoption(channeloption.so_keepalive, true); // 绑定端口,开始接收进来的连接 channelfuture future = sbs.bind(port).sync(); system.out.println("server start listen at " + port ); future.channel().closefuture().sync(); } catch (exception e) { bossgroup.shutdowngracefully(); workergroup.shutdowngracefully(); } } public static void main(string[] args) throws exception { int port; if (args.length > 0) { port = integer.parseint(args[0]); } else { port = 8080; } new server(port).start(); } }
自定义解码器myprotocoldecoder
public class myprotocoldecoder extends lengthfieldbasedframedecoder { private static final int header_size = 6; /** * @param maxframelength 帧的最大长度 * @param lengthfieldoffset length字段偏移的地址 * @param lengthfieldlength length字段所占的字节长 * @param lengthadjustment 修改帧数据长度字段中定义的值,可以为负数 因为有时候我们习惯把头部记入长度,若为负数,则说明要推后多少个字段 * @param initialbytestostrip 解析时候跳过多少个长度 * @param failfast 为true,当frame长度超过maxframelength时立即报toolongframeexception异常,为false,读取完整个帧再报异 */ public myprotocoldecoder(int maxframelength, int lengthfieldoffset, int lengthfieldlength, int lengthadjustment, int initialbytestostrip, boolean failfast) { super(maxframelength, lengthfieldoffset, lengthfieldlength, lengthadjustment, initialbytestostrip, failfast); } @override protected object decode(channelhandlercontext ctx, bytebuf in) throws exception { //在这里调用父类的方法,实现指得到想要的部分,我在这里全部都要,也可以只要body部分 in = (bytebuf) super.decode(ctx,in); if(in == null){ return null; } if(in.readablebytes()<header_size){ throw new exception("字节数不足"); } //读取type字段 byte type = in.readbyte(); //读取flag字段 byte flag = in.readbyte(); //读取length字段 int length = in.readint(); if(in.readablebytes()!=length){ throw new exception("标记的长度不符合实际长度"); } //读取body byte []bytes = new byte[in.readablebytes()]; in.readbytes(bytes); return new myprotocolbean(type,flag,length,new string(bytes,"utf-8")); } }
服务端hanlder
public class serverhandler extends channelinboundhandleradapter { @override public void channelread(channelhandlercontext ctx, object msg) throws exception { myprotocolbean myprotocolbean = (myprotocolbean)msg; //直接转化成协议消息实体 system.out.println(myprotocolbean.getcontent()); } @override public void channelactive(channelhandlercontext ctx) throws exception { super.channelactive(ctx); } }
客户端和客户端handler
public class client { static final string host = system.getproperty("host", "127.0.0.1"); static final int port = integer.parseint(system.getproperty("port", "8080")); static final int size = integer.parseint(system.getproperty("size", "256")); public static void main(string[] args) throws exception { // configure the client. eventloopgroup group = new nioeventloopgroup(); try { bootstrap b = new bootstrap(); b.group(group) .channel(niosocketchannel.class) .option(channeloption.tcp_nodelay, true) .handler(new channelinitializer<socketchannel>() { @override public void initchannel(socketchannel ch) throws exception { ch.pipeline().addlast(new myprotocolencoder()); ch.pipeline().addlast(new clienthandler()); } }); channelfuture future = b.connect(host, port).sync(); future.channel().closefuture().sync(); } finally { group.shutdowngracefully(); } } }
客户端编码器
public class myprotocolencoder extends messagetobyteencoder<myprotocolbean> { @override protected void encode(channelhandlercontext ctx, myprotocolbean msg, bytebuf out) throws exception { if(msg == null){ throw new exception("msg is null"); } out.writebyte(msg.gettype()); out.writebyte(msg.getflag()); out.writeint(msg.getlength()); out.writebytes(msg.getcontent().getbytes(charset.forname("utf-8"))); } }
- 编码的时候,只需要按照定义的顺序依次写入到bytebuf中.
客户端handler
public class clienthandler extends channelinboundhandleradapter { @override public void channelread(channelhandlercontext ctx, object msg) throws exception { super.channelread(ctx, msg); } @override public void channelactive(channelhandlercontext ctx) throws exception { myprotocolbean myprotocolbean = new myprotocolbean((byte)0xa, (byte)0xc, "hello,netty".length(), "hello,netty"); ctx.writeandflush(myprotocolbean); } }
上一篇: 新闻投稿,做SEO怎么充分的利用它呢?
下一篇: 任务栏的网络图标有黄色警示号的解决方法