Springboot整合Netty实现RPC服务器的示例代码
一、什么是rpc?
rpc(remote procedure call)远程过程调用,是一种进程间的通信方式,其可以做到像调用本地方法那样调用位于远程的计算机的服务。其实现的原理过程如下:
- 本地的进程通过接口进行本地方法调用。
- rpc客户端将调用的接口名、接口方法、方法参数等信息利用网络通信发送给rpc服务器。
- rpc服务器对请求进行解析,根据接口名、接口方法、方法参数等信息找到对应的方法实现,并进行本地方法调用,然后将方法调用结果响应给rpc客户端。
二、实现rpc需要解决那些问题?
1. 约定通信协议格式
rpc分为客户端与服务端,就像http一样,我们需要定义交互的协议格式。主要包括三个方面:
- 请求格式
- 响应格式
- 网络通信时数据的序列化方式
rpc请求
@data public class rpcrequest { /** * 请求id 用来标识本次请求以匹配rpc服务器的响应 */ private string requestid; /** * 调用的类(接口)权限定名称 */ private string classname; /** * 调用的方法名 */ private string methodname; /** * 方法参类型列表 */ private class<?>[] parametertypes; /** * 方法参数 */ private object[] parameters; }
rpc响应
@data public class rpcresponse { /** * 响应对应的请求id */ private string requestid; /** * 调用是否成功的标识 */ private boolean success = true; /** * 调用错误信息 */ private string errormessage; /** * 调用结果 */ private object result; }
2. 序列化方式
序列化方式可以使用jdk自带的序列化方式或者一些第三方的序列化方式,jdk自带的由于性能较差所以不推荐。我们这里选择json作为序列化协议,即将请求和响应对象序列化为json字符串后发送到对端,对端接收到后反序列为相应的对象,这里采用阿里的 fastjson 作为json序列化框架。
3. tcp粘包、拆包
tcp是个“流”协议,所谓流,就是没有界限的一串数据。大家可以想想河里的流水,是连成一片的,其间并没有分界线。tcp底层并不了解上层业务数据的具体含义,它会根据tcp缓冲区的实际情况进行包的划分,所以在业务上认为,一个完整的包可能会被tcp拆分成多个包进行发送,也有可能把多个小的包封装成一个大的数据包发送,这就是所谓的tcp粘包和拆包问题。粘包和拆包需要应用层程序来解决。
我们采用在请求和响应的头部保存消息体的长度的方式解决粘包和拆包问题。请求和响应的格式如下:
+--------+----------------+ | length | content | | 4字节 | length个字节 | +--------+----------------+
4. 网络通信框架的选择
出于性能的考虑,rpc一般选择异步非阻塞的网络通信方式,jdk自带的nio网络编程操作繁杂,netty是一款基于nio开发的网络通信框架,其对java nio进行封装对外提供友好的api,并且内置了很多开箱即用的组件,如各种编码解码器。所以我们采用netty作为rpc服务的网络通信框架。
三、rpc服务端
rpc分为客户端和服务端,它们有一个共同的服务接口api,我们首先定义一个接口 helloservice
public interface helloservice { string sayhello(string name); }
然后服务端需要提供该接口的实现类,然后使用自定义的@rpcservice注解标注,该注解扩展自@component,被其标注的类可以被spring的容器管理。
@rpcservice public class helloserviceimp implements helloservice { @override public string sayhello(string name) { return "hello " + name; } }
@target(elementtype.type) @retention(retentionpolicy.runtime) @component public @interface rpcservice { }
rpc服务器类
我们实现了applicationcontextaware接口,以便从bean容器中取出@rpcservice实现类,存入我们的map容器中。
@component @slf4j public class rpcserver implements applicationcontextaware, initializingbean { // rpc服务实现容器 private map<string, object> rpcservices = new hashmap<>(); @value("${rpc.server.port}") private int port; @override public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception { map<string, object> services = applicationcontext.getbeanswithannotation(rpcservice.class); for (map.entry<string, object> entry : services.entryset()) { object bean = entry.getvalue(); class<?>[] interfaces = bean.getclass().getinterfaces(); for (class<?> inter : interfaces) { rpcservices.put(inter.getname(), bean); } } log.info("加载rpc服务数量:{}", rpcservices.size()); } @override public void afterpropertiesset() { start(); } private void start(){ new thread(() -> { eventloopgroup boss = new nioeventloopgroup(1); eventloopgroup worker = new nioeventloopgroup(); try { serverbootstrap bootstrap = new serverbootstrap(); bootstrap.group(boss, worker) .childhandler(new channelinitializer<socketchannel>() { @override protected void initchannel(socketchannel ch) throws exception { channelpipeline pipeline = ch.pipeline(); pipeline.addlast(new idlestatehandler(0, 0, 60)); pipeline.addlast(new jsondecoder()); pipeline.addlast(new jsonencoder()); pipeline.addlast(new rpcinboundhandler(rpcservices)); } }) .channel(nioserversocketchannel.class); channelfuture future = bootstrap.bind(port).sync(); log.info("rpc 服务器启动, 监听端口:" + port); future.channel().closefuture().sync(); }catch (exception e){ e.printstacktrace(); boss.shutdowngracefully(); worker.shutdowngracefully(); } }).start(); } }
rpcserverinboundhandler 负责处理rpc请求
@slf4j public class rpcserverinboundhandler extends channelinboundhandleradapter { private map<string, object> rpcservices; public rpcserverinboundhandler(map<string, object> rpcservices){ this.rpcservices = rpcservices; } @override public void channelactive(channelhandlercontext ctx) throws exception { log.info("客户端连接成功,{}", ctx.channel().remoteaddress()); } public void channelinactive(channelhandlercontext ctx) { log.info("客户端断开连接,{}", ctx.channel().remoteaddress()); ctx.channel().close(); } @override public void channelread(channelhandlercontext ctx, object msg){ rpcrequest rpcrequest = (rpcrequest) msg; log.info("接收到客户端请求, 请求接口:{}, 请求方法:{}", rpcrequest.getclassname(), rpcrequest.getmethodname()); rpcresponse response = new rpcresponse(); response.setrequestid(rpcrequest.getrequestid()); object result = null; try { result = this.handlerequest(rpcrequest); response.setresult(result); } catch (exception e) { e.printstacktrace(); response.setsuccess(false); response.seterrormessage(e.getmessage()); } log.info("服务器响应:{}", response); ctx.writeandflush(response); } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause) throws exception { log.info("连接异常"); ctx.channel().close(); super.exceptioncaught(ctx, cause); } @override public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception { if (evt instanceof idlestateevent){ idlestateevent event = (idlestateevent)evt; if (event.state()== idlestate.all_idle){ log.info("客户端已超过60秒未读写数据, 关闭连接.{}",ctx.channel().remoteaddress()); ctx.channel().close(); } }else{ super.usereventtriggered(ctx,evt); } } private object handlerequest(rpcrequest rpcrequest) throws exception{ object bean = rpcservices.get(rpcrequest.getclassname()); if(bean == null){ throw new runtimeexception("未找到对应的服务: " + rpcrequest.getclassname()); } method method = bean.getclass().getmethod(rpcrequest.getmethodname(), rpcrequest.getparametertypes()); method.setaccessible(true); return method.invoke(bean, rpcrequest.getparameters()); } }
四、rpc客户端
/** * rpc远程调用的客户端 */ @slf4j @component public class rpcclient { @value("${rpc.remote.ip}") private string remoteip; @value("${rpc.remote.port}") private int port; private bootstrap bootstrap; // 储存调用结果 private final map<string, synchronousqueue<rpcresponse>> results = new concurrenthashmap<>(); public rpcclient(){ } @postconstruct public void init(){ bootstrap = new bootstrap().remoteaddress(remoteip, port); nioeventloopgroup worker = new nioeventloopgroup(1); bootstrap.group(worker) .channel(niosocketchannel.class) .handler(new channelinitializer<socketchannel>() { @override protected void initchannel(socketchannel channel) throws exception { channelpipeline pipeline = channel.pipeline(); pipeline.addlast(new idlestatehandler(0, 0, 10)); pipeline.addlast(new jsonencoder()); pipeline.addlast(new jsondecoder()); pipeline.addlast(new rpcclientinboundhandler(results)); } }); } public rpcresponse send(rpcrequest rpcrequest) { rpcresponse rpcresponse = null; rpcrequest.setrequestid(uuid.randomuuid().tostring()); channel channel = null; try { channel = bootstrap.connect().sync().channel(); log.info("连接建立, 发送请求:{}", rpcrequest); channel.writeandflush(rpcrequest); synchronousqueue<rpcresponse> queue = new synchronousqueue<>(); results.put(rpcrequest.getrequestid(), queue); // 阻塞等待获取响应 rpcresponse = queue.take(); results.remove(rpcrequest.getrequestid()); } catch (interruptedexception e) { e.printstacktrace(); } finally { if(channel != null && channel.isactive()){ channel.close(); } } return rpcresponse; } }
rpcclientinboundhandler负责处理服务端的响应
@slf4j public class rpcclientinboundhandler extends channelinboundhandleradapter { private map<string, synchronousqueue<rpcresponse>> results; public rpcclientinboundhandler(map<string, synchronousqueue<rpcresponse>> results){ this.results = results; } @override public void channelread(channelhandlercontext ctx, object msg) throws exception { rpcresponse rpcresponse = (rpcresponse) msg; log.info("收到服务器响应:{}", rpcresponse); if(!rpcresponse.issuccess()){ throw new runtimeexception("调用结果异常,异常信息:" + rpcresponse.geterrormessage()); } // 取出结果容器,将response放进queue中 synchronousqueue<rpcresponse> queue = results.get(rpcresponse.getrequestid()); queue.put(rpcresponse); } @override public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception { if (evt instanceof idlestateevent){ idlestateevent event = (idlestateevent)evt; if (event.state() == idlestate.all_idle){ log.info("发送心跳包"); rpcrequest request = new rpcrequest(); request.setmethodname("heartbeat"); ctx.channel().writeandflush(request); } }else{ super.usereventtriggered(ctx, evt); } } @override public void exceptioncaught(channelhandlercontext ctx, throwable cause){ log.info("异常:{}", cause.getmessage()); ctx.channel().close(); } }
接口代理
为了使客户端像调用本地方法一样调用远程服务,我们需要对接口进行动态代理。
代理类实现
@component public class rpcproxy implements invocationhandler { @autowired private rpcclient rpcclient; @override public object invoke(object proxy, method method, object[] args){ rpcrequest rpcrequest = new rpcrequest(); rpcrequest.setclassname(method.getdeclaringclass().getname()); rpcrequest.setmethodname(method.getname()); rpcrequest.setparameters(args); rpcrequest.setparametertypes(method.getparametertypes()); rpcresponse rpcresponse = rpcclient.send(rpcrequest); return rpcresponse.getresult(); } }
实现factorybean接口,将生产动态代理类纳入 spring 容器管理。
public class rpcfactorybean<t> implements factorybean<t> { private class<t> interfaceclass; @autowired private rpcproxy rpcproxy; public rpcfactorybean(class<t> interfaceclass){ this.interfaceclass = interfaceclass; } @override public t getobject(){ return (t) proxy.newproxyinstance(interfaceclass.getclassloader(), new class[]{interfaceclass}, rpcproxy); } @override public class<?> getobjecttype() { return interfaceclass; } }
自定义类路径扫描器,扫描包下的rpc接口,动态生产代理类,纳入 spring 容器管理
public class rpcscanner extends classpathbeandefinitionscanner { public rpcscanner(beandefinitionregistry registry) { super(registry); } @override protected set<beandefinitionholder> doscan(string... basepackages) { set<beandefinitionholder> beandefinitionholders = super.doscan(basepackages); for (beandefinitionholder beandefinitionholder : beandefinitionholders) { genericbeandefinition beandefinition = (genericbeandefinition)beandefinitionholder.getbeandefinition(); beandefinition.getconstructorargumentvalues().addgenericargumentvalue(beandefinition.getbeanclassname()); beandefinition.setbeanclassname(rpcfactorybean.class.getname()); } return beandefinitionholders; } @override protected boolean iscandidatecomponent(metadatareader metadatareader) throws ioexception { return true; } @override protected boolean iscandidatecomponent(annotatedbeandefinition beandefinition) { return beandefinition.getmetadata().isinterface() && beandefinition.getmetadata().isindependent(); } }
@component public class rpcbeandefinitionregistrypostprocessor implements beandefinitionregistrypostprocessor { @override public void postprocessbeandefinitionregistry(beandefinitionregistry registry) throws beansexception { rpcscanner rpcscanner = new rpcscanner(registry); // 传入rpc接口所在的包名 rpcscanner.scan("com.ygd.rpc.common.service"); } @override public void postprocessbeanfactory(configurablelistablebeanfactory beanfactory) throws beansexception { } }
json编解码器
/** * 将 rpcrequest 编码成字节序列发送 * 消息格式: length + content * length使用int存储,标识消息体的长度 * * +--------+----------------+ * | length | content | * | 4字节 | length个字节 | * +--------+----------------+ */ public class jsonencoder extends messagetobyteencoder<rpcrequest> { @override protected void encode(channelhandlercontext ctx, rpcrequest rpcrequest, bytebuf out){ byte[] bytes = json.tojsonbytes(rpcrequest); // 将消息体的长度写入消息头部 out.writeint(bytes.length); // 写入消息体 out.writebytes(bytes); } }
/** * 将响应消息解码成 rpcresponse */ public class jsondecoder extends lengthfieldbasedframedecoder { public jsondecoder(){ super(integer.max_value, 0, 4, 0, 4); } @override protected object decode(channelhandlercontext ctx, bytebuf in) throws exception { bytebuf msg = (bytebuf) super.decode(ctx, in); byte[] bytes = new byte[msg.readablebytes()]; msg.readbytes(bytes); rpcresponse rpcresponse = json.parseobject(bytes, rpcresponse.class); return rpcresponse; } }
测试
我们编写一个controller进行测试
@restcontroller @requestmapping("/hello") public class hellocontroller { @autowired private helloservice helloservice; @getmapping("/sayhello") public string hello(string name){ return helloservice.sayhello(name); } }
通过 postman调用 controller 接口 http://localhost:9998/hello/sayhello?name=小明
响应: hello 小明
总结
本文实现了一个简易的、具有基本概念的rpc,主要涉及的知识点如下:
- 网络通信及通信协议的编码、解码
- java对象的序列化及反序列化
- 通信链路心跳检测
- java反射
- jdk动态代理
项目完整代码详见:
到此这篇关于springboot整合netty实现rpc服务器的示例代码的文章就介绍到这了,更多相关springboot rpc服务器内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!
上一篇: Docker 使用国内镜像仓库的方法