Dubbo源码分析十六、dubbo编解码
单独分析一下dubbo的编解码过程。默认情况下使用dubbo自己定义的编码协议。
我们先来看一下 Dubbo 数据包结构。
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vh5EtRPv-1609227698562)(.\imgs\data-format.jpg)]
Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。
偏移量(Bit) | 字段 | 取值 |
---|---|---|
0 ~ 7 | 魔数高位 | 0xda00 |
8 ~ 15 | 魔数低位 | 0xbb |
16 | 数据包类型 | 0 - Response, 1 - Request |
17 | 调用方式 | 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用 |
18 | 事件标识 | 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包 |
19 ~ 23 | 序列化器编号 | 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization |
24 ~ 31 | 状态 | 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE …… |
32 ~ 95 | 请求编号 | 共8字节,运行时生成 |
96 ~ 127 | 消息体长度 | 运行时计算 |
编码
ExchangeCodec的encode方法:
@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
if (msg instanceof Request) {
// 对 Request 对象进行编码
encodeRequest(channel, buffer, (Request) msg);
} else if (msg instanceof Response) {
// 对 Response 对象进行编码
encodeResponse(channel, buffer, (Response) msg);
} else {
super.encode(channel, buffer, msg);
}
}
这里有对request和response的编码过程。我们以request为例:
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
Serialization serialization = getSerialization(channel);
// header.
// 创建消息头字节数组,长度为 16
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
// 设置魔数
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
// 设置数据包类型(Request/Response)和序列化器编号
header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());
if (req.isTwoWay()) {
// 设置通信方式(单向/双向)
header[2] |= FLAG_TWOWAY;
}
if (req.isEvent()) {
// 设置事件标识
header[2] |= FLAG_EVENT;
}
// set request id.
// 设置请求编号,8个字节,从第4个字节开始设置
Bytes.long2bytes(req.getId(), header, 4);
// encode request data.
// 获取 buffer 当前的写位置
int savedWriteIndex = buffer.writerIndex();
// 更新 writerIndex,为消息头预留 16 个字节的空间
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
// 创建序列化器,比如 Hessian2ObjectOutput
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
if (req.isEvent()) {
// 对事件数据进行序列化操作
encodeEventData(channel, out, req.getData());
} else {
// 对请求数据进行序列化操作
encodeRequestData(channel, out, req.getData(), req.getVersion());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 获取写入的字节数,也就是消息体长度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 将消息体长度写入到消息头中
Bytes.int2bytes(len, header, 12);
// write
// 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
buffer.writerIndex(savedWriteIndex);
// 从 savedWriteIndex 下标处写入消息头
buffer.writeBytes(header); // write header.
// 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
int savedWriteIndex = buffer.writerIndex();
try {
Serialization serialization = getSerialization(channel);
// header.
// 创建消息头字节数组
byte[] header = new byte[HEADER_LENGTH];
// set magic number.
// 设置魔数
Bytes.short2bytes(MAGIC, header);
// set request and serialization flag.
// 设置序列化器编号
header[2] = serialization.getContentTypeId();
if (res.isHeartbeat()) {
header[2] |= FLAG_EVENT;
}
// set response status.
// 获取响应状态
byte status = res.getStatus();
// 设置响应状态
header[3] = status;
// set request id.
// 设置请求编号
Bytes.long2bytes(res.getId(), header, 4);
// 更新 writerIndex,为消息头预留 16 个字节的空间
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
// encode response data or error message.
if (status == Response.OK) {
if (res.isHeartbeat()) {
// 对心跳响应结果进行序列化,已废弃
encodeEventData(channel, out, res.getResult());
} else {
// 对调用结果进行序列化
encodeResponseData(channel, out, res.getResult(), res.getVersion());
}
} else {
// 对错误信息进行序列化
out.writeUTF(res.getErrorMessage());
}
out.flushBuffer();
if (out instanceof Cleanable) {
((Cleanable) out).cleanup();
}
bos.flush();
bos.close();
// 获取写入的字节数,也就是消息体长度
int len = bos.writtenBytes();
checkPayload(channel, len);
// 将消息体长度写入到消息头中
Bytes.int2bytes(len, header, 12);
// write
// 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
buffer.writerIndex(savedWriteIndex);
// 从 savedWriteIndex 下标处写入消息头
buffer.writeBytes(header); // write header.
// 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
} catch (Throwable t) {
// clear buffer
buffer.writerIndex(savedWriteIndex);
// send error message to Consumer, otherwise, Consumer will wait till timeout.
if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
Response r = new Response(res.getId(), res.getVersion());
r.setStatus(Response.BAD_RESPONSE);
if (t instanceof ExceedPayloadLimitException) {
logger.warn(t.getMessage(), t);
try {
r.setErrorMessage(t.getMessage());
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
}
} else {
// FIXME log error message in Codec and handle in caught() of IoHanndler?
logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
try {
r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
channel.send(r);
return;
} catch (RemotingException e) {
logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
}
}
}
// Rethrow exception
if (t instanceof IOException) {
throw (IOException) t;
} else if (t instanceof RuntimeException) {
throw (RuntimeException) t;
} else if (t instanceof Error) {
throw (Error) t;
} else {
throw new RuntimeException(t.getMessage(), t);
}
}
}
按照编码协议写消息头和消息体。这里看一下encodeRequestData方法,由子类DubboCodec实现,主要是对消息体的序列化实现:
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
RpcInvocation inv = (RpcInvocation) data;
// 依次序列化 dubbo version、serviceName、version
out.writeUTF(version);
// https://github.com/apache/dubbo/issues/6138
String serviceName = inv.getAttachment(INTERFACE_KEY);
if (serviceName == null) {
serviceName = inv.getAttachment(PATH_KEY);
}
out.writeUTF(serviceName);
out.writeUTF(inv.getAttachment(VERSION_KEY));
// 序列化调用方法名
out.writeUTF(inv.getMethodName());
// 将参数类型转换为字符串,并进行序列化
out.writeUTF(inv.getParameterTypesDesc());
Object[] args = inv.getArguments();
if (args != null) {
for (int i = 0; i < args.length; i++) {
// 对运行时参数进行序列化
out.writeObject(encodeInvocationArgument(channel, inv, i));
}
}
// 序列化 attachments
out.writeAttachments(inv.getObjectAttachments());
}
序列化默认使用Hessian,这里的out是ObjectOutput的实现,默认是Hessian2ObjectOutput:
public class Hessian2ObjectOutput implements ObjectOutput {
private final Hessian2Output output;
public Hessian2ObjectOutput(OutputStream os) {
output = new Hessian2Output(os);
output.setSerializerFactory(Hessian2SerializerFactory.INSTANCE);
}
@Override
public void writeBool(boolean v) throws IOException {
output.writeBoolean(v);
}
@Override
public void writeByte(byte v) throws IOException {
output.writeInt(v);
}
@Override
public void writeShort(short v) throws IOException {
output.writeInt(v);
}
@Override
public void writeInt(int v) throws IOException {
output.writeInt(v);
}
@Override
public void writeLong(long v) throws IOException {
output.writeLong(v);
}
@Override
public void writeFloat(float v) throws IOException {
output.writeDouble(v);
}
@Override
public void writeDouble(double v) throws IOException {
output.writeDouble(v);
}
@Override
public void writeBytes(byte[] b) throws IOException {
output.writeBytes(b);
}
@Override
public void writeBytes(byte[] b, int off, int len) throws IOException {
output.writeBytes(b, off, len);
}
@Override
public void writeUTF(String v) throws IOException {
output.writeString(v);
}
@Override
public void writeObject(Object obj) throws IOException {
output.writeObject(obj);
}
@Override
public void flushBuffer() throws IOException {
output.flushBuffer();
}
}
各种write和flush操作,执行的是Hessian2Output的。
解码
回来看ExchangeCodec的decode方法:
@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
int readable = buffer.readableBytes();
// 创建消息头字节数组
byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
// 读取消息头数据
buffer.readBytes(header);
// 调用重载方法进行后续解码工作
return decode(channel, buffer, readable, header);
}
@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
// check magic number.
// 检查魔数是否相等
if (readable > 0 && header[0] != MAGIC_HIGH
|| readable > 1 && header[1] != MAGIC_LOW) {
int length = header.length;
if (header.length < readable) {
header = Bytes.copyOf(header, readable);
buffer.readBytes(header, length, readable - length);
}
for (int i = 1; i < header.length - 1; i++) {
if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
buffer.readerIndex(buffer.readerIndex() - header.length + i);
header = Bytes.copyOf(header, i);
break;
}
}
// 通过 telnet 命令行发送的数据包不包含消息头,所以这里
// 调用 TelnetCodec 的 decode 方法对数据包进行解码
return super.decode(channel, buffer, readable, header);
}
// check length.
// 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT
if (readable < HEADER_LENGTH) {
return DecodeResult.NEED_MORE_INPUT;
}
// get data length.
// 从消息头中获取消息体长度
int len = Bytes.bytes2int(header, 12);
// 检测消息体长度是否超出限制,超出则抛出异常
checkPayload(channel, len);
int tt = len + HEADER_LENGTH;
// 检测可读的字节数是否小于实际的字节数
if (readable < tt) {
return DecodeResult.NEED_MORE_INPUT;
}
// limit input stream.
ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);
try {
// 继续进行解码工作
return decodeBody(channel, is, header);
} finally {
if (is.available() > 0) {
try {
if (logger.isWarnEnabled()) {
logger.warn("Skip input stream " + is.available());
}
StreamUtils.skipUnusedStream(is);
} catch (IOException e) {
logger.warn(e.getMessage(), e);
}
}
}
}
先按协议解码消息头,然后是消息体的解码:
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
// 获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号
byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
// get request id.
// 获取调用编号
long id = Bytes.bytes2long(header, 4);
// 通过逻辑与运算得到调用类型,0 - Response,1 - Request
if ((flag & FLAG_REQUEST) == 0) {
// 对响应结果进行解码,得到 Response 对象。
// decode response.
Response res = new Response(id);
if ((flag & FLAG_EVENT) != 0) {
res.setEvent(true);
}
// get status.
byte status = header[3];
res.setStatus(status);
try {
if (status == Response.OK) {
Object data;
if (res.isEvent()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
data = decodeEventData(channel, in);
} else {
DecodeableRpcResult result;
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
result = new DecodeableRpcResult(channel, res, is,
(Invocation) getRequestData(id), proto);
result.decode();
} else {
result = new DecodeableRpcResult(channel, res,
new UnsafeByteArrayInputStream(readMessageData(is)),
(Invocation) getRequestData(id), proto);
}
data = result;
}
res.setResult(data);
} else {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
res.setErrorMessage(in.readUTF());
}
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode response failed: " + t.getMessage(), t);
}
res.setStatus(Response.CLIENT_ERROR);
res.setErrorMessage(StringUtils.toString(t));
}
return res;
} else {
// decode request.
// 创建 Request 对象
Request req = new Request(id);
req.setVersion(Version.getProtocolVersion());
// 通过逻辑与运算得到通信方式,并设置到 Request 对象中
req.setTwoWay((flag & FLAG_TWOWAY) != 0);
// 通过位运算检测数据包是否为事件类型
if ((flag & FLAG_EVENT) != 0) {
req.setEvent(true);
}
try {
Object data;
if (req.isEvent()) {
ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
// 对事件包进行解码
data = decodeEventData(channel, in);
} else {
DecodeableRpcInvocation inv;
// 根据 url 参数判断是否在 IO 线程上对消息体进行解码
if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
inv = new DecodeableRpcInvocation(channel, req, is, proto);
// 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将
// 调用方法名、attachment、以及调用参数解析出来
inv.decode();
} else {
// 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
inv = new DecodeableRpcInvocation(channel, req,
new UnsafeByteArrayInputStream(readMessageData(is)), proto);
}
data = inv;
}
// 设置 data 到 Request 对象中
req.setData(data);
} catch (Throwable t) {
if (log.isWarnEnabled()) {
log.warn("Decode request failed: " + t.getMessage(), t);
}
// bad request
// 若解码过程中出现异常,则将 broken 字段设为 true,
// 并将异常对象设置到 Reqeust 对象中
req.setBroken(true);
req.setData(t);
}
return req;
}
}
得到一个Request对象。我们看一下DecodeableRpcInvocation的decode方法:
@Override
public void decode() throws Exception {
if (!hasDecoded && channel != null && inputStream != null) {
try {
decode(channel, inputStream);
} catch (Throwable e) {
if (log.isWarnEnabled()) {
log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
}
request.setBroken(true);
request.setData(e);
} finally {
hasDecoded = true;
}
}
}
@Override
public Object decode(Channel channel, InputStream input) throws IOException {
ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
.deserialize(channel.getUrl(), input);
// 通过反序列化得到 dubbo version,并保存到 attachments 变量中
String dubboVersion = in.readUTF();
request.setVersion(dubboVersion);
setAttachment(DUBBO_VERSION_KEY, dubboVersion);
// 通过反序列化得到 path,version,并保存到 attachments 变量中
String path = in.readUTF();
setAttachment(PATH_KEY, path);
setAttachment(VERSION_KEY, in.readUTF());
// 通过反序列化得到调用方法名
setMethodName(in.readUTF());
// 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;
String desc = in.readUTF();
setParameterTypesDesc(desc);
try {
Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
if (desc.length() > 0) {
// if (RpcUtils.isGenericCall(path, getMethodName()) || RpcUtils.isEcho(path, getMethodName())) {
// pts = ReflectUtils.desc2classArray(desc);
// } else {
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.lookupService(path);
if (serviceDescriptor != null) {
MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc);
if (methodDescriptor != null) {
pts = methodDescriptor.getParameterClasses();
this.setReturnTypes(methodDescriptor.getReturnTypes());
}
}
if (pts == DubboCodec.EMPTY_CLASS_ARRAY) {
if (!RpcUtils.isGenericCall(desc, getMethodName()) && !RpcUtils.isEcho(desc, getMethodName())) {
throw new IllegalArgumentException("Service not found:" + path + ", " + getMethodName());
}
// 将 desc 解析为参数类型数组
pts = ReflectUtils.desc2classArray(desc);
}
// }
args = new Object[pts.length];
for (int i = 0; i < args.length; i++) {
try {
// 解析运行时参数
args[i] = in.readObject(pts[i]);
} catch (Exception e) {
if (log.isWarnEnabled()) {
log.warn("Decode argument failed: " + e.getMessage(), e);
}
}
}
}
// 设置参数类型数组
setParameterTypes(pts);
// 通过反序列化得到原 attachment 的内容
Map<String, Object> map = in.readAttachments();
if (map != null && map.size() > 0) {
Map<String, Object> attachment = getObjectAttachments();
if (attachment == null) {
attachment = new HashMap<>();
}
// 将 map 与当前对象中的 attachment 集合进行融合
attachment.putAll(map);
setObjectAttachments(attachment);
}
//decode argument ,may be callback
// 对 callback 类型的参数进行处理
for (int i = 0; i < args.length; i++) {
args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
}
// 设置参数列表
setArguments(args);
String targetServiceName = buildKey((String) getAttachment(PATH_KEY),
getAttachment(GROUP_KEY),
getAttachment(VERSION_KEY));
setTargetServiceUniqueName(targetServiceName);
} catch (ClassNotFoundException e) {
throw new IOException(StringUtils.toString("Read invocation data failed.", e));
} finally {
if (in instanceof Cleanable) {
((Cleanable) in).cleanup();
}
}
return this;
}
反序列化操作,这里的in默认的是hessian的实现Hessian2ObjectInput:
public class Hessian2ObjectInput implements ObjectInput {
private final Hessian2Input input;
public Hessian2ObjectInput(InputStream is) {
input = new Hessian2Input(is);
input.setSerializerFactory(Hessian2SerializerFactory.INSTANCE);
}
@Override
public boolean readBool() throws IOException {
return input.readBoolean();
}
@Override
public byte readByte() throws IOException {
return (byte) input.readInt();
}
@Override
public short readShort() throws IOException {
return (short) input.readInt();
}
@Override
public int readInt() throws IOException {
return input.readInt();
}
@Override
public long readLong() throws IOException {
return input.readLong();
}
@Override
public float readFloat() throws IOException {
return (float) input.readDouble();
}
@Override
public double readDouble() throws IOException {
return input.readDouble();
}
@Override
public byte[] readBytes() throws IOException {
return input.readBytes();
}
@Override
public String readUTF() throws IOException {
return input.readString();
}
@Override
public Object readObject() throws IOException {
return input.readObject();
}
@Override
@SuppressWarnings("unchecked")
public <T> T readObject(Class<T> cls) throws IOException {
return (T) input.readObject(cls);
}
@Override
public <T> T readObject(Class<T> cls, Type type) throws IOException {
return readObject(cls);
}
}
read过程就是Hessian2Input的对应方法。
至此编解码过程基本分析完了。关于响应消息的编解码和请求消息的差不多。有兴趣的可以自己看看。
Dubbo相关的内容至此也基本上结束了。实际上Dubbo官网上讲的非常详细,对照着源码看都很容易理解。当然还有围绕dubbo周边的一些东西,比如dubbo结合配置中心的动态配置如何实现的?spring-cloud-starter-dubbo在cloud环境下如何引入的? 这些问题在分析相关代码也不是很难。以后有机会再补充吧。
本文地址:https://blog.csdn.net/qq_19414183/article/details/111921711