Netty 在 Dubbo 中是如何应用的?
作者:莫那·鲁道
众所周知,国内知名框架 dubbo 底层使用的是 netty 作为网络通信,那么内部到底是如何使用的呢?今天我们就来一探究竟。
1. dubbo 的 consumer 消费者如何使用 netty
注意:此次代码使用了从 github 上 clone 的 dubbo 源码中的 dubbo-demo 例子。
代码如下:
system.setproperty("java.net.preferipv4stack", "true"); classpathxmlapplicationcontext context = new classpathxmlapplicationcontext(new string[]{"meta-inf/spring/dubbo-demo-consumer.xml"}); context.start(); // @1 demoservice demoservice = (demoservice) context.getbean("demoservice"); // get remote service proxy int a = 0; while (true) { try { thread.sleep(1000); system.err.println( ++ a + " "); string hello = demoservice.sayhello("world"); // call remote method system.out.println(hello); // get result } catch (throwable throwable) { throwable.printstacktrace(); } }
当代码执行到 @1 的时候,会调用 spring 容器的 getbean 方法,而 dubbo 扩展了 factorybean,所以,会调用 getobject 方法,该方法会创建代理对象。
这个过程中会调用 dubboprotocol 实例的 getclients(url url) 方法,当这个给定的 url 的 client 没有初始化则创建,然后放入缓存,代码如下:
这个 initclient 方法就是创建 netty 的 client 的。
最终调用的就是抽象父类 abstractclient 的构造方法,构造方法中包含了创建 socket 客户端,连接客户端等行为。
public abstractclient(url url, channelhandler handler) throws remotingexception { doopen(); connect(); }
doopent 方法用来创建 netty 的 bootstrap :
protected void doopen() throws throwable { nettyhelper.setnettyloggerfactory(); bootstrap = new clientbootstrap(channelfactory); bootstrap.setoption("keepalive", true); bootstrap.setoption("tcpnodelay", true); bootstrap.setoption("connecttimeoutmillis", gettimeout()); final nettyhandler nettyhandler = new nettyhandler(geturl(), this); bootstrap.setpipelinefactory(new channelpipelinefactory() { public channelpipeline getpipeline() { nettycodecadapter adapter = new nettycodecadapter(getcodec(), geturl(), nettyclient.this); channelpipeline pipeline = channels.pipeline(); pipeline.addlast("decoder", adapter.getdecoder()); pipeline.addlast("encoder", adapter.getencoder()); pipeline.addlast("handler", nettyhandler); return pipeline; } }); }
connect 方法用来连接提供者:
protected void doconnect() throws throwable { long start = system.currenttimemillis(); channelfuture future = bootstrap.connect(getconnectaddress()); boolean ret = future.awaituninterruptibly(getconnecttimeout(), timeunit.milliseconds); if (ret && future.issuccess()) { channel newchannel = future.getchannel(); newchannel.setinterestops(channel.op_read_write); } }
上面的代码中,调用了 bootstrap 的 connect 方法,熟悉的 netty 连接操作。当然这里使用的是 jboss 的 netty3,稍微有点区别。点击这篇:教你用 netty 实现一个简单的 rpc。当连接成功后,注册写事件,准备开始向提供者传递数据。
当 main 方法中调用 demoservice.sayhello(“world”) 的时候,最终会调用 headerexchangechannel 的 request 方法,通过 channel 进行请求。
public responsefuture request(object request, int timeout) throws remotingexception { request req = new request(); req.setversion("2.0.0"); req.settwoway(true); req.setdata(request); defaultfuture future = new defaultfuture(channel, req, timeout); channel.send(req); return future; }
send 方法中最后调用 jboss netty 中继承了 niosocketchannel 的 nioclientsocketchannel 的 write 方法。完成了一次数据的传输。
2. dubbo 的 provider 提供者如何使用 netty
provider demo 代码:
system.setproperty("java.net.preferipv4stack", "true"); classpathxmlapplicationcontext context = new classpathxmlapplicationcontext(new string[]{"meta-inf/spring/dubbo-demo-provider.xml"}); context.start(); system.in.read(); // press any key to exit
provider 作为被访问方,肯定是一个 server 模式的 socket。如何启动的呢?
当 spring 容器启动的时候,会调用一些扩展类的初始化方法,比如继承了 initializingbean,applicationcontextaware,applicationlistener 。
而 dubbo 创建了 servicebean 继承了一个监听器。spring 会调用他的 onapplicationevent 方法,该类有一个 export 方法,用于打开 serversocket 。
然后执行了 dubboprotocol 的 createserver 方法,然后创建了一个nettyserver 对象。nettyserver 对象的 构造方法同样是 doopen 方法和。
代码如下:
protected void doopen() throws throwable { nettyhelper.setnettyloggerfactory(); executorservice boss = executors.newcachedthreadpool(new namedthreadfactory("nettyserverboss", true)); executorservice worker = executors.newcachedthreadpool(new namedthreadfactory("nettyserverworker", true)); channelfactory channelfactory = new nioserversocketchannelfactory(boss, worker, geturl().getpositiveparameter(constants.io_threads_key, constants.default_io_threads)); bootstrap = new serverbootstrap(channelfactory); final nettyhandler nettyhandler = new nettyhandler(geturl(), this); channels = nettyhandler.getchannels(); bootstrap.setpipelinefactory(new channelpipelinefactory() { public channelpipeline getpipeline() { nettycodecadapter adapter = new nettycodecadapter(getcodec(), geturl(), nettyserver.this); channelpipeline pipeline = channels.pipeline(); pipeline.addlast("decoder", adapter.getdecoder()); pipeline.addlast("encoder", adapter.getencoder()); pipeline.addlast("handler", nettyhandler); return pipeline; } }); channel = bootstrap.bind(getbindaddress()); }
该方法中,看到了熟悉的 boss 线程,worker 线程,和 serverbootstrap,在添加了编解码 handler 之后,添加一个 nettyhandler,最后调用 bind 方法,完成绑定端口的工作。和我们使用 netty 是一摸一样。
3. 总结
可以看到,dubbo 使用 netty 还是挺简单的,消费者使用 nettyclient,提供者使用 nettyserver,provider 启动的时候,会开启端口监听,使用我们平时启动 netty 一样的方式。
而 client 在 spring getbean 的时候,会创建 client,当调用远程方法的时候,将数据通过 dubbo 协议编码发送到 nettyserver,然后 nettserver 收到数据后解码,并调用本地方法,并返回数据,完成一次完美的 rpc 调用。
好,关于 dubbo 如何使用 netty 就简短的介绍到这里。
推荐去我的博客阅读更多:
2.spring mvc、spring boot、spring cloud 系列教程
3.maven、git、eclipse、intellij idea 系列工具教程
觉得不错,别忘了点赞+转发哦!
上一篇: Jenkins之Nunit的应用