Java利用Sping框架编写RPC远程过程调用服务的教程
rpc,即 remote procedure call(远程过程调用),说得通俗一点就是:调用远程计算机上的服务,就像调用本地服务一样。
rpc 可基于 http 或 tcp 协议,web service 就是基于 http 协议的 rpc,它具有良好的跨平台性,但其性能却不如基于 tcp 协议的 rpc。会两方面会直接影响 rpc 的性能,一是传输方式,二是序列化。
众所周知,tcp 是传输层协议,http 是应用层协议,而传输层较应用层更加底层,在数据传输方面,越底层越快,因此,在一般情况下,tcp 一定比 http 快。就序列化而言,java 提供了默认的序列化方式,但在高并发的情况下,这种方式将会带来一些性能上的瓶颈,于是市面上出现了一系列优秀的序列化框架,比如:protobuf、kryo、hessian、jackson 等,它们可以取代 java 默认的序列化,从而提供更高效的性能。
为了支持高并发,传统的阻塞式 io 显然不太合适,因此我们需要异步的 io,即 nio。java 提供了 nio 的解决方案,java 7 也提供了更优秀的 nio.2 支持,用 java 实现 nio 并不是遥不可及的事情,只是需要我们熟悉 nio 的技术细节。
我们需要将服务部署在分布式环境下的不同节点上,通过服务注册的方式,让客户端来自动发现当前可用的服务,并调用这些服务。这需要一种服务注册表(service registry)的组件,让它来注册分布式环境下所有的服务地址(包括:主机名与端口号)。
应用、服务、服务注册表之间的关系见下图:
每台 server 上可发布多个 service,这些 service 共用一个 host 与 port,在分布式环境下会提供 server 共同对外提供 service。此外,为防止 service registry 出现单点故障,因此需要将其搭建为集群环境。
本文将为您揭晓开发轻量级分布式 rpc 框架的具体过程,该框架基于 tcp 协议,提供了 nio 特性,提供高效的序列化方式,同时也具备服务注册与发现的能力。
根据以上技术需求,我们可使用如下技术选型:
- spring:它是最强大的依赖注入框架,也是业界的权威标准。
- netty:它使 nio 编程更加容易,屏蔽了 java 底层的 nio 细节。
- protostuff:它基于 protobuf 序列化框架,面向 pojo,无需编写 .proto 文件。
- zookeeper:提供服务注册与发现功能,开发分布式系统的必备选择,同时它也具备天生的集群能力。
相关 maven 依赖请见最后附录。
第一步:编写服务接口
public interface helloservice { string hello(string name); }
将该接口放在独立的客户端 jar 包中,以供应用使用。
第二步:编写服务接口的实现类
@rpcservice(helloservice.class) // 指定远程接口 public class helloserviceimpl implements helloservice { @override public string hello(string name) { return "hello! " + name; } }
使用rpcservice注解定义在服务接口的实现类上,需要对该实现类指定远程接口,因为实现类可能会实现多个接口,一定要告诉框架哪个才是远程接口。
rpcservice代码如下:
@target({elementtype.type}) @retention(retentionpolicy.runtime) @component // 表明可被 spring 扫描 public @interface rpcservice { class<?> value(); }
该注解具备 spring 的component注解的特性,可被 spring 扫描。
该实现类放在服务端 jar 包中,该 jar 包还提供了一些服务端的配置文件与启动服务的引导程序。
第三步:配置服务端
服务端 spring 配置文件名为spring.xml,内容如下:
<beans ...> <context:component-scan base-package="com.xxx.rpc.sample.server"/> <context:property-placeholder location="classpath:config.properties"/> <!-- 配置服务注册组件 --> <bean id="serviceregistry" class="com.xxx.rpc.registry.serviceregistry"> <constructor-arg name="registryaddress" value="${registry.address}"/> </bean> <!-- 配置 rpc 服务器 --> <bean id="rpcserver" class="com.xxx.rpc.server.rpcserver"> <constructor-arg name="serveraddress" value="${server.address}"/> <constructor-arg name="serviceregistry" ref="serviceregistry"/> </bean> </beans>
具体的配置参数在config.properties文件中,内容如下:
# zookeeper 服务器 registry.address=127.0.0.1:2181 # rpc 服务器 server.address=127.0.0.1:8000
以上配置表明:连接本地的 zookeeper 服务器,并在 8000 端口上发布 rpc 服务。
第四步:启动服务器并发布服务
为了加载 spring 配置文件来发布服务,只需编写一个引导程序即可:
public class rpcbootstrap { public static void main(string[] args) { new classpathxmlapplicationcontext("spring.xml"); } }
运行rpcbootstrap类的main方法即可启动服务端,但还有两个重要的组件尚未实现,它们分别是:serviceregistry与rpcserver,下文会给出具体实现细节。
第五步:实现服务注册
使用 zookeeper 客户端可轻松实现服务注册功能,serviceregistry代码如下:
public class serviceregistry { private static final logger logger = loggerfactory.getlogger(serviceregistry.class); private countdownlatch latch = new countdownlatch(1); private string registryaddress; public serviceregistry(string registryaddress) { this.registryaddress = registryaddress; } public void register(string data) { if (data != null) { zookeeper zk = connectserver(); if (zk != null) { createnode(zk, data); } } } private zookeeper connectserver() { zookeeper zk = null; try { zk = new zookeeper(registryaddress, constant.zk_session_timeout, new watcher() { @override public void process(watchedevent event) { if (event.getstate() == event.keeperstate.syncconnected) { latch.countdown(); } } }); latch.await(); } catch (ioexception | interruptedexception e) { logger.error("", e); } return zk; } private void createnode(zookeeper zk, string data) { try { byte[] bytes = data.getbytes(); string path = zk.create(constant.zk_data_path, bytes, zoodefs.ids.open_acl_unsafe, createmode.ephemeral_sequential); logger.debug("create zookeeper node ({} => {})", path, data); } catch (keeperexception | interruptedexception e) { logger.error("", e); } } }
其中,通过constant配置了所有的常量:
public interface constant { int zk_session_timeout = 5000; string zk_registry_path = "/registry"; string zk_data_path = zk_registry_path + "/data"; }
注意:首先需要使用 zookeeper 客户端命令行创建/registry永久节点,用于存放所有的服务临时节点。
第六步:实现 rpc 服务器
使用 netty 可实现一个支持 nio 的 rpc 服务器,需要使用serviceregistry注册服务地址,rpcserver代码如下:
public class rpcserver implements applicationcontextaware, initializingbean { private static final logger logger = loggerfactory.getlogger(rpcserver.class); private string serveraddress; private serviceregistry serviceregistry; private map<string, object> handlermap = new hashmap<>(); // 存放接口名与服务对象之间的映射关系 public rpcserver(string serveraddress) { this.serveraddress = serveraddress; } public rpcserver(string serveraddress, serviceregistry serviceregistry) { this.serveraddress = serveraddress; this.serviceregistry = serviceregistry; } @override public void setapplicationcontext(applicationcontext ctx) throws beansexception { map<string, object> servicebeanmap = ctx.getbeanswithannotation(rpcservice.class); // 获取所有带有 rpcservice 注解的 spring bean if (maputils.isnotempty(servicebeanmap)) { for (object servicebean : servicebeanmap.values()) { string interfacename = servicebean.getclass().getannotation(rpcservice.class).value().getname(); handlermap.put(interfacename, servicebean); } } } @override public void afterpropertiesset() throws exception { eventloopgroup bossgroup = new nioeventloopgroup(); eventloopgroup workergroup = new nioeventloopgroup(); try { serverbootstrap bootstrap = new serverbootstrap(); bootstrap.group(bossgroup, workergroup).channel(nioserversocketchannel.class) .childhandler(new channelinitializer<socketchannel>() { @override public void initchannel(socketchannel channel) throws exception { channel.pipeline() .addlast(new rpcdecoder(rpcrequest.class)) // 将 rpc 请求进行解码(为了处理请求) .addlast(new rpcencoder(rpcresponse.class)) // 将 rpc 响应进行编码(为了返回响应) .addlast(new rpchandler(handlermap)); // 处理 rpc 请求 } }) .option(channeloption.so_backlog, 128) .childoption(channeloption.so_keepalive, true); string[] array = serveraddress.split(":"); string host = array[0]; int port = integer.parseint(array[1]); channelfuture future = bootstrap.bind(host, port).sync(); logger.debug("server started on port {}", port); if (serviceregistry != null) { serviceregistry.register(serveraddress); // 注册服务地址 } future.channel().closefuture().sync(); } finally { workergroup.shutdowngracefully(); bossgroup.shutdowngracefully(); } } }
以上代码中,有两个重要的 pojo 需要描述一下,它们分别是rpcrequest与rpcresponse。
使用rpcrequest封装 rpc 请求,代码如下:
public class rpcrequest { private string requestid; private string classname; private string methodname; private class<?>[] parametertypes; private object[] parameters; // getter/setter... }
使用rpcresponse封装 rpc 响应,代码如下:
public class rpcresponse { private string requestid; private throwable error; private object result; // getter/setter... }
使用rpcdecoder提供 rpc 解码,只需扩展 netty 的bytetomessagedecoder抽象类的decode方法即可,代码如下:
public class rpcdecoder extends bytetomessagedecoder { private class<?> genericclass; public rpcdecoder(class<?> genericclass) { this.genericclass = genericclass; } @override public void decode(channelhandlercontext ctx, bytebuf in, list<object> out) throws exception { if (in.readablebytes() < 4) { return; } in.markreaderindex(); int datalength = in.readint(); if (datalength < 0) { ctx.close(); } if (in.readablebytes() < datalength) { in.resetreaderindex(); return; } byte[] data = new byte[datalength]; in.readbytes(data); object obj = serializationutil.deserialize(data, genericclass); out.add(obj); } }
使用rpcencoder提供 rpc 编码,只需扩展 netty 的messagetobyteencoder抽象类的encode方法即可,代码如下:
public class rpcencoder extends messagetobyteencoder { private class<?> genericclass; public rpcencoder(class<?> genericclass) { this.genericclass = genericclass; } @override public void encode(channelhandlercontext ctx, object in, bytebuf out) throws exception { if (genericclass.isinstance(in)) { byte[] data = serializationutil.serialize(in); out.writeint(data.length); out.writebytes(data); } } }
编写一个serializationutil工具类,使用protostuff实现序列化:
public class serializationutil { private static map<class<?>, schema<?>> cachedschema = new concurrenthashmap<>(); private static objenesis objenesis = new objenesisstd(true); private serializationutil() { } @suppresswarnings("unchecked") private static <t> schema<t> getschema(class<t> cls) { schema<t> schema = (schema<t>) cachedschema.get(cls); if (schema == null) { schema = runtimeschema.createfrom(cls); if (schema != null) { cachedschema.put(cls, schema); } } return schema; } @suppresswarnings("unchecked") public static <t> byte[] serialize(t obj) { class<t> cls = (class<t>) obj.getclass(); linkedbuffer buffer = linkedbuffer.allocate(linkedbuffer.default_buffer_size); try { schema<t> schema = getschema(cls); return protostuffioutil.tobytearray(obj, schema, buffer); } catch (exception e) { throw new illegalstateexception(e.getmessage(), e); } finally { buffer.clear(); } } public static <t> t deserialize(byte[] data, class<t> cls) { try { t message = (t) objenesis.newinstance(cls); schema<t> schema = getschema(cls); protostuffioutil.mergefrom(data, message, schema); return message; } catch (exception e) { throw new illegalstateexception(e.getmessage(), e); } } }
以上了使用 objenesis 来实例化对象,它是比 java 反射更加强大。
注意:如需要替换其它序列化框架,只需修改serializationutil即可。当然,更好的实现方式是提供配置项来决定使用哪种序列化方式。
使用rpchandler中处理 rpc 请求,只需扩展 netty 的simplechannelinboundhandler抽象类即可,代码如下:
public class rpchandler extends simplechannelinboundhandler<rpcrequest> { private static final logger logger = loggerfactory.getlogger(rpchandler.class); private final map<string, object> handlermap; public rpchandler(map<string, object> handlermap) { this.handlermap = handlermap; } @override public void channelread0(final channelhandlercontext ctx, rpcrequest request) throws exception { rpcresponse response = new rpcresponse(); response.setrequestid(request.getrequestid()); try { object result = handle(request); response.setresult(result); } catch (throwable t) { response.seterror(t); } ctx.writeandflush(response).addlistener(channelfuturelistener.close); } private object handle(rpcrequest request) throws throwable { string classname = request.getclassname(); object servicebean = handlermap.get(classname); class<?> serviceclass = servicebean.getclass(); string methodname = request.getmethodname(); class<?>[] parametertypes = request.getparametertypes(); object[] parameters = request.getparameters(); /*method method = serviceclass.getmethod(methodname, parametertypes); method.setaccessible(true); return method.invoke(servicebean, parameters);*/ fastclass servicefastclass = fastclass.create(serviceclass); fastmethod servicefastmethod = servicefastclass.getmethod(methodname, parametertypes); return servicefastmethod.invoke(servicebean, parameters); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) { logger.error("server caught exception", cause); ctx.close(); } }
为了避免使用 java 反射带来的性能问题,我们可以使用 cglib 提供的反射 api,如上面用到的fastclass与fastmethod。
第七步:配置客户端
同样使用 spring 配置文件来配置 rpc 客户端,spring.xml代码如下:
<beans ...> <context:property-placeholder location="classpath:config.properties"/> <!-- 配置服务发现组件 --> <bean id="servicediscovery" class="com.xxx.rpc.registry.servicediscovery"> <constructor-arg name="registryaddress" value="${registry.address}"/> </bean> <!-- 配置 rpc 代理 --> <bean id="rpcproxy" class="com.xxx.rpc.client.rpcproxy"> <constructor-arg name="servicediscovery" ref="servicediscovery"/> </bean> </beans>
其中config.properties提供了具体的配置:
# zookeeper 服务器 registry.address=127.0.0.1:2181
第八步:实现服务发现
同样使用 zookeeper 实现服务发现功能,见如下代码:
public class servicediscovery { private static final logger logger = loggerfactory.getlogger(servicediscovery.class); private countdownlatch latch = new countdownlatch(1); private volatile list<string> datalist = new arraylist<>(); private string registryaddress; public servicediscovery(string registryaddress) { this.registryaddress = registryaddress; zookeeper zk = connectserver(); if (zk != null) { watchnode(zk); } } public string discover() { string data = null; int size = datalist.size(); if (size > 0) { if (size == 1) { data = datalist.get(0); logger.debug("using only data: {}", data); } else { data = datalist.get(threadlocalrandom.current().nextint(size)); logger.debug("using random data: {}", data); } } return data; } private zookeeper connectserver() { zookeeper zk = null; try { zk = new zookeeper(registryaddress, constant.zk_session_timeout, new watcher() { @override public void process(watchedevent event) { if (event.getstate() == event.keeperstate.syncconnected) { latch.countdown(); } } }); latch.await(); } catch (ioexception | interruptedexception e) { logger.error("", e); } return zk; } private void watchnode(final zookeeper zk) { try { list<string> nodelist = zk.getchildren(constant.zk_registry_path, new watcher() { @override public void process(watchedevent event) { if (event.gettype() == event.eventtype.nodechildrenchanged) { watchnode(zk); } } }); list<string> datalist = new arraylist<>(); for (string node : nodelist) { byte[] bytes = zk.getdata(constant.zk_registry_path + "/" + node, false, null); datalist.add(new string(bytes)); } logger.debug("node data: {}", datalist); this.datalist = datalist; } catch (keeperexception | interruptedexception e) { logger.error("", e); } } }
第九步:实现 rpc 代理
这里使用 java 提供的动态代理技术实现 rpc 代理(当然也可以使用 cglib 来实现),具体代码如下:
public class rpcproxy { private string serveraddress; private servicediscovery servicediscovery; public rpcproxy(string serveraddress) { this.serveraddress = serveraddress; } public rpcproxy(servicediscovery servicediscovery) { this.servicediscovery = servicediscovery; } @suppresswarnings("unchecked") public <t> t create(class<?> interfaceclass) { return (t) proxy.newproxyinstance( interfaceclass.getclassloader(), new class<?>[]{interfaceclass}, new invocationhandler() { @override public object invoke(object proxy, method method, object[] args) throws throwable { rpcrequest request = new rpcrequest(); // 创建并初始化 rpc 请求 request.setrequestid(uuid.randomuuid().tostring()); request.setclassname(method.getdeclaringclass().getname()); request.setmethodname(method.getname()); request.setparametertypes(method.getparametertypes()); request.setparameters(args); if (servicediscovery != null) { serveraddress = servicediscovery.discover(); // 发现服务 } string[] array = serveraddress.split(":"); string host = array[0]; int port = integer.parseint(array[1]); rpcclient client = new rpcclient(host, port); // 初始化 rpc 客户端 rpcresponse response = client.send(request); // 通过 rpc 客户端发送 rpc 请求并获取 rpc 响应 if (response.iserror()) { throw response.geterror(); } else { return response.getresult(); } } } ); } }
使用rpcclient类实现 rpc 客户端,只需扩展 netty 提供的simplechannelinboundhandler抽象类即可,代码如下:
public class rpcclient extends simplechannelinboundhandler<rpcresponse> { private static final logger logger = loggerfactory.getlogger(rpcclient.class); private string host; private int port; private rpcresponse response; private final object obj = new object(); public rpcclient(string host, int port) { this.host = host; this.port = port; } @override public void channelread0(channelhandlercontext ctx, rpcresponse response) throws exception { this.response = response; synchronized (obj) { obj.notifyall(); // 收到响应,唤醒线程 } } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { logger.error("client caught exception", cause); ctx.close(); } public rpcresponse send(rpcrequest request) throws exception { eventloopgroup group = new nioeventloopgroup(); try { bootstrap bootstrap = new bootstrap(); bootstrap.group(group).channel(niosocketchannel.class) .handler(new channelinitializer<socketchannel>() { @override public void initchannel(socketchannel channel) throws exception { channel.pipeline() .addlast(new rpcencoder(rpcrequest.class)) // 将 rpc 请求进行编码(为了发送请求) .addlast(new rpcdecoder(rpcresponse.class)) // 将 rpc 响应进行解码(为了处理响应) .addlast(rpcclient.this); // 使用 rpcclient 发送 rpc 请求 } }) .option(channeloption.so_keepalive, true); channelfuture future = bootstrap.connect(host, port).sync(); future.channel().writeandflush(request).sync(); synchronized (obj) { obj.wait(); // 未收到响应,使线程等待 } if (response != null) { future.channel().closefuture().sync(); } return response; } finally { group.shutdowngracefully(); } } }
第十步:发送 rpc 请求
使用 junit 结合 spring 编写一个单元测试,代码如下:
@runwith(springjunit4classrunner.class) @contextconfiguration(locations = "classpath:spring.xml") public class helloservicetest { @autowired private rpcproxy rpcproxy; @test public void hellotest() { helloservice helloservice = rpcproxy.create(helloservice.class); string result = helloservice.hello("world"); assert.assertequals("hello! world", result); } }
运行以上单元测试,如果不出意外的话,您应该会看到绿条。
总结
本文通过 spring + netty + protostuff + zookeeper 实现了一个轻量级 rpc 框架,使用 spring 提供依赖注入与参数配置,使用 netty 实现 nio 方式的数据传输,使用 protostuff 实现对象序列化,使用 zookeeper 实现服务注册与发现。使用该框架,可将服务部署到分布式环境中的任意节点上,客户端通过远程接口来调用服务端的具体实现,让服务端与客户端的开发完全分离,为实现大规模分布式应用提供了基础支持。
附录:maven 依赖
<!-- junit --> <dependency> <groupid>junit</groupid> <artifactid>junit</artifactid> <version>4.11</version> <scope>test</scope> </dependency> <!-- slf4j --> <dependency> <groupid>org.slf4j</groupid> <artifactid>slf4j-log4j12</artifactid> <version>1.7.7</version> </dependency> <!-- spring --> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-context</artifactid> <version>3.2.12.release</version> </dependency> <dependency> <groupid>org.springframework</groupid> <artifactid>spring-test</artifactid> <version>3.2.12.release</version> <scope>test</scope> </dependency> <!-- netty --> <dependency> <groupid>io.netty</groupid> <artifactid>netty-all</artifactid> <version>4.0.24.final</version> </dependency> <!-- protostuff --> <dependency> <groupid>com.dyuproject.protostuff</groupid> <artifactid>protostuff-core</artifactid> <version>1.0.8</version> </dependency> <dependency> <groupid>com.dyuproject.protostuff</groupid> <artifactid>protostuff-runtime</artifactid> <version>1.0.8</version> </dependency> <!-- zookeeper --> <dependency> <groupid>org.apache.zookeeper</groupid> <artifactid>zookeeper</artifactid> <version>3.4.6</version> </dependency> <!-- apache commons collections --> <dependency> <groupid>org.apache.commons</groupid> <artifactid>commons-collections4</artifactid> <version>4.0</version> </dependency> <!-- objenesis --> <dependency> <groupid>org.objenesis</groupid> <artifactid>objenesis</artifactid> <version>2.1</version> </dependency> <!-- cglib --> <dependency> <groupid>cglib</groupid> <artifactid>cglib</artifactid> <version>3.1</version> </dependency>