编程思想:如何设计一个好的通信网络协议
当网络中两个进程需要通信时,我们往往会使用 socket
来实现。socket
都不陌生。当三次握手成功后,客户端与服务端就能通信,并且,彼此之间通信的数据包格式都是二进制,由 tcp/ip
协议负责传输。
当客户端和服务端取得了二进制数据包后,我们往往需要『萃取』出想要的数据,这样才能更好的执行业务逻辑。所以,我们需要定义好数据结构来描述这些二进制数据的格式,这就是通信网络协议。简单讲,就是需要约定好二进制数据包中每一段字节的含义,比如从第 n 字节开始的 m 长度是核心数据,有了这样的约定后,我们就能解码出想要的数据,执行业务逻辑,这样我们就能畅通无阻的通信了。
网络协议的设计
概要划分
一个最基本的网络协议必须包含
- 数据的长度
- 数据
了解 tcp
协议的同学一定听说过粘包、拆包
这两个术语。因为tcp
协议是数据流协议,它的底层根据二进制缓冲区的实际情况进行包的划分。所以,不可避免的会出现粘包,拆包
现象 。为了解决它们,我们的网络协议往往会使用一个 4 字节的 int
类型来表示数据的大小。比如,netty
就为我们提供了 lengthfieldbasedframedecoder
解码器,它可以有效的使用自定义长度帧来解决上述问题。
同时一个好的网络协议,还会将动作和业务数据分离。试想一下, http
协议的分为请求头,请求体——
- 请求头:定义了接口地址、
http method
、http
版本 - 请求体:定义了需要传递的数据
这就是一种分离关注点的思想。所以自定义的网络协议也可以包含:
- 动作指令:比如定义
code
来分门别类的代表不同的业务逻辑 - 序列化算法:描述了
java
对象和二进制之间转换的形式,提供多种序列化/反序列化方式。比如json
、protobuf
等等,甚至是自定义算法。比如:rocketmq
等等。
同时,协议的开头可以定义一个约定的魔数
。这个固定值(4字节),一般用来判断当前的数据包是否合法。比如,当我们使用 telnet
发送错误的数据包时,很显然,它不合法,会导致解码失败。所以,为了减轻服务器的压力,我们可以取出数据包的前4
个字节与固定的魔数
对比,如果是非法的格式,直接关闭连接,不继续解码。
网络协议结构如下所示:
+--------------+-----------+------------+-----------+----------+ | 魔数(4) | code(1) |序列化算法(1) |数据长度(4) |数据(n) | +--------------+-----------+------------+-----------+----------+
rocketmq 通信网络协议的实现
rocketmq 网络协议
这一小节,我们从rocketmq
中,分析优秀通信网络协议的实现。rocketmq
项目中,客户端和服务端的通信是基于 netty 之上构建的。同时,为了更加有效的通信,往往需要对发送的消息自定义网络协议。
rocketmq
的网络协议,从数据分类的角度上看,可分为两大类
- 消息头数据(header data)
- 消息体数据(body data)
从左到右
-
第一段:4 个字节整数,等于2、3、4 长度总和
-
第二段:4 个字节整数,等于3 的长度。特别的
byte[0]
代表序列化算法,byte[1~3]
才是真正的长度 -
第三段:代表消息头数据,结构如下
{ "code":0, "language":"java", "version":0, "opaque":0, "flag":1, "remark":"hello, i am respponse /127.0.0.1:27603", "extfields":{ "count":"0", "messagetitle":"hellomessagetitle" } }
- 第四段:代表消息体数据
rocketmq 消息头协议详细如下:
header 字段名 | 类型 | request | response |
---|---|---|---|
code | 整数 | 请求操作代码,请求接收方根据不同的代码做不同的操作 | 应答结果代码,0表示成功,非0表示各种错误代码 |
language | 字符串 | 请求发起方实现语言,默认java | 应答接收方实现语言 |
version | 整数 | 请求发起方程序版本 | 应答接收方程序版本 |
opaque | 整数 | 请求发起方在同一连接上不同的请求标识代码,多线程连接复用使用 | 应答方不做修改,直接返回 |
flag | 整数 | 通信层的标志位 | 通信层的标志位 |
remark | 字符串 | 传输自定义文本信息 | 错误详细描述信息 |
extfields | hashmap<string,string> | 请求自定义字段 | 应答自定义字段 |
编码过程
rocketmq
的通信模块是基于 netty
的。通过定义 nettyencoder
来实现对每一个 channel
的 出栈数据进行编码,如下所示:
@channelhandler.sharable public class nettyencoder extends messagetobyteencoder<remotingcommand> { @override public void encode(channelhandlercontext ctx, remotingcommand remotingcommand, bytebuf out) throws exception { try { bytebuffer header = remotingcommand.encodeheader(); out.writebytes(header); byte[] body = remotingcommand.getbody(); if (body != null) { out.writebytes(body); } } catch (exception e) { ... } } }
其中,核心的编码过程位于 remotingcommand
对象中,encodeheader
阶段,需要统计出消息总长度,即:
-
定义消息头长度,一个整数表示:占4个字节
-
定义消息头数据,并计算其长度
-
定义消息体数据,并计算其长度
-
额外再加 4是因为需要加入消息总长度,一个整数表示:占4个字节
public bytebuffer encodeheader(final int bodylength) { // 1> 消息头长度,一个整数表示:占4个字节 int length = 4; // 2> 消息头数据 byte[] headerdata; headerdata = this.headerencode(); // 再加消息头数据长度 length += headerdata.length; // 3> 再加消息体数据长度 length += bodylength; // 4> 额外加 4是因为需要加入消息总长度,一个整数表示:占4个字节 bytebuffer result = bytebuffer.allocate(4 + length - bodylength); // 5> 将消息总长度加入 bytebuffer result.putint(length); // 6> 将消息的头长度加入 bytebuffer result.put(markprotocoltype(headerdata.length, serializetypecurrentrpc)); // 7> 将消息头数据加入 bytebuffer result.put(headerdata); result.flip(); return result; }
其中,encode
阶段会将 commandcustomheader
数据转换 hashmap<string,string>
,方便序列化
public void makecustomheadertonet() { if (this.customheader != null) { field[] fields = getclazzfields(customheader.getclass()); if (null == this.extfields) { this.extfields = new hashmap<string, string>(); } for (field field : fields) { if (!modifier.isstatic(field.getmodifiers())) { string name = field.getname(); if (!name.startswith("this")) { object value = null; try { field.setaccessible(true); value = field.get(this.customheader); } catch (exception e) { log.error("failed to access field [{}]", name, e); } if (value != null) { this.extfields.put(name, value.tostring()); } } } } } }
特别的,消息头序列化支持两种算法:
json
rocketmq
private byte[] headerencode() { this.makecustomheadertonet(); if (serializetype.rocketmq == serializetypecurrentrpc) { return rocketmqserializable.rocketmqprotocolencode(this); } else { return remotingserializable.encode(this); } }
这儿需要值得注意的是,encode
阶段将当前 rpc
类型和 headerdata
长度编码到一个 byte[4]
数组中,byte[0]
位序列化类型。
public static byte[] markprotocoltype(int source, serializetype type) { byte[] result = new byte[4]; result[0] = type.getcode(); result[1] = (byte) ((source >> 16) & 0xff); result[2] = (byte) ((source >> 8) & 0xff); result[3] = (byte) (source & 0xff); return result; }
其中,通过与运算 & 0xff
取低八位数据。
所以, 最终 length
长度等于序列化类型 + header length + header data + body data 的字节的长度。
解码过程
rocketmq
解码通过nettydecoder
来实现,它继承自 lengthfieldbasedframedecoder
,其中调用了父类lengthfieldbasedframedecoder
的构造函数
super(frame_max_length, 0, 4, 0, 4);
这些参数设置4
个字节代表 length
总长度,同时解码时跳过最开始的4
个字节:
frame = (bytebuf) super.decode(ctx, in);
所以,得到的 frame
= 序列化类型 + header length + header data + body data 。解码如下所示:
public static remotingcommand decode(final bytebuffer bytebuffer) { //总长度 int length = bytebuffer.limit(); //原始的 header length,4位 int oriheaderlen = bytebuffer.getint(); //真正的 header data 长度。忽略 byte[0]的 serializetype int headerlength = getheaderlength(oriheaderlen); byte[] headerdata = new byte[headerlength]; bytebuffer.get(headerdata); remotingcommand cmd = headerdecode(headerdata, getprotocoltype(oriheaderlen)); int bodylength = length - 4 - headerlength; byte[] bodydata = null; if (bodylength > 0) { bodydata = new byte[bodylength]; bytebuffer.get(bodydata); } cmd.body = bodydata; return cmd; } private static remotingcommand headerdecode(byte[] headerdata, serializetype type) { switch (type) { case json: remotingcommand resultjson = remotingserializable.decode(headerdata, remotingcommand.class); resultjson.setserializetypecurrentrpc(type); return resultjson; case rocketmq: remotingcommand resultrmq = rocketmqserializable.rocketmqprotocoldecode(headerdata); resultrmq.setserializetypecurrentrpc(type); return resultrmq; default: break; } return null; }
其中,getprotocoltype
,右移 24
位,拿到 serializetype
:
public static serializetype getprotocoltype(int source) { return serializetype.valueof((byte) ((source >> 24) & 0xff)); }
getheaderlength
拿到 0-24 位代表的 headerdata
length:
public static int getheaderlength(int length) { return length & 0xffffff; }
小结
对于诸多中间件而言,底层的网络通信模块往往会使用 netty
。netty
提供了诸多的编解码器,可以快速方便的上手。本文从如何设计一个网络协议入手,最终切入到 rocketmq
底层网络协议的实现。可以看到,它并不复杂。仔细研读几遍变能理解其奥义。具体参考类nettyencoder
、nettydecoder
、remotingcommand
。
上一篇: python第七周练习题