基于netty实现rpc框架-spring boot服务端
demo地址
rpc介绍
首先了解一下rpc:远程过程调用。简单点说就是本地应用可以调用远程服务器的接口。那么通过什么方式调用远程接口呢?说白了rpc只是一种概念。他的调用可以基于http实现,也可以基于tcp/ip实现。甚至私人定制的通讯协议。
当然,私人定制通讯协议成本过高且不具备通用性。我们不做展开讨论(其实我也展不开。。。)。那为什么不使用http协议呢?受限于http协议层级过高,数据传输效率不如tcp/ip。所以rpc远程调用一般采用tcp/ip实现。即调用socket方法。
rpc实现原理
1. 客户端发起远程服务调用。
2. 客户端将类信息、调用方法和入参信息通过socket通道发送给服务端。
3. 服务端解析数据包,调用本地接口。
5.将执行结果通过socket返回给客户端。
6.客户端拿到并解析返回结果。
rpc实现
java如何实现一个rpc框架,其实就是按照上面的原理再做一些详细的补充。比如通过动态代理封装客户端的数据包、通过反射机制实现服务端实现类的调用等等。
今天,我们先基于spring boot + netty 做rpc服务端的实现。
首先,做一个注解用于标识接口提供rpc调用。
@target(elementtype.type)
@retention(retentionpolicy.runtime)
public @interface service {
string name() default "";
}
该注解用于提供服务的实现类上。
public interface inettyservice {
string getstring();
}
其实现类:
package com.braska.grave.netty.server.service;
@service // 该注解为自定义rpc服务注解
public class nettyservice implements inettyservice {
@override
public string getstring() {
return "welcome to use netty rpc.";
}
}
接着,定义一个注解用来扫描指定包名下的service注解。
@retention(retentionpolicy.runtime)
@target({elementtype.type})
@documented
@import({nettyserverscannerregistrar.class, nettyserverapplicationcontextaware.class})
public @interface nettyserverscan {
string[] basepackages();
}
该注解用于spring boot启动类上,参数basepackages指定服务所在的包路径。
@springbootapplication
@nettyserverscan(basepackages = {
"com.braska.grave.netty.server.service"
})
public class gravenettyserverapplication {
public static void main(string[] args) {
springapplication.run(gravenettyserverapplication.class, args);
}
}
nettyserverscannerregistrar类处理服务的spring bean注册。
public class nettyserverscannerregistrar implements beanfactoryaware, importbeandefinitionregistrar, resourceloaderaware {
@override
public void registerbeandefinitions(annotationmetadata importingclassmetadata, beandefinitionregistry registry) {
// 创建扫描器实例
nettyserverinterfacescanner scanner = new nettyserverinterfacescanner(registry);
if (this.resourceloader != null) {
scanner.setresourceloader(this.resourceloader);
}
annotationattributes annoattrs =
annotationattributes.frommap(importingclassmetadata.getannotationattributes(nettyserverscan.class.getname()));
list<string> basepackages = new arraylist<string>();
for (string pkg : annoattrs.getstringarray("basepackages")) {
if (stringutils.hastext(pkg)) {
basepackages.add(pkg);
}
}
// 只扫描指定的注解。
scanner.setannotationclass(service.class);
scanner.registerfilters();
// 将basepackages里面的通过@service注解的类注册成spring bean。
scanner.doscan(stringutils.tostringarray(basepackages));
}
}
nettyserverapplicationcontextaware类,暴露socket server端口。
public class nettyserverapplicationcontextaware implements applicationcontextaware, initializingbean {
private static final logger logger = logger.getlogger(nettyserverapplicationcontextaware.class.getname());
// 存储接口与实现类的映射,其中key是接口名。value是实现类的bean。
private map<string, object> servicemap = new hashmap<>();
// 服务worker。包含netty socket服务端生命周期及读写。
serverworker runner;
@override
public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
string address = applicationcontext.getenvironment().getproperty("remoteaddress");
map<string, object> beans = applicationcontext.getbeanswithannotation(service.class);
for (object servicebean : beans.values()) {
class<?> clazz = servicebean.getclass();
class<?>[] interfaces = clazz.getinterfaces();
for (class<?> inter : interfaces) {
string interfacename = inter.getname();
servicemap.put(interfacename, servicebean);
}
}
// 创建netty worker对象
runner = new serverworker(address, servicemap);
}
@override
public void afterpropertiesset() throws exception {
// 创建netty socketserver及通道处理器
runner.open();
}
}
serverworker类的open方法。
public class serverworker extends channelinitializer {
// socket ip:port private string remoteaddress;
// 实现类的beanmap private map<string, object> servicemap;
// netty channel处理器 nettyserverhandler handler;public void open() { try { int parallel = runtime.getruntime().availableprocessors() * 2; serverbootstrap bootstrap = new serverbootstrap(); this.bossgroup = new nioeventloopgroup(); // todo 使用线程池,提高并发能力 this.workergroup = new nioeventloopgroup(parallel); bootstrap.group(bossgroup, workergroup) .channel(nioserversocketchannel.class) .option(channeloption.so_backlog, 1024) .childoption(channeloption.so_keepalive, true) .childoption(channeloption.tcp_nodelay, true) .childhandler(this); string[] hostandport = this.remoteaddress.split(":"); if (hostandport == null || hostandport.length != 2) { throw new runtimeexception("remoteaddress is error."); } channelfuture cf = bootstrap.bind(hostandport[0], integer.parseint(hostandport[1])).sync(); // todo 信息写入注册中心 // registry.register(serveraddress); logger.info("netty 服务器启动.监听端口:" + hostandport[1]); // 等待服务端监听端口关闭 cf.channel().closefuture().sync(); } catch (exception e) { logger.log(level.severe, "netty server open failed.", e); this.bossgroup.shutdowngracefully(); this.workergroup.shutdowngracefully(); } } @override protected void initchannel(channel channel) throws exception { channelpipeline pipeline = channel.pipeline(); pipeline.addlast(new idlestatehandler(0, 0, 60)); pipeline.addlast(new jsonencoder()); pipeline.addlast(new jsondecoder()); pipeline.addlast(this.handler); } }
nettyserverhandler服务端channel处理器,继承channelinboundhandleradapter。
@channelhandler.sharable
public class nettyserverhandler extends channelinboundhandleradapter {
private map<string, object> servicemap;
public nettyserverhandler(map<string, object> servicemap) {
this.servicemap = servicemap;
}
@override
public void channelread(channelhandlercontext ctx, object msg) {
// 解析客户端发送过来的数据。包含类名、方法名、入参等信息。
request request = json.parseobject(msg.tostring(), request.class);
response response = new response();
response.setrequestid(request.getid());
try {
// 调用本地实现类
object res = this.handler(request);
response.setdata(res);
} catch (exception e) {
response.setcode(-1);
response.seterror(e.getmessage());
logger.log(level.severe, "请求调用失败", e);
}
// 返回处理结果给客户端
ctx.writeandflush(response);
}
private object handler(request request) throws exception {
string classname = request.getclassname();
// 通过classname从beanmap映射中找到托管给spring的bean实现类。
object servicebean = servicemap.get(classname);
string methodname = request.getmethodname();
object[] parameters = request.getparameters();
// 通过反射机制调用实现类。并返回调用结果。
return methodutils.invokemethod(servicebean, methodname, parameters);
}
}
至此,rpc服务端的实现就完成了。
一路看下来,服务端的代码实现还是比较简单的。核心代码只有两个类:serverworker和nettyserverhandler。其余的都是对spring bean注册的支持。