
Dubbo Analysis: The Serialize Layer


Dubbo Overall Design

Dubbo's overall design is described in the official documentation; the following diagram gives a clear picture of it:

(Figure: Dubbo overall architecture diagram)

1. Legend

In the diagram, the interfaces on the light-blue background on the left are used by the service consumer, those on the light-green background on the right are used by the service provider, and those on the center axis are used by both sides;
From bottom to top the diagram is divided into ten layers, each with one-way dependencies; the black arrows on the right represent the dependencies between layers;
The small green blocks are extension interfaces and the small blue blocks are implementation classes; only the implementation classes that connect the layers are shown;
The blue dashed lines show the initialization process, i.e. the assembly chain at startup; the red solid lines show the method-call process, i.e. the invocation chain at runtime; the purple triangle arrows denote inheritance, where a subclass can be regarded as the same node as its parent; and the text on the lines names the methods being called;

2. Layer Descriptions

config (configuration layer): the externally facing configuration API, centered on ServiceConfig and ReferenceConfig; configuration classes can be instantiated directly or generated by parsing Spring configuration;
proxy (service proxy layer): transparent proxies for service interfaces, generating the client-side stub and server-side skeleton, centered on ServiceProxy; the extension interface is ProxyFactory;
registry (registry layer): encapsulates registration and discovery of service addresses, centered on the service URL; the extension interfaces are RegistryFactory, Registry and RegistryService;
cluster (routing layer): encapsulates routing and load balancing across multiple providers and bridges to the registry, centered on Invoker; the extension interfaces are Cluster, Directory, Router and LoadBalance;
monitor (monitoring layer): monitors the number and duration of RPC calls, centered on Statistics; the extension interfaces are MonitorFactory, Monitor and MonitorService;
protocol (remote invocation layer): encapsulates the RPC call, centered on Invocation and Result; the extension interfaces are Protocol, Invoker and Exporter;
exchange (information exchange layer): encapsulates the request-response pattern and turns synchronous calls into asynchronous ones, centered on Request and Response; the extension interfaces are Exchanger, ExchangeChannel, ExchangeClient and ExchangeServer;
transport (network transport layer): abstracts Mina and Netty behind a unified interface, centered on Message; the extension interfaces are Channel, Transporter, Client, Server and Codec;
serialize (data serialization layer): reusable serialization utilities; the extension interfaces are Serialization, ObjectInput, ObjectOutput and ThreadPool;

This article starts the Dubbo source-code analysis from the bottom-most serialize layer;

Communication Framework

Dubbo's underlying communication is handled by third-party frameworks: Netty, Netty4, Mina and Grizzly. Netty is the default, and Dubbo provides both a server side (service provider) and a client side (service consumer) for it. Taking Netty as the example, here is part of the NettyServer code:

protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/NETTY-365
    // https://issues.jboss.org/browse/NETTY-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

When the service provider starts, this doOpen method is invoked to open the server port for consumers to connect to. The code above is a fairly standard Netty server bootstrap; since this article focuses on Dubbo's serialization, the parts that matter here are the decoder and the encoder, both of which are defined in NettyCodecAdapter:

private final ChannelHandler encoder = new InternalEncoder();
private final ChannelHandler decoder = new InternalDecoder();

1. Encoder

NettyCodecAdapter defines the inner class InternalEncoder:

private class InternalEncoder extends OneToOneEncoder {

    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        try {
            codec.encode(channel, buffer, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ch);
        }
        return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
    }
}

This class is really just a wrapper around the codec and does no encoding itself, so the codec deserves a closer look. Codec2 is an interface with several implementations; its source is as follows:

@SPI
public interface Codec2 {

    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;


    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }

}

The implementations include TransportCodec, TelnetCodec, ExchangeCodec, DubboCountCodec and ThriftCodec, and you can of course add your own. Dubbo does not load every implementation at startup; instead, all of the types are declared in a configuration file and each class is loaded on demand at runtime.
The file lives at META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Codec2 and contains:

transport=com.alibaba.dubbo.remoting.transport.codec.TransportCodec
telnet=com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec
exchange=com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftCodec

The code that obtains the concrete Codec2 looks like this:

protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                .getExtension(codecName));
    }
}

The codec key is looked up in the URL; if it is present its value is used, otherwise the default in this method is telnet. For the Dubbo protocol the URL carries codec=dubbo, so the implementation class DubboCountCodec is loaded and cached by ExtensionLoader. Let's look at how DubboCountCodec encodes and decodes;
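
For illustration only (the host, port and service name are made up, not taken from the article), a provider URL for the dubbo protocol carries these keys roughly like this:

dubbo://192.168.1.100:20880/com.example.DemoService?codec=dubbo&serialization=hessian2&timeout=3000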

private DubboCodec codec = new DubboCodec();

@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    codec.encode(channel, buffer, msg);
}

DubboCountCodec internally delegates to DubboCodec's encode method. Here is how a Request object is encoded:

protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // header.
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number.
    Bytes.short2bytes(MAGIC, header);

    // set request and serialization flag.
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    if (req.isEvent()) header[2] |= FLAG_EVENT;

    // set request id.
    Bytes.long2bytes(req.getId(), header, 4);

    // encode request data.
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        encodeEventData(channel, out, req.getData());
    } else {
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);

    // write
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}

The first two bytes hold the magic number 0xdabb; the third byte packs four pieces of information: whether the message is a request (or a response), the serialization type, whether it is a two-way call, and whether it is a heartbeat/event message;
When encoding a request the fourth byte is skipped, and bytes 5-12 hold the request id as a long; when encoding a response the fourth byte carries the response status;
Further down, the buffer's write index is advanced by HEADER_LENGTH bytes (the header is 16 bytes long), the data object is serialized into the buffer with the chosen serialization, and the total number of serialized bytes is then stored as an int in the last four bytes of the header;
Finally the buffer's writerIndex is set to the position just after the header and the data, so that subsequent writes do not overwrite them;
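
Summarizing what the code above produces, the 16-byte header is laid out as follows:

bytes 0-1   : magic number (0xdabb)
byte  2     : flag byte -- FLAG_REQUEST | FLAG_TWOWAY | FLAG_EVENT | serialization contentTypeId
byte  3     : status (only used when encoding a response)
bytes 4-11  : request id (long)
bytes 12-15 : body length (int)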

2. Decoder

NettyCodecAdapter likewise defines the inner class InternalDecoder, which again delegates to DubboCodec's decode method; part of the code is shown below:

public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}

It first reads Math.min(readable, HEADER_LENGTH) bytes; if readable is less than HEADER_LENGTH, the receiver has not yet received the full 16-byte header and must wait for more input. Once the header has arrived, it is checked: the magic number, the header length, and the body length (i.e. whether the whole message body has been received). After these checks the message body is deserialized in the decodeBody method:

@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(Response.HEARTBEAT_EVENT);
        }
        // get status.
        byte status = header[3];
        res.setStatus(status);
        if (status == Response.OK) {
            try {
                Object data;
                if (res.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (res.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcResult result;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        result = new DecodeableRpcResult(channel, res, is,
                                (Invocation) getRequestData(id), proto);
                        result.decode();
                    } else {
                        result = new DecodeableRpcResult(channel, res,
                                new UnsafeByteArrayInputStream(readMessageData(is)),
                                (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                res.setResult(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode response failed: " + t.getMessage(), t);
                }
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
        } else {
            res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
        }
        return res;
    } else {
        // decode request.
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            if (req.isHeartbeat()) {
                data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
            } else if (req.isEvent()) {
                data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
            } else {
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();
                } else {
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}

It first parses the third byte of the header to determine whether the message is a request or a response and which serialization type was used, and then deserializes the body accordingly;
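
As a minimal illustration of that flag parsing (reusing the constant names that already appear in the code above):

byte flag = header[2];
byte proto = (byte) (flag & SERIALIZATION_MASK); // low bits: serialization contentTypeId
boolean isRequest = (flag & FLAG_REQUEST) != 0;  // set for requests, clear for responses
boolean isTwoWay  = (flag & FLAG_TWOWAY) != 0;   // a response is expected
boolean isEvent   = (flag & FLAG_EVENT) != 0;    // heartbeat/event message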

Serialization and Deserialization

From the look at the encoder and decoder above, the encoder needs to serialize Request/Response objects and the decoder needs to deserialize them; let's now look at serialization and deserialization in detail;

1. Serialization

The encoder needs to obtain a concrete Serialization; the code is as follows:

public static Serialization getSerialization(URL url) {
    return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
            url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
}

As with the codec, Dubbo ships with several serialization implementations and allows custom extensions. The serialization key is read from the URL; if it is absent the default is hessian2. The available serialization classes are likewise declared in a configuration file,
located at META-INF/dubbo/internal/com.alibaba.dubbo.common.serialize.Serialization, with the following content:

fastjson=com.alibaba.dubbo.common.serialize.fastjson.FastJsonSerialization
fst=com.alibaba.dubbo.common.serialize.fst.FstSerialization
hessian2=com.alibaba.dubbo.common.serialize.hessian2.Hessian2Serialization
java=com.alibaba.dubbo.common.serialize.java.JavaSerialization
compactedjava=com.alibaba.dubbo.common.serialize.java.CompactedJavaSerialization
nativejava=com.alibaba.dubbo.common.serialize.nativejava.NativeJavaSerialization
kryo=com.alibaba.dubbo.common.serialize.kryo.KryoSerialization

Out of the box Dubbo provides fastjson, fst, hessian2, java, compactedjava, nativejava and kryo serialization.
Each serialization implementation must implement three interfaces: Serialization, ObjectInput and ObjectOutput.
The Serialization interface:

public interface Serialization {

    byte getContentTypeId();

    String getContentType();

    @Adaptive
    ObjectOutput serialize(URL url, OutputStream output) throws IOException;

    @Adaptive
    ObjectInput deserialize(URL url, InputStream input) throws IOException;

}

The contentTypeId is the serialization type stored in the protocol header; during deserialization the concrete Serialization is looked up by this id, so contentTypeIds must not be duplicated, otherwise one registration would overwrite another;
The ObjectOutput interface:

public interface ObjectOutput extends DataOutput {

    void writeObject(Object obj) throws IOException;
}

The ObjectInput interface:

public interface ObjectInput extends DataInput {

    Object readObject() throws IOException, ClassNotFoundException;

    <T> T readObject(Class<T> cls) throws IOException, ClassNotFoundException;

    <T> T readObject(Class<T> cls, Type type) throws IOException, ClassNotFoundException;
}

These provide the methods for reading and writing objects, while DataOutput and DataInput cover reading and writing of the primitive types. Serialization simply calls writeObject to write the data into the stream; see the encodeRequestData method invoked from the encoder:

@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    out.writeUTF(version);
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

    out.writeUTF(inv.getMethodName());
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
    out.writeObject(inv.getAttachments());
}

The default DubboCountCodec does not write the data object into the stream as a whole; instead it extracts the fields of the RpcInvocation and writes them into the stream individually;
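
The decoding side therefore has to read these fields back in exactly the same order. A simplified sketch of what DecodeableRpcInvocation.decode does (abridged; parameter-type resolution and argument decoding are omitted) looks roughly like this:

public Object decode(Channel channel, InputStream input) throws IOException {
    ObjectInput in = CodecSupport.getSerialization(channel.getUrl(), serializationType)
            .deserialize(channel.getUrl(), input);

    setAttachment(Constants.DUBBO_VERSION_KEY, in.readUTF()); // dubbo protocol version
    setAttachment(Constants.PATH_KEY, in.readUTF());          // service path
    setAttachment(Constants.VERSION_KEY, in.readUTF());       // service version

    setMethodName(in.readUTF());                              // method name
    String desc = in.readUTF();                               // parameter type descriptor
    // ... resolve the parameter types from desc, read each argument with in.readObject(...),
    // and finally read the attachment map that encodeRequestData wrote last
    return this;
}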

2. Deserialization

Deserialization reads the serialization type from the header and then obtains the concrete Serialization through the following methods in CodecSupport:

public static Serialization getSerialization(URL url, byte id) throws IOException {
    Serialization serialization = getSerializationById(id);
    String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
    // check if "serialization id" passed from network matches the id on this side(only take effect for jdk serialization), for security purpose.
    if (serialization == null
            || ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
        throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
    }
    return serialization;
}

private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();

public static Serialization getSerializationById(byte id) {
    return ID_SERIALIZATION_MAP.get(id);
}

ID_SERIALIZATION_MAP stores the mapping from contentTypeId to the concrete Serialization; the Serialization is looked up by id, and the data is then read back in the same order in which it was written;
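
The map is populated when CodecSupport is first loaded: every Serialization extension is obtained through ExtensionLoader and registered under its contentTypeId, which is also why duplicate ids would overwrite one another. A simplified sketch of that static initializer:

static {
    ExtensionLoader<Serialization> loader = ExtensionLoader.getExtensionLoader(Serialization.class);
    for (String name : loader.getSupportedExtensions()) {
        Serialization serialization = loader.getExtension(name);
        // register by content type id; a duplicate id would replace the previous entry
        ID_SERIALIZATION_MAP.put(serialization.getContentTypeId(), serialization);
    }
}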

Extending the Serialization Types

Dubbo provides good extension points for many of its modules, including serialization; the following shows how to add a protobuf-based serialization (using protostuff);

1. Overall Code Structure

First, the overall code structure, as shown in the figure below:

(Figure: project structure of the protobuf serialization extension)

Implement the three interfaces Serialization, ObjectInput and ObjectOutput, and then provide a text file under the designated directory;

2. Add the Dependencies

<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.1.3</version>
</dependency>

3. Implement the ObjectInput and ObjectOutput Interfaces

public class ProtobufObjectInput implements ObjectInput {

    private ObjectInputStream input;

    public ProtobufObjectInput(InputStream inputStream) throws IOException {
        this.input = new ObjectInputStream(inputStream);
    }

    // ... primitive type read methods omitted ...

    @Override
    public Object readObject() throws IOException, ClassNotFoundException {
        return input.readObject();
    }

    @Override
    public <T> T readObject(Class<T> clazz) throws IOException {
        try {
            // the encoder wrote the object as a byte array, so read it back and deserialize it
            byte[] buffer = (byte[]) input.readObject();
            return SerializationUtil.deserialize(buffer, clazz);
        } catch (Exception e) {
            throw new IOException(e);
        }

    }

    @Override
    public <T> T readObject(Class<T> clazz, Type type) throws IOException {
        try {
            byte[] buffer = (byte[]) input.readObject();
            return SerializationUtil.deserialize(buffer, clazz);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}
 
public class ProtobufObjectOutput implements ObjectOutput {

    private ObjectOutputStream outputStream;

    public ProtobufObjectOutput(OutputStream outputStream) throws IOException {
        this.outputStream = new ObjectOutputStream(outputStream);
    }

    // ... primitive type write methods omitted ...

    @Override
    public void writeObject(Object v) throws IOException {
        byte[] bytes = SerializationUtil.serialize(v);
        outputStream.writeObject(bytes);
        outputStream.flush();
    }

    @Override
    public void flushBuffer() throws IOException {
        outputStream.flush();
    }
}
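
The SerializationUtil helper referenced above is not shown in the article. A minimal sketch based on protostuff's runtime schema (an assumed implementation, not taken from the original code) could look like this:

import com.dyuproject.protostuff.LinkedBuffer;
import com.dyuproject.protostuff.ProtostuffIOUtil;
import com.dyuproject.protostuff.Schema;
import com.dyuproject.protostuff.runtime.RuntimeSchema;

public class SerializationUtil {

    private SerializationUtil() {
    }

    @SuppressWarnings("unchecked")
    public static <T> byte[] serialize(T obj) {
        Class<T> clazz = (Class<T>) obj.getClass();
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        LinkedBuffer buffer = LinkedBuffer.allocate(LinkedBuffer.DEFAULT_BUFFER_SIZE);
        try {
            // serialize the object into a byte array using a runtime-generated schema
            return ProtostuffIOUtil.toByteArray(obj, schema, buffer);
        } finally {
            buffer.clear();
        }
    }

    public static <T> T deserialize(byte[] data, Class<T> clazz) {
        Schema<T> schema = RuntimeSchema.getSchema(clazz);
        // newMessage() requires a no-arg constructor on the target class
        T message = schema.newMessage();
        ProtostuffIOUtil.mergeFrom(data, message, schema);
        return message;
    }
}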

4. Implement the Serialization Interface

public class ProtobufSerialization implements Serialization {

    @Override
    public byte getContentTypeId() {
        return 10;
    }

    @Override
    public String getContentType() {
        return "x-application/protobuf";
    }

    @Override
    public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
        return new ProtobufObjectOutput(out);
    }

    @Override
    public ObjectInput deserialize(URL url, InputStream is) throws IOException {
        return new ProtobufObjectInput(is);
    }
}

A new contentTypeId is introduced here; make sure it does not clash with any id that already exists in Dubbo.

5. Register in the SPI Directory

Under the META-INF/dubbo/internal/ directory, provide a file named com.alibaba.dubbo.common.serialize.Serialization with the following content:

protobuf=com.dubbocommon.ProtobufSerialization

6. Configure the New Serialization on the Provider

<dubbo:protocol name="dubbo" port="20880" serialization="protobuf"/>

With this in place, the newly added protobuf serialization will be used to serialize objects;

Summary

This article analyzed Dubbo starting from the serialize layer at the bottom of its overall design; subsequent articles will work up layer by layer for a more thorough understanding of Dubbo;

Sample Code

https://github.com/ksfzhaohui/blog

https://gitee.com/outofmemory/blog
