欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  网络运营

Dubbo源码分析十六、dubbo编解码

程序员文章站 2022-03-01 21:53:45
单独分析一下dubbo的编解码过程。默认情况下使用dubbo自己定义的编码协议。我们先来看一下 Dubbo 数据包结构。[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vh5EtRPv-1609227698562)(.\imgs\data-format.jpg)]Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如...

单独分析一下dubbo的编解码过程。默认情况下使用dubbo自己定义的编码协议。

我们先来看一下 Dubbo 数据包结构。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Vh5EtRPv-1609227698562)(.\imgs\data-format.jpg)]

Dubbo 数据包分为消息头和消息体,消息头用于存储一些元信息,比如魔数(Magic),数据包类型(Request/Response),消息体长度(Data Length)等。消息体中用于存储具体的调用消息,比如方法名称,参数列表等。下面简单列举一下消息头的内容。

偏移量(Bit) 字段 取值
0 ~ 7 魔数高位 0xda00
8 ~ 15 魔数低位 0xbb
16 数据包类型 0 - Response, 1 - Request
17 调用方式 仅在第16位被设为1的情况下有效,0 - 单向调用,1 - 双向调用
18 事件标识 0 - 当前数据包是请求或响应包,1 - 当前数据包是心跳包
19 ~ 23 序列化器编号 2 - Hessian2Serialization 3 - JavaSerialization 4 - CompactedJavaSerialization 6 - FastJsonSerialization 7 - NativeJavaSerialization 8 - KryoSerialization 9 - FstSerialization
24 ~ 31 状态 20 - OK 30 - CLIENT_TIMEOUT 31 - SERVER_TIMEOUT 40 - BAD_REQUEST 50 - BAD_RESPONSE ……
32 ~ 95 请求编号 共8字节,运行时生成
96 ~ 127 消息体长度 运行时计算

编码

ExchangeCodec的encode方法:

@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    if (msg instanceof Request) {
        // 对 Request 对象进行编码
        encodeRequest(channel, buffer, (Request) msg);
    } else if (msg instanceof Response) {
        // 对 Response 对象进行编码
        encodeResponse(channel, buffer, (Response) msg);
    } else {
        super.encode(channel, buffer, msg);
    }
}

这里有对request和response的编码过程。我们以request为例:

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // header.
    // 创建消息头字节数组,长度为 16
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number.
    // 设置魔数
    Bytes.short2bytes(MAGIC, header);

    // set request and serialization flag.
    // 设置数据包类型(Request/Response)和序列化器编号
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    if (req.isTwoWay()) {
        // 设置通信方式(单向/双向)
        header[2] |= FLAG_TWOWAY;
    }
    if (req.isEvent()) {
        // 设置事件标识
        header[2] |= FLAG_EVENT;
    }

    // set request id.
    // 设置请求编号,8个字节,从第4个字节开始设置
    Bytes.long2bytes(req.getId(), header, 4);

    // encode request data.
    // 获取 buffer 当前的写位置
    int savedWriteIndex = buffer.writerIndex();
    // 更新 writerIndex,为消息头预留 16 个字节的空间
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    // 创建序列化器,比如 Hessian2ObjectOutput
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        // 对事件数据进行序列化操作
        encodeEventData(channel, out, req.getData());
    } else {
        // 对请求数据进行序列化操作
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    // 获取写入的字节数,也就是消息体长度
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    // 将消息体长度写入到消息头中
    Bytes.int2bytes(len, header, 12);

    // write
    // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
    buffer.writerIndex(savedWriteIndex);
    // 从 savedWriteIndex 下标处写入消息头
    buffer.writeBytes(header); // write header.
    // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

protected void encodeResponse(Channel channel, ChannelBuffer buffer, Response res) throws IOException {
    int savedWriteIndex = buffer.writerIndex();
    try {
        Serialization serialization = getSerialization(channel);
        // header.
        // 创建消息头字节数组
        byte[] header = new byte[HEADER_LENGTH];
        // set magic number.
        // 设置魔数
        Bytes.short2bytes(MAGIC, header);
        // set request and serialization flag.
        // 设置序列化器编号
        header[2] = serialization.getContentTypeId();
        if (res.isHeartbeat()) {
            header[2] |= FLAG_EVENT;
        }
        // set response status.
        // 获取响应状态
        byte status = res.getStatus();
        // 设置响应状态
        header[3] = status;
        // set request id.
        // 设置请求编号
        Bytes.long2bytes(res.getId(), header, 4);

        // 更新 writerIndex,为消息头预留 16 个字节的空间
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
        ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
        ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
        // encode response data or error message.
        if (status == Response.OK) {
            if (res.isHeartbeat()) {
                // 对心跳响应结果进行序列化,已废弃
                encodeEventData(channel, out, res.getResult());
            } else {
                // 对调用结果进行序列化
                encodeResponseData(channel, out, res.getResult(), res.getVersion());
            }
        } else {
            // 对错误信息进行序列化
            out.writeUTF(res.getErrorMessage());
        }
        out.flushBuffer();
        if (out instanceof Cleanable) {
            ((Cleanable) out).cleanup();
        }
        bos.flush();
        bos.close();

        // 获取写入的字节数,也就是消息体长度
        int len = bos.writtenBytes();
        checkPayload(channel, len);
        // 将消息体长度写入到消息头中
        Bytes.int2bytes(len, header, 12);
        // write
        // 将 buffer 指针移动到 savedWriteIndex,为写消息头做准备
        buffer.writerIndex(savedWriteIndex);
        // 从 savedWriteIndex 下标处写入消息头
        buffer.writeBytes(header); // write header.
        // 设置新的 writerIndex,writerIndex = 原写下标 + 消息头长度 + 消息体长度
        buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
    } catch (Throwable t) {
        // clear buffer
        buffer.writerIndex(savedWriteIndex);
        // send error message to Consumer, otherwise, Consumer will wait till timeout.
        if (!res.isEvent() && res.getStatus() != Response.BAD_RESPONSE) {
            Response r = new Response(res.getId(), res.getVersion());
            r.setStatus(Response.BAD_RESPONSE);

            if (t instanceof ExceedPayloadLimitException) {
                logger.warn(t.getMessage(), t);
                try {
                    r.setErrorMessage(t.getMessage());
                    channel.send(r);
                    return;
                } catch (RemotingException e) {
                    logger.warn("Failed to send bad_response info back: " + t.getMessage() + ", cause: " + e.getMessage(), e);
                }
            } else {
                // FIXME log error message in Codec and handle in caught() of IoHanndler?
                logger.warn("Fail to encode response: " + res + ", send bad_response info instead, cause: " + t.getMessage(), t);
                try {
                    r.setErrorMessage("Failed to send response: " + res + ", cause: " + StringUtils.toString(t));
                    channel.send(r);
                    return;
                } catch (RemotingException e) {
                    logger.warn("Failed to send bad_response info back: " + res + ", cause: " + e.getMessage(), e);
                }
            }
        }

        // Rethrow exception
        if (t instanceof IOException) {
            throw (IOException) t;
        } else if (t instanceof RuntimeException) {
            throw (RuntimeException) t;
        } else if (t instanceof Error) {
            throw (Error) t;
        } else {
            throw new RuntimeException(t.getMessage(), t);
        }
    }
}

按照编码协议写消息头和消息体。这里看一下encodeRequestData方法,由子类DubboCodec实现,主要是对消息体的序列化实现:

@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    // 依次序列化 dubbo version、serviceName、version
    out.writeUTF(version);
    // https://github.com/apache/dubbo/issues/6138
    String serviceName = inv.getAttachment(INTERFACE_KEY);
    if (serviceName == null) {
        serviceName = inv.getAttachment(PATH_KEY);
    }
    out.writeUTF(serviceName);
    out.writeUTF(inv.getAttachment(VERSION_KEY));

    // 序列化调用方法名
    out.writeUTF(inv.getMethodName());
    // 将参数类型转换为字符串,并进行序列化
    out.writeUTF(inv.getParameterTypesDesc());
    Object[] args = inv.getArguments();
    if (args != null) {
        for (int i = 0; i < args.length; i++) {
            // 对运行时参数进行序列化
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
    }
    // 序列化 attachments
    out.writeAttachments(inv.getObjectAttachments());
}

序列化默认使用Hessian,这里的out是ObjectOutput的实现,默认是Hessian2ObjectOutput:

public class Hessian2ObjectOutput implements ObjectOutput {
    private final Hessian2Output output;

    public Hessian2ObjectOutput(OutputStream os) {
        output = new Hessian2Output(os);
        output.setSerializerFactory(Hessian2SerializerFactory.INSTANCE);
    }

    @Override
    public void writeBool(boolean v) throws IOException {
        output.writeBoolean(v);
    }

    @Override
    public void writeByte(byte v) throws IOException {
        output.writeInt(v);
    }

    @Override
    public void writeShort(short v) throws IOException {
        output.writeInt(v);
    }

    @Override
    public void writeInt(int v) throws IOException {
        output.writeInt(v);
    }

    @Override
    public void writeLong(long v) throws IOException {
        output.writeLong(v);
    }

    @Override
    public void writeFloat(float v) throws IOException {
        output.writeDouble(v);
    }

    @Override
    public void writeDouble(double v) throws IOException {
        output.writeDouble(v);
    }

    @Override
    public void writeBytes(byte[] b) throws IOException {
        output.writeBytes(b);
    }

    @Override
    public void writeBytes(byte[] b, int off, int len) throws IOException {
        output.writeBytes(b, off, len);
    }

    @Override
    public void writeUTF(String v) throws IOException {
        output.writeString(v);
    }

    @Override
    public void writeObject(Object obj) throws IOException {
        output.writeObject(obj);
    }

    @Override
    public void flushBuffer() throws IOException {
        output.flushBuffer();
    }
}

各种write和flush操作,执行的是Hessian2Output的。

解码

回来看ExchangeCodec的decode方法:

@Override
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    // 创建消息头字节数组
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    // 读取消息头数据
    buffer.readBytes(header);
    // 调用重载方法进行后续解码工作
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    // 检查魔数是否相等
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        // 通过 telnet 命令行发送的数据包不包含消息头,所以这里
        // 调用 TelnetCodec 的 decode 方法对数据包进行解码
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    // 检测可读数据量是否少于消息头长度,若小于则立即返回 DecodeResult.NEED_MORE_INPUT
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    // 从消息头中获取消息体长度
    int len = Bytes.bytes2int(header, 12);
    // 检测消息体长度是否超出限制,超出则抛出异常
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    // 检测可读的字节数是否小于实际的字节数
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        // 继续进行解码工作
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

先按协议解码消息头,然后是消息体的解码:

@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    // 获取消息头中的第三个字节,并通过逻辑与运算得到序列化器编号
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    // get request id.
    // 获取调用编号
    long id = Bytes.bytes2long(header, 4);
    // 通过逻辑与运算得到调用类型,0 - Response,1 - Request
    if ((flag & FLAG_REQUEST) == 0) {
        // 对响应结果进行解码,得到 Response 对象。
        // decode response.
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(true);
        }
        // get status.
        byte status = header[3];
        res.setStatus(status);
        try {
            if (status == Response.OK) {
                Object data;
                if (res.isEvent()) {
                    ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                    data = decodeEventData(channel, in);
                } else {
                    DecodeableRpcResult result;
                    if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                        result = new DecodeableRpcResult(channel, res, is,
                                (Invocation) getRequestData(id), proto);
                        result.decode();
                    } else {
                        result = new DecodeableRpcResult(channel, res,
                                new UnsafeByteArrayInputStream(readMessageData(is)),
                                (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                res.setResult(data);
            } else {
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                res.setErrorMessage(in.readUTF());
            }
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode response failed: " + t.getMessage(), t);
            }
            res.setStatus(Response.CLIENT_ERROR);
            res.setErrorMessage(StringUtils.toString(t));
        }
        return res;
    } else {
        // decode request.
        // 创建 Request 对象
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        // 通过逻辑与运算得到通信方式,并设置到 Request 对象中
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        // 通过位运算检测数据包是否为事件类型
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(true);
        }
        try {
            Object data;
            if (req.isEvent()) {
                ObjectInput in = CodecSupport.deserialize(channel.getUrl(), is, proto);
                // 对事件包进行解码
                data = decodeEventData(channel, in);
            } else {
                DecodeableRpcInvocation inv;
                // 根据 url 参数判断是否在 IO 线程上对消息体进行解码
                if (channel.getUrl().getParameter(DECODE_IN_IO_THREAD_KEY, DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    // 在当前线程,也就是 IO 线程上进行后续的解码工作。此工作完成后,可将
                    // 调用方法名、attachment、以及调用参数解析出来
                    inv.decode();
                } else {
                    // 仅创建 DecodeableRpcInvocation 对象,但不在当前线程上执行解码逻辑
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            // 设置 data 到 Request 对象中
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            // 若解码过程中出现异常,则将 broken 字段设为 true,
            // 并将异常对象设置到 Reqeust 对象中
            req.setBroken(true);
            req.setData(t);
        }

        return req;
    }
}

得到一个Request对象。我们看一下DecodeableRpcInvocation的decode方法:

@Override
public void decode() throws Exception {
    if (!hasDecoded && channel != null && inputStream != null) {
        try {
            decode(channel, inputStream);
        } catch (Throwable e) {
            if (log.isWarnEnabled()) {
                log.warn("Decode rpc invocation failed: " + e.getMessage(), e);
            }
            request.setBroken(true);
            request.setData(e);
        } finally {
            hasDecoded = true;
        }
    }
}

@Override
public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
        .deserialize(channel.getUrl(), input);

    // 通过反序列化得到 dubbo version,并保存到 attachments 变量中
    String dubboVersion = in.readUTF();
    request.setVersion(dubboVersion);
    setAttachment(DUBBO_VERSION_KEY, dubboVersion);

    // 通过反序列化得到 path,version,并保存到 attachments 变量中
    String path = in.readUTF();
    setAttachment(PATH_KEY, path);
    setAttachment(VERSION_KEY, in.readUTF());

    // 通过反序列化得到调用方法名
    setMethodName(in.readUTF());

    // 通过反序列化得到参数类型字符串,比如 Ljava/lang/String;
    String desc = in.readUTF();
    setParameterTypesDesc(desc);

    try {
        Object[] args = DubboCodec.EMPTY_OBJECT_ARRAY;
        Class<?>[] pts = DubboCodec.EMPTY_CLASS_ARRAY;
        if (desc.length() > 0) {
            //                if (RpcUtils.isGenericCall(path, getMethodName()) || RpcUtils.isEcho(path, getMethodName())) {
            //                    pts = ReflectUtils.desc2classArray(desc);
            //                } else {
            ServiceRepository repository = ApplicationModel.getServiceRepository();
            ServiceDescriptor serviceDescriptor = repository.lookupService(path);
            if (serviceDescriptor != null) {
                MethodDescriptor methodDescriptor = serviceDescriptor.getMethod(getMethodName(), desc);
                if (methodDescriptor != null) {
                    pts = methodDescriptor.getParameterClasses();
                    this.setReturnTypes(methodDescriptor.getReturnTypes());
                }
            }
            if (pts == DubboCodec.EMPTY_CLASS_ARRAY) {
                if (!RpcUtils.isGenericCall(desc, getMethodName()) && !RpcUtils.isEcho(desc, getMethodName())) {
                    throw new IllegalArgumentException("Service not found:" + path + ", " + getMethodName());
                }
                // 将 desc 解析为参数类型数组
                pts = ReflectUtils.desc2classArray(desc);
            }
            //                }

            args = new Object[pts.length];
            for (int i = 0; i < args.length; i++) {
                try {
                    // 解析运行时参数
                    args[i] = in.readObject(pts[i]);
                } catch (Exception e) {
                    if (log.isWarnEnabled()) {
                        log.warn("Decode argument failed: " + e.getMessage(), e);
                    }
                }
            }
        }
        // 设置参数类型数组
        setParameterTypes(pts);

        // 通过反序列化得到原 attachment 的内容
        Map<String, Object> map = in.readAttachments();
        if (map != null && map.size() > 0) {
            Map<String, Object> attachment = getObjectAttachments();
            if (attachment == null) {
                attachment = new HashMap<>();
            }
            // 将 map 与当前对象中的 attachment 集合进行融合
            attachment.putAll(map);
            setObjectAttachments(attachment);
        }

        //decode argument ,may be callback
        // 对 callback 类型的参数进行处理
        for (int i = 0; i < args.length; i++) {
            args[i] = decodeInvocationArgument(channel, this, pts, i, args[i]);
        }
        // 设置参数列表
        setArguments(args);
        String targetServiceName = buildKey((String) getAttachment(PATH_KEY),
                                            getAttachment(GROUP_KEY),
                                            getAttachment(VERSION_KEY));
        setTargetServiceUniqueName(targetServiceName);
    } catch (ClassNotFoundException e) {
        throw new IOException(StringUtils.toString("Read invocation data failed.", e));
    } finally {
        if (in instanceof Cleanable) {
            ((Cleanable) in).cleanup();
        }
    }
    return this;
}

反序列化操作,这里的in默认的是hessian的实现Hessian2ObjectInput:

public class Hessian2ObjectInput implements ObjectInput {
    private final Hessian2Input input;

    public Hessian2ObjectInput(InputStream is) {
        input = new Hessian2Input(is);
        input.setSerializerFactory(Hessian2SerializerFactory.INSTANCE);
    }

    @Override
    public boolean readBool() throws IOException {
        return input.readBoolean();
    }

    @Override
    public byte readByte() throws IOException {
        return (byte) input.readInt();
    }

    @Override
    public short readShort() throws IOException {
        return (short) input.readInt();
    }

    @Override
    public int readInt() throws IOException {
        return input.readInt();
    }

    @Override
    public long readLong() throws IOException {
        return input.readLong();
    }

    @Override
    public float readFloat() throws IOException {
        return (float) input.readDouble();
    }

    @Override
    public double readDouble() throws IOException {
        return input.readDouble();
    }

    @Override
    public byte[] readBytes() throws IOException {
        return input.readBytes();
    }

    @Override
    public String readUTF() throws IOException {
        return input.readString();
    }

    @Override
    public Object readObject() throws IOException {
        return input.readObject();
    }

    @Override
    @SuppressWarnings("unchecked")
    public <T> T readObject(Class<T> cls) throws IOException {
        return (T) input.readObject(cls);
    }

    @Override
    public <T> T readObject(Class<T> cls, Type type) throws IOException {
        return readObject(cls);
    }

}

read过程就是Hessian2Input的对应方法。

至此编解码过程基本分析完了。关于响应消息的编解码和请求消息的差不多。有兴趣的可以自己看看。

Dubbo相关的内容至此也基本上结束了。实际上Dubbo官网上讲的非常详细,对照着源码看都很容易理解。当然还有围绕dubbo周边的一些东西,比如dubbo结合配置中心的动态配置如何实现的?spring-cloud-starter-dubbo在cloud环境下如何引入的? 这些问题在分析相关代码也不是很难。以后有机会再补充吧。

本文地址:https://blog.csdn.net/qq_19414183/article/details/111921711