基于netty实现rpc框架-spring boot客户端
程序员文章站
2022-11-17 17:06:15
上篇讲了RPC服务端的实现。原理就是解析netty通道数据拿到类、方法及入参等信息,然后通过java反射机制调用本地接口返回结果。没有用到很复杂的技术。 这篇我们将客户端的实现。说白了客户端的任务很简单:一是建立socket长连接。二是封装发送服务端需要的数据包。三是处理返回结果。 demo地址 h ......
上篇讲了rpc服务端的实现。原理就是解析netty通道数据拿到类、方法及入参等信息,然后通过java反射机制调用本地接口返回结果。没有用到很复杂的技术。
这篇我们将客户端的实现。说白了客户端的任务很简单:一是建立socket长连接。二是封装发送服务端需要的数据包。三是处理返回结果。
demo地址
rpc实现
同样定义注解扫描service接口。
@retention(retentionpolicy.runtime)
@target({elementtype.type})
@documented
@import({nettyclientscannerregistrar.class, nettyclientapplicationcontextaware.class})
public @interface nettyclientscan {
string[] basepackages();
class<? extends nettyfactorybean> factorybean() default nettyfactorybean.class;
}
该注解用于spring boot启动类上,参数basepackages指定接口所在的包路径。
@springbootapplication
@nettyclientscan(basepackages = {
"com.braska.grave.netty.api.service"
})
public class gravenettyclientapplication {
public static void main(string[] args) {
springapplication.run(gravenettyclientapplication.class, args);
}
}
nettyserverscannerregistrar类注册bean。
public class nettyclientscannerregistrar implements importbeandefinitionregistrar, resourceloaderaware {
@override
public void registerbeandefinitions(annotationmetadata importingclassmetadata, beandefinitionregistry registry) {
// spring bean注册
nettyclientinterfacescanner scanner = new nettyclientinterfacescanner(registry);
annotationattributes annoattrs =
annotationattributes.frommap(importingclassmetadata.getannotationattributes(nettyclientscan.class.getname()));
class<? extends nettyfactorybean> nettyfactorybeanclass = annoattrs.getclass("factorybean");
if (!nettyfactorybean.class.equals(nettyfactorybeanclass)) {
scanner.setnettyfactorybean(beanutils.instantiateclass(nettyfactorybeanclass));
}
list<string> basepackages = new arraylist<string>();
for (string pkg : annoattrs.getstringarray("basepackages")) {
if (stringutils.hastext(pkg)) {
basepackages.add(pkg);
}
}
scanner.doscan(stringutils.tostringarray(basepackages));
}
}
nettyclientinterfacescanner类使用jdk动态代理basepackages路径下的接口。
public class nettyclientinterfacescanner extends classpathbeandefinitionscanner {
private nettyfactorybean nettyfactorybean = new nettyfactorybean();
@override
public set<beandefinitionholder> doscan(string... basepackages) {
set<beandefinitionholder> beandefinitions = super.doscan(basepackages);
if (beandefinitions.isempty()) {
} else {
processbeandefinitions(beandefinitions);
}
return beandefinitions;
}
private void processbeandefinitions(
set<beandefinitionholder> beandefinitions) {
genericbeandefinition definition;
for (beandefinitionholder holder : beandefinitions) {
definition = (genericbeandefinition) holder.getbeandefinition();
// 为对象属性赋值(这一块我也还不太明白)
definition.getconstructorargumentvalues().addgenericargumentvalue(definition.getbeanclassname());
// 这里的nettyfactorybean是生成bean实例的工厂,不是bean本身
definition.setbeanclass(this.nettyfactorybean.getclass());
definition.setautowiremode(abstractbeandefinition.autowire_by_type);
}
}
}
nettyfactorybean
public class nettyfactorybean<t> implements factorybean<t> {
private class<t> nettyinterface;
public nettyfactorybean() {}
public nettyfactorybean(class<t> nettyinterface) {
this.nettyinterface = nettyinterface;
}
@override
public t getobject() throws exception {
// 通过jdk动态代理创建实例
return (t) proxy.newproxyinstance(nettyinterface.getclassloader(), new class[]{nettyinterface}, c.getinstance());
}
@override
public class<?> getobjecttype() {
return this.nettyinterface;
}
@override
public boolean issingleton() {
return true;
}
}
关键来了,nettyinterfaceinvoker类负责数据包封装及发送。
public class nettyinterfaceinvoker implements invocationhandler {
private requestsender sender;
// 静态内部类做单例模式
private static class singleton {
private static final nettyinterfaceinvoker invoker = new nettyinterfaceinvoker();
private static nettyinterfaceinvoker setsender(requestsender sender) {
invoker.sender = sender;
return invoker;
}
}
public static nettyinterfaceinvoker getinstance() {
return singleton.invoker;
}
public static nettyinterfaceinvoker setsender(requestsender sender) {
return singleton.setsender(sender);
}
@override
public object invoke(object proxy, method method, object[] args) throws throwable {
// 数据包封装,包含类名、方法名及参数等信息。
request request = new request();
request.setclassname(method.getdeclaringclass().getname());
request.setmethodname(method.getname());
request.setparameters(args);
request.setparametertypes(method.getparametertypes());
request.setid(uuid.randomuuid().tostring());
// 数据发送
object result = sender.send(request);
class<?> returntype = method.getreturntype();
// 处理返回数据
response response = json.parseobject(result.tostring(), response.class);
if (response.getcode() == 1) {
throw new exception(response.geterror());
}
if (returntype.isprimitive() || string.class.isassignablefrom(returntype)) {
return response.getdata();
} else if (collection.class.isassignablefrom(returntype)) {
return jsonarray.parsearray(response.getdata().tostring(), object.class);
} else if (map.class.isassignablefrom(returntype)) {
return json.parseobject(response.getdata().tostring(), map.class);
} else {
object data = response.getdata();
return jsonobject.parseobject(data.tostring(), returntype);
}
}
}
接着我们来看看requestsender怎么处理数据的。
public interface requestsender {
channel connect(socketaddress address) throws interruptedexception;
object send(request request) throws interruptedexception;
}
requestsender本身只是一个接口。他的实现类有:
public class nettyclientapplicationcontextaware extends channelinitializer<socketchannel>
implements requestsender, applicationcontextaware, initializingbean {
private static final logger logger = logger.getlogger(nettyclientapplicationcontextaware.class.getname());
private string remoteaddress;
private bootstrap bootstrap;
private eventloopgroup group;
private nettychannelmanager manager;
private nettyclienthandler handler;
@override
public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
this.remoteaddress = applicationcontext.getenvironment().getproperty("remoteaddress");
this.bootstrap = new bootstrap();
this.group = new nioeventloopgroup(1);
this.bootstrap.group(group).
channel(niosocketchannel.class).
option(channeloption.tcp_nodelay, true).
option(channeloption.so_keepalive, true).
handler(this);
this.manager = new nettychannelmanager(this);
this.handler = new nettyclienthandler(manager, remoteaddress);
}
@override
public void afterpropertiesset() throws exception {
// socket连接入口。
this.manager.refresh(lists.newarraylist(remoteaddress));
}
@override
public object send(request request) throws interruptedexception {
channel channel = manager.take();
if (channel != null && channel.isactive()) {
synchronousqueue<object> queue = this.handler.sendrequest(request, channel);
object result = queue.take();
return jsonarray.tojsonstring(result);
} else {
response res = new response();
res.setcode(1);
res.seterror("未正确连接到服务器.请检查相关配置信息!");
return jsonarray.tojsonstring(res);
}
}
@override
protected void initchannel(socketchannel channel) throws exception {
channelpipeline pipeline = channel.pipeline();
pipeline.addlast(new idlestatehandler(0, 0, 30));
pipeline.addlast(new jsonencoder());
pipeline.addlast(new jsondecoder());
// 管道处理器
pipeline.addlast(this.handler);
}
@override
public channel connect(socketaddress address) throws interruptedexception {
channelfuture future = bootstrap.connect(address);
// 建立长连接,提供失败重连。
future.addlistener(new connectionlistener(this.manager, this.remoteaddress));
channel channel = future.channel();//future.sync().channel();
return channel;
}
public void destroy() {
this.group.shutdowngracefully();
}
}
nettyclienthandler类处理管道事件。与服务端不通,这个管道处理器是继承channelinboundhandleradapter类。
@channelhandler.sharable
public class nettyclienthandler extends channelinboundhandleradapter {
private static final logger logger = logger.getlogger(nettyserverhandler.class.getname());
private concurrenthashmap<string, synchronousqueue<object>> queuemap = new concurrenthashmap<>();
private nettychannelmanager manager;
private string remoteaddress;
public nettyclienthandler(nettychannelmanager manager, string remoteaddress) {
this.manager = manager;
this.remoteaddress = remoteaddress;
}
@override
public void channelinactive(channelhandlercontext ctx) {
inetsocketaddress address = (inetsocketaddress) ctx.channel().remoteaddress();
logger.info("与netty服务器断开连接." + address);
ctx.channel().close();
manager.remove(ctx.channel());
// 掉线重连
final eventloop eventloop = ctx.channel().eventloop();
eventloop.schedule(() -> {
manager.refresh(lists.newarraylist(remoteaddress));
}, 1l, timeunit.seconds);
}
@override
public void channelread(channelhandlercontext ctx, object msg) throws exception {
// 处理服务端返回的数据
response response = json.parseobject(msg.tostring(), response.class);
string requestid = response.getrequestid();
synchronousqueue<object> queue = queuemap.get(requestid);
queue.put(response);
queuemap.remove(requestid);
}
public synchronousqueue<object> sendrequest(request request, channel channel) {
// 使用阻塞队列处理客户端请求
synchronousqueue<object> queue = new synchronousqueue<>();
queuemap.put(request.getid(), queue);
channel.writeandflush(request);
return queue;
}
public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {
logger.info("发送心跳消息...");
if (evt instanceof idlestateevent) {
idlestateevent event = (idlestateevent) evt;
if (event.state() == idlestate.all_idle) {
request request = new request();
request.setmethodname("heartbeat");
ctx.channel().writeandflush(request);
}
} else {
super.usereventtriggered(ctx, evt);
}
}
}
这样,rpc的客户端就写好了,其中主要涉及到的关键内容就是netty实例及管道处理器、jdk动态代理、还有一个阻塞队列。
结合上篇rpc服务端。一个完整的rpc框架就搭建完了。
当然,有些地方处理的还是比较粗糙。后续有修改以git代码为准。