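Dubbo Analysis: The Serialize Layer
Dubbo's Overall Design
For Dubbo's overall design, see the official documentation. The figure below gives a clear picture of it:
1. Legend
The light-blue background on the left marks interfaces used by the service consumer, the light-green background on the right marks interfaces used by the service provider, and the interfaces on the central axis are used by both sides.
From bottom to top the figure is divided into ten layers. Dependencies between layers are one-way, and the black arrows on the right show them.
Small green blocks are extension interfaces and small blue blocks are implementation classes. The figure shows only the implementation classes that link the layers together.
The blue dashed lines show the initialization process, that is, the chain assembled at startup. The red solid lines show the method-call process, that is, the call chain at runtime. The purple triangular arrows denote inheritance, so a subclass can be treated as the same node as its parent. The text on each line is the method being called.
2. Layer Descriptions
config, the configuration layer: the external configuration interface, centered on ServiceConfig and ReferenceConfig. Configuration classes can be initialized directly or generated by Spring from parsed configuration.
proxy, the service proxy layer: transparent proxying of service interfaces. It generates the client-side stub and the server-side skeleton, is centered on ServiceProxy, and its extension interface is ProxyFactory.
registry, the registry layer: encapsulates registration and discovery of service addresses, centered on the service URL. Extension interfaces: RegistryFactory, Registry, RegistryService.
cluster, the routing layer: encapsulates routing and load balancing across multiple providers and bridges to the registry, centered on Invoker. Extension interfaces: Cluster, Directory, Router, LoadBalance.
monitor, the monitoring layer: monitors the number and duration of RPC calls, centered on Statistics. Extension interfaces: MonitorFactory, Monitor, MonitorService.
protocol, the remote call layer: encapsulates RPC calls, centered on Invocation and Result. Extension interfaces: Protocol, Invoker, Exporter.
exchange, the information exchange layer: encapsulates the request/response pattern and turns synchronous calls into asynchronous ones, centered on Request and Response. Extension interfaces: Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer.
transport, the network transport layer: abstracts mina and netty behind a unified interface, centered on Message. Extension interfaces: Channel, Transporter, Client, Server, Codec.
serialize, the data serialization layer: a set of reusable tools. Extension interfaces: Serialization, ObjectInput, ObjectOutput, ThreadPool.
This article starts the source-code analysis of Dubbo from the lowest layer, the serialize layer.
Communication Framework
Dubbo's low-level communication relies on third-party frameworks: netty, netty4, mina, and grizzly. netty is the default, and each framework provides a server side (service provider) and a client side (service consumer). Taking the default netty as an example, here is part of the NettyServer code: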
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker,
            getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);

    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    // https://issues.jboss.org/browse/netty-365
    // https://issues.jboss.org/browse/netty-379
    // final Timer timer = new HashedWheelTimer(new NamedThreadFactory("NettyIdleTimer", true));
    bootstrap.setOption("child.tcpNoDelay", true);
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            /*int idleTimeout = getIdleTimeout();
            if (idleTimeout > 10000) {
                pipeline.addLast("timer", new IdleStateHandler(timer, idleTimeout / 1000, 0, 0));
            }*/
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}
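doOpen is called when the service provider starts. It opens the service port that consumers connect to. The code above is the usual way of starting a netty server. Since this article is about Dubbo's serialization, the parts that matter here are the decoder and the encoder, both defined in NettyCodecAdapter:
private final ChannelHandler encoder = new InternalEncoder();
private final ChannelHandler decoder = new InternalDecoder();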
1. Encoder
NettyCodecAdapter defines the inner class InternalEncoder:
private class InternalEncoder extends OneToOneEncoder {
    @Override
    protected Object encode(ChannelHandlerContext ctx, Channel ch, Object msg) throws Exception {
        com.alibaba.dubbo.remoting.buffer.ChannelBuffer buffer =
                com.alibaba.dubbo.remoting.buffer.ChannelBuffers.dynamicBuffer(1024);
        NettyChannel channel = NettyChannel.getOrAddChannel(ch, url, handler);
        try {
            codec.encode(channel, buffer, msg);
        } finally {
            NettyChannel.removeChannelIfDisconnected(ch);
        }
        return ChannelBuffers.wrappedBuffer(buffer.toByteBuffer());
    }
}
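This class is only a wrapper around codec and does no encoding itself. The codec is what matters: it is an interface, Codec2, with several implementations. The Codec2 source is as follows: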
@SPI
public interface Codec2 {

    @Adaptive({Constants.CODEC_KEY})
    void encode(Channel channel, ChannelBuffer buffer, Object message) throws IOException;

    @Adaptive({Constants.CODEC_KEY})
    Object decode(Channel channel, ChannelBuffer buffer) throws IOException;

    enum DecodeResult {
        NEED_MORE_INPUT, SKIP_SOME_INPUT
    }
}
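The implementations include TransportCodec, TelnetCodec, ExchangeCodec, DubboCountCodec, and ThriftCodec, and you can also add your own. Loading every type at startup would be wasteful, so Dubbo lists all of the types in a configuration file and loads a class only when it is needed at runtime.
The configuration file is META-INF/dubbo/internal/com.alibaba.dubbo.remoting.Codec2, with the following content: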
transport=com.alibaba.dubbo.remoting.transport.codec.TransportCodec
telnet=com.alibaba.dubbo.remoting.telnet.codec.TelnetCodec
exchange=com.alibaba.dubbo.remoting.exchange.codec.ExchangeCodec
dubbo=com.alibaba.dubbo.rpc.protocol.dubbo.DubboCountCodec
thrift=com.alibaba.dubbo.rpc.protocol.thrift.ThriftCodec
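The load-on-demand idea behind such a name-to-class file can be sketched without Dubbo. The class below is a hypothetical stand-in, not Dubbo's ExtensionLoader: registered factories play the role of the lines in the file, and an instance is created only the first time its name is requested, then cached.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical illustration of lazy, cached lookup by name; not Dubbo's ExtensionLoader.
final class CodecRegistrySketch {
    // name -> factory, standing in for the "name=class" lines of the config file
    private final Map<String, Supplier<Object>> factories = new HashMap<>();
    // name -> instance, filled only when a name is first requested
    private final Map<String, Object> cache = new ConcurrentHashMap<>();

    void register(String name, Supplier<Object> factory) {
        factories.put(name, factory);
    }

    boolean hasExtension(String name) {
        return factories.containsKey(name);
    }

    Object getExtension(String name) {
        Supplier<Object> factory = factories.get(name);
        if (factory == null) {
            throw new IllegalStateException("No such extension: " + name);
        }
        // Create the instance on first use, then always return the cached one.
        return cache.computeIfAbsent(name, n -> factory.get());
    }

    // Number of extensions that have actually been instantiated so far.
    int loadedCount() {
        return cache.size();
    }
}
```

Registering five names and requesting only "dubbo" leaves loadedCount() at 1, which is the point of listing everything in a file but instantiating on demand.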
The code that obtains the concrete Codec2 is as follows:
protected static Codec2 getChannelCodec(URL url) {
    String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
    if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
        return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
    } else {
        return new CodecAdapter(ExtensionLoader.getExtensionLoader(Codec.class)
                .getExtension(codecName));
    }
}
The method reads the codec parameter from the URL and falls back to telnet when the parameter is absent. With the dubbo protocol the URL carries codec=dubbo by default, so here the implementation class DubboCountCodec is loaded and cached by ExtensionLoader. Here is how DubboCountCodec encodes:
private DubboCodec codec = new DubboCodec();

@Override
public void encode(Channel channel, ChannelBuffer buffer, Object msg) throws IOException {
    codec.encode(channel, buffer, msg);
}
DubboCountCodec delegates to the encode method of DubboCodec. The following code shows how a Request object is encoded:
protected void encodeRequest(Channel channel, ChannelBuffer buffer, Request req) throws IOException {
    Serialization serialization = getSerialization(channel);
    // header.
    byte[] header = new byte[HEADER_LENGTH];
    // set magic number.
    Bytes.short2bytes(MAGIC, header);

    // set request and serialization flag.
    header[2] = (byte) (FLAG_REQUEST | serialization.getContentTypeId());

    if (req.isTwoWay()) header[2] |= FLAG_TWOWAY;
    if (req.isEvent()) header[2] |= FLAG_EVENT;

    // set request id.
    Bytes.long2bytes(req.getId(), header, 4);

    // encode request data.
    int savedWriteIndex = buffer.writerIndex();
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH);
    ChannelBufferOutputStream bos = new ChannelBufferOutputStream(buffer);
    ObjectOutput out = serialization.serialize(channel.getUrl(), bos);
    if (req.isEvent()) {
        encodeEventData(channel, out, req.getData());
    } else {
        encodeRequestData(channel, out, req.getData(), req.getVersion());
    }
    out.flushBuffer();
    if (out instanceof Cleanable) {
        ((Cleanable) out).cleanup();
    }
    bos.flush();
    bos.close();
    int len = bos.writtenBytes();
    checkPayload(channel, len);
    Bytes.int2bytes(len, header, 12);

    // write
    buffer.writerIndex(savedWriteIndex);
    buffer.writeBytes(header); // write header.
    buffer.writerIndex(savedWriteIndex + HEADER_LENGTH + len);
}
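The first two bytes hold the magic number 0xdabb. The third byte packs four pieces of information: whether the message is a request (as opposed to a response), the serialization type, whether the call is two-way, and whether the message is an event such as a heartbeat.
A request skips the fourth byte and stores the request id, a long, in bytes 5 to 12. When a response is encoded, the fourth byte holds the response status.
Reading on, the buffer skips HEADER_LENGTH bytes, which means the header is 16 bytes long. The data object is then serialized into the buffer with the chosen serialization, which yields the total number of bytes written for data. That count is stored as an int in the last four bytes of the header.
Finally, the buffer's writerIndex is moved to the end of the header plus data so that later writes do not overwrite them.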
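The 16-byte header layout described above can be sketched with a plain ByteBuffer. This is only an illustration, not Dubbo's code: the class and method names are hypothetical, and the flag values 0x80, 0x40, and 0x20 are assumptions based on Dubbo 2.6.x that should be checked against the version you use.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the request header layout; not Dubbo's ExchangeCodec.
final class DubboHeaderSketch {
    static final int HEADER_LENGTH = 16;
    static final short MAGIC = (short) 0xdabb;
    // Assumed flag bits of the third header byte.
    static final byte FLAG_REQUEST = (byte) 0x80;
    static final byte FLAG_TWOWAY = (byte) 0x40;
    static final byte FLAG_EVENT = (byte) 0x20;

    static byte[] requestHeader(byte serializationId, boolean twoWay, boolean event,
                                long requestId, int bodyLength) {
        ByteBuffer header = ByteBuffer.allocate(HEADER_LENGTH); // big-endian by default
        header.putShort(0, MAGIC);                 // bytes 1-2: magic number
        byte flag = (byte) (FLAG_REQUEST | serializationId);
        if (twoWay) flag |= FLAG_TWOWAY;
        if (event) flag |= FLAG_EVENT;
        header.put(2, flag);                       // byte 3: flags + serialization id
        // byte 4 is the status; it is left at 0 because a request does not use it
        header.putLong(4, requestId);              // bytes 5-12: request id
        header.putInt(12, bodyLength);             // bytes 13-16: body length
        return header.array();
    }

    public static void main(String[] args) {
        byte[] h = requestHeader((byte) 2, true, false, 1L, 5);
        StringBuilder hex = new StringBuilder();
        for (byte b : h) {
            hex.append(String.format("%02x ", b));
        }
        System.out.println(hex.toString().trim());
    }
}
```

In the real encoder the body length is only known after serialization, which is why the header is written last into the space reserved at savedWriteIndex.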
2. Decoder
NettyCodecAdapter also defines the inner class InternalDecoder, which likewise ends up calling the decode method of DubboCodec. Part of the code is as follows:
public Object decode(Channel channel, ChannelBuffer buffer) throws IOException {
    int readable = buffer.readableBytes();
    byte[] header = new byte[Math.min(readable, HEADER_LENGTH)];
    buffer.readBytes(header);
    return decode(channel, buffer, readable, header);
}

@Override
protected Object decode(Channel channel, ChannelBuffer buffer, int readable, byte[] header) throws IOException {
    // check magic number.
    if (readable > 0 && header[0] != MAGIC_HIGH
            || readable > 1 && header[1] != MAGIC_LOW) {
        int length = header.length;
        if (header.length < readable) {
            header = Bytes.copyOf(header, readable);
            buffer.readBytes(header, length, readable - length);
        }
        for (int i = 1; i < header.length - 1; i++) {
            if (header[i] == MAGIC_HIGH && header[i + 1] == MAGIC_LOW) {
                buffer.readerIndex(buffer.readerIndex() - header.length + i);
                header = Bytes.copyOf(header, i);
                break;
            }
        }
        return super.decode(channel, buffer, readable, header);
    }
    // check length.
    if (readable < HEADER_LENGTH) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // get data length.
    int len = Bytes.bytes2int(header, 12);
    checkPayload(channel, len);

    int tt = len + HEADER_LENGTH;
    if (readable < tt) {
        return DecodeResult.NEED_MORE_INPUT;
    }

    // limit input stream.
    ChannelBufferInputStream is = new ChannelBufferInputStream(buffer, len);

    try {
        return decodeBody(channel, is, header);
    } finally {
        if (is.available() > 0) {
            try {
                if (logger.isWarnEnabled()) {
                    logger.warn("Skip input stream " + is.available());
                }
                StreamUtils.skipUnusedStream(is);
            } catch (IOException e) {
                logger.warn(e.getMessage(), e);
            }
        }
    }
}
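The decoder first reads Math.min(readable, HEADER_LENGTH) bytes. If readable is less than HEADER_LENGTH, the receiver does not yet have the full 16-byte header and must wait for more data. Once the header has arrived it is checked: the magic number, the header length, and the body length (whether the whole body has been received). After these checks the body is deserialized in the decodeBody method: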
@Override
protected Object decodeBody(Channel channel, InputStream is, byte[] header) throws IOException {
    byte flag = header[2], proto = (byte) (flag & SERIALIZATION_MASK);
    Serialization s = CodecSupport.getSerialization(channel.getUrl(), proto);
    // get request id.
    long id = Bytes.bytes2long(header, 4);
    if ((flag & FLAG_REQUEST) == 0) {
        // decode response.
        Response res = new Response(id);
        if ((flag & FLAG_EVENT) != 0) {
            res.setEvent(Response.HEARTBEAT_EVENT);
        }
        // get status.
        byte status = header[3];
        res.setStatus(status);
        if (status == Response.OK) {
            try {
                Object data;
                if (res.isHeartbeat()) {
                    data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
                } else if (res.isEvent()) {
                    data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
                } else {
                    DecodeableRpcResult result;
                    if (channel.getUrl().getParameter(
                            Constants.DECODE_IN_IO_THREAD_KEY,
                            Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                        result = new DecodeableRpcResult(channel, res, is,
                                (Invocation) getRequestData(id), proto);
                        result.decode();
                    } else {
                        result = new DecodeableRpcResult(channel, res,
                                new UnsafeByteArrayInputStream(readMessageData(is)),
                                (Invocation) getRequestData(id), proto);
                    }
                    data = result;
                }
                res.setResult(data);
            } catch (Throwable t) {
                if (log.isWarnEnabled()) {
                    log.warn("Decode response failed: " + t.getMessage(), t);
                }
                res.setStatus(Response.CLIENT_ERROR);
                res.setErrorMessage(StringUtils.toString(t));
            }
        } else {
            res.setErrorMessage(deserialize(s, channel.getUrl(), is).readUTF());
        }
        return res;
    } else {
        // decode request.
        Request req = new Request(id);
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay((flag & FLAG_TWOWAY) != 0);
        if ((flag & FLAG_EVENT) != 0) {
            req.setEvent(Request.HEARTBEAT_EVENT);
        }
        try {
            Object data;
            if (req.isHeartbeat()) {
                data = decodeHeartbeatData(channel, deserialize(s, channel.getUrl(), is));
            } else if (req.isEvent()) {
                data = decodeEventData(channel, deserialize(s, channel.getUrl(), is));
            } else {
                DecodeableRpcInvocation inv;
                if (channel.getUrl().getParameter(
                        Constants.DECODE_IN_IO_THREAD_KEY,
                        Constants.DEFAULT_DECODE_IN_IO_THREAD)) {
                    inv = new DecodeableRpcInvocation(channel, req, is, proto);
                    inv.decode();
                } else {
                    inv = new DecodeableRpcInvocation(channel, req,
                            new UnsafeByteArrayInputStream(readMessageData(is)), proto);
                }
                data = inv;
            }
            req.setData(data);
        } catch (Throwable t) {
            if (log.isWarnEnabled()) {
                log.warn("Decode request failed: " + t.getMessage(), t);
            }
            // bad request
            req.setBroken(true);
            req.setData(t);
        }
        return req;
    }
}
The third byte of the header tells whether the message is a request or a response and which serialization was used. The body is then deserialized accordingly.
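The header checks and the flag-byte parsing on the decode side can be sketched as follows. This is a simplified illustration under stated assumptions: the class, enum, and method names are hypothetical, the bit masks are assumed from Dubbo 2.6.x, and both the payload-size check and the fallback taken on a bad magic number are left out.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch of the decode-side checks; not Dubbo's ExchangeCodec or DubboCodec.
final class HeaderDecodeSketch {
    static final int HEADER_LENGTH = 16;
    static final byte MAGIC_HIGH = (byte) 0xda;
    static final byte MAGIC_LOW = (byte) 0xbb;
    // Assumed bit layout of the third header byte.
    static final int FLAG_REQUEST = 0x80;
    static final int FLAG_TWOWAY = 0x40;
    static final int FLAG_EVENT = 0x20;
    static final int SERIALIZATION_MASK = 0x1f;

    enum Result { NOT_DUBBO_FRAME, NEED_MORE_INPUT, COMPLETE }

    // Decide whether the bytes received so far form one complete frame.
    static Result check(byte[] received) {
        int readable = received.length;
        if (readable > 0 && received[0] != MAGIC_HIGH
                || readable > 1 && received[1] != MAGIC_LOW) {
            return Result.NOT_DUBBO_FRAME;
        }
        if (readable < HEADER_LENGTH) {
            return Result.NEED_MORE_INPUT; // header not fully received yet
        }
        int len = ByteBuffer.wrap(received, 12, 4).getInt(); // body length, big-endian
        if (readable < HEADER_LENGTH + len) {
            return Result.NEED_MORE_INPUT; // body not fully received yet
        }
        return Result.COMPLETE;
    }

    static boolean isRequest(byte flag) { return (flag & FLAG_REQUEST) != 0; }

    static boolean isTwoWay(byte flag) { return (flag & FLAG_TWOWAY) != 0; }

    static boolean isEvent(byte flag) { return (flag & FLAG_EVENT) != 0; }

    // The low five bits carry the contentTypeId of the serialization.
    static int serializationId(byte flag) { return flag & SERIALIZATION_MASK; }
}
```

Returning NEED_MORE_INPUT instead of throwing lets the transport keep the partial bytes and retry once more data arrives, which is how TCP fragmentation is handled here.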
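Serialization and Deserialization
As the encoder and decoder above show, the encoder has to serialize the Request/Response and the decoder has to deserialize it. The following looks at both in detail.
1. Serialization
The encoder first needs a concrete Serialization, which is obtained as follows: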
public static Serialization getSerialization(URL url) {
    return ExtensionLoader.getExtensionLoader(Serialization.class).getExtension(
            url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION));
}
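As with codecs, Dubbo ships several serialization implementations and lets you add your own. The serialization parameter is read from the URL, and hessian2 is used when it is absent. The serialization classes are likewise listed in a file,
path: META-INF/dubbo/internal/com.alibaba.dubbo.common.serialize.Serialization, with the following content: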
fastjson=com.alibaba.dubbo.common.serialize.fastjson.FastJsonSerialization
fst=com.alibaba.dubbo.common.serialize.fst.FstSerialization
hessian2=com.alibaba.dubbo.common.serialize.hessian2.Hessian2Serialization
java=com.alibaba.dubbo.common.serialize.java.JavaSerialization
compactedjava=com.alibaba.dubbo.common.serialize.java.CompactedJavaSerialization
nativejava=com.alibaba.dubbo.common.serialize.nativejava.NativeJavaSerialization
kryo=com.alibaba.dubbo.common.serialize.kryo.KryoSerialization
Out of the box Dubbo provides fastjson, fst, hessian2, java, compactedjava, nativejava, and kryo.
Every serialization has to implement three interfaces: Serialization, ObjectInput, and ObjectOutput.
The Serialization interface:
public interface Serialization {

    byte getContentTypeId();

    String getContentType();

    @Adaptive
    ObjectOutput serialize(URL url, OutputStream output) throws IOException;

    @Adaptive
    ObjectInput deserialize(URL url, InputStream input) throws IOException;
}
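contentTypeId is the serialization type stored in the header. During deserialization this id is used to look up the concrete Serialization, so contentTypeId values must be unique; otherwise one entry overwrites another.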
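Why the id has to be unique is easy to see in a small id-to-name table. The sketch below is hypothetical and deliberately differs from the overwrite behaviour described above: it fails fast on a conflicting registration, which is one possible design choice, not what Dubbo does. The serialization names used in the usage note are placeholders.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical id registry that rejects conflicting ids instead of overwriting.
final class ContentTypeIdRegistrySketch {
    private final Map<Byte, String> idToName = new HashMap<>();

    void register(byte id, String name) {
        String previous = idToName.putIfAbsent(id, name);
        if (previous != null && !previous.equals(name)) {
            throw new IllegalStateException(
                    "contentTypeId " + id + " is already used by " + previous);
        }
    }

    // Returns the name registered for the id, or null when the id is unknown.
    String nameOf(byte id) {
        return idToName.get(id);
    }
}
```

Registering "protobuf" under id 10 and then a second, differently named serialization under the same id raises an IllegalStateException, so the clash is caught at startup rather than showing up later as a decoding error on the peer.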
The ObjectOutput interface:
public interface ObjectOutput extends DataOutput {

    void writeObject(Object obj) throws IOException;
}
The ObjectInput interface:
public interface ObjectInput extends DataInput {

    Object readObject() throws IOException, ClassNotFoundException;

    <T> T readObject(Class<T> cls) throws IOException, ClassNotFoundException;

    <T> T readObject(Class<T> cls, Type type) throws IOException, ClassNotFoundException;
}
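These two interfaces provide the methods for writing and reading objects, while DataOutput and DataInput provide writing and reading of primitive types. Serializing only requires calling writeObject to write data into the stream. See the encodeRequestData method called by the encoder: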
@Override
protected void encodeRequestData(Channel channel, ObjectOutput out, Object data, String version) throws IOException {
    RpcInvocation inv = (RpcInvocation) data;

    out.writeUTF(version);
    out.writeUTF(inv.getAttachment(Constants.PATH_KEY));
    out.writeUTF(inv.getAttachment(Constants.VERSION_KEY));

    out.writeUTF(inv.getMethodName());
    out.writeUTF(ReflectUtils.getDesc(inv.getParameterTypes()));
    Object[] args = inv.getArguments();
    if (args != null)
        for (int i = 0; i < args.length; i++) {
            out.writeObject(encodeInvocationArgument(channel, inv, i));
        }
    out.writeObject(inv.getAttachments());
}
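With the default DubboCountCodec, data is not written to the stream as a whole. Instead, the fields of RpcInvocation are taken out and written one by one.
2. Deserialization
Deserialization reads the serialization type from the header and then obtains the concrete Serialization through the following method in CodecSupport: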
public static Serialization getSerialization(URL url, Byte id) throws IOException {
    Serialization serialization = getSerializationById(id);
    String serializationName = url.getParameter(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
    // Check if "serialization id" passed from network matches the id on this side(only take effect for JDK serialization), for security purpose.
    if (serialization == null
            || ((id == 3 || id == 7 || id == 4) && !(serializationName.equals(ID_SERIALIZATIONNAME_MAP.get(id))))) {
        throw new IOException("Unexpected serialization id:" + id + " received from network, please check if the peer send the right id.");
    }
    return serialization;
}

private static Map<Byte, Serialization> ID_SERIALIZATION_MAP = new HashMap<Byte, Serialization>();

public static Serialization getSerializationById(Byte id) {
    return ID_SERIALIZATION_MAP.get(id);
}
ID_SERIALIZATION_MAP holds the mapping from contentTypeId to Serialization. The Serialization is looked up by id, and the data is then read back in the same order it was written.
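The "read in the order written" rule can be illustrated with the JDK's DataOutputStream and DataInputStream. This is a simplified sketch, not Dubbo's ObjectOutput/ObjectInput: the class name and the three fields are hypothetical, and only strings are written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical sketch: fields are written one by one and must be read back in the same order.
final class FieldOrderSketch {
    static byte[] write(String version, String path, String method) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(version);
            out.writeUTF(path);
            out.writeUTF(method);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String[] read(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            // The stream carries no field names, so the position alone identifies each value.
            String version = in.readUTF();
            String path = in.readUTF();
            String method = in.readUTF();
            return new String[] {version, path, method};
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because nothing in the stream names the fields, swapping two reads on the receiving side would silently assign the path to the method name, which is why encoder and decoder have to agree on the order.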
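Adding a New Serialization Type
Dubbo offers good extension points for many of its modules, serialization included. The following shows how to add a protobuf-based serialization, built on the protostuff library.
1. Overall Code Structure
First, the overall code structure, shown in the figure below:
Implement the three interfaces Serialization, ObjectInput, and ObjectOutput, then provide a text file in the designated directory.
2. Add the Dependencies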
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-core</artifactId>
    <version>1.1.3</version>
</dependency>
<dependency>
    <groupId>com.dyuproject.protostuff</groupId>
    <artifactId>protostuff-runtime</artifactId>
    <version>1.1.3</version>
</dependency>
3. Implement ObjectInput and ObjectOutput
public class ProtobufObjectInput implements ObjectInput {

    private ObjectInputStream input;

    public ProtobufObjectInput(InputStream inputStream) throws IOException {
        this.input = new ObjectInputStream(inputStream);
    }

    // ... primitive-type methods omitted ...

    @Override
    public Object readObject() throws IOException, ClassNotFoundException {
        return input.readObject();
    }

    @Override
    public <T> T readObject(Class<T> clazz) throws IOException {
        try {
            // readObject() already returns the complete byte array written by
            // ProtobufObjectOutput, so no further read into the buffer is needed.
            byte[] buffer = (byte[]) input.readObject();
            return SerializationUtil.deserialize(buffer, clazz);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }

    @Override
    public <T> T readObject(Class<T> clazz, Type type) throws IOException {
        try {
            byte[] buffer = (byte[]) input.readObject();
            return SerializationUtil.deserialize(buffer, clazz);
        } catch (Exception e) {
            throw new IOException(e);
        }
    }
}

public class ProtobufObjectOutput implements ObjectOutput {

    private ObjectOutputStream outputStream;

    public ProtobufObjectOutput(OutputStream outputStream) throws IOException {
        this.outputStream = new ObjectOutputStream(outputStream);
    }

    // ... primitive-type methods omitted ...

    @Override
    public void writeObject(Object v) throws IOException {
        // Serialize with protostuff first, then write the bytes as a single byte[] object.
        byte[] bytes = SerializationUtil.serialize(v);
        outputStream.writeObject(bytes);
        outputStream.flush();
    }

    @Override
    public void flushBuffer() throws IOException {
        outputStream.flush();
    }
}
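The wire format used by these two classes is a Java-serialized byte[] that wraps the protostuff bytes. The following sketch shows that envelope on its own, using only JDK streams: the class name is hypothetical and arbitrary bytes stand in for the protostuff output. It illustrates that a single readObject() call returns the whole array that writeObject() stored.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

// Hypothetical sketch of the byte[] envelope; the payload stands in for protostuff output.
final class ByteArrayEnvelopeSketch {
    static byte[] wrap(byte[] payload) {
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(sink)) {
            out.writeObject(payload); // the array length travels with the array
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return sink.toByteArray();
    }

    static byte[] unwrap(byte[] envelope) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(envelope))) {
            // One call yields the full array; no extra read into the buffer is required.
            return (byte[]) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The envelope adds Java serialization overhead on top of the protostuff bytes. A length-prefixed write to the raw stream would be leaner, but that is a different design from the one shown in this article.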
4. Implement the Serialization Interface
public class ProtobufSerialization implements Serialization {

    @Override
    public byte getContentTypeId() {
        return 10;
    }

    @Override
    public String getContentType() {
        return "x-application/protobuf";
    }

    @Override
    public ObjectOutput serialize(URL url, OutputStream out) throws IOException {
        return new ProtobufObjectOutput(out);
    }

    @Override
    public ObjectInput deserialize(URL url, InputStream is) throws IOException {
        return new ProtobufObjectInput(is);
    }
}
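This introduces a new contentTypeId, which must not conflict with any id already used inside Dubbo.
5. Register in the Designated Directory
Under META-INF/dubbo/internal/, provide the file com.alibaba.dubbo.common.serialize.Serialization with the following content:
protobuf=com.dubbocommon.ProtobufSerialization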
6. Configure the New Serialization on the Provider
<dubbo:protocol name="dubbo" port="20880" serialization="protobuf"/>
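Objects will now be serialized with the newly added protobuf serialization.
Summary
This article analyzed Dubbo starting from the lowest layer of its overall design, the serialize layer. Later articles will go through the remaining layers one by one to build a more thorough understanding of Dubbo.
Sample Code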
https://github.com/ksfzhaohui/blog
https://gitee.com/outofmemory/blog