欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于netty实现rpc框架-spring boot服务端

程序员文章站 2022-04-04 10:56:45
demo地址 https://gitee.com/syher/grave-netty RPC介绍 首先了解一下RPC:远程过程调用。简单点说就是本地应用可以调用远程服务器的接口。那么通过什么方式调用远程接口呢?说白了RPC只是一种概念。他的调用可以基于HTTP实现,也可以基于TCP/IP实现。甚至私 ......

demo地址

 

rpc介绍

首先了解一下rpc:远程过程调用。简单点说就是本地应用可以调用远程服务器的接口。那么通过什么方式调用远程接口呢?说白了rpc只是一种概念。他的调用可以基于http实现,也可以基于tcp/ip实现。甚至私人定制的通讯协议。

当然,私人定制通讯协议成本过高且不具备通用性。我们不做展开讨论(其实我也展不开。。。)。那为什么不使用http协议呢?受限于http协议层级过高,数据传输效率不如tcp/ip。所以rpc远程调用一般采用tcp/ip实现。即调用socket方法。

 

rpc实现原理

1. 客户端发起远程服务调用。

2. 客户端将类信息、调用方法和入参信息通过socket通道发送给服务端。

3. 服务端解析数据包,调用本地接口。

5.将执行结果通过socket返回给客户端。

6.客户端拿到并解析返回结果。

 

rpc实现

java如何实现一个rpc框架,其实就是按照上面的原理再做一些详细的补充。比如通过动态代理封装客户端的数据包、通过反射机制实现服务端实现类的调用等等。

今天,我们先基于spring boot + netty 做rpc服务端的实现。

 

首先,做一个注解用于标识接口提供rpc调用。

@target(elementtype.type)
@retention(retentionpolicy.runtime)
public @interface service {
    string name() default "";
}

  

该注解用于提供服务的实现类上。

public interface inettyservice {

    string getstring();
}

  

其实现类:

package com.braska.grave.netty.server.service;

@service // 该注解为自定义rpc服务注解
public class nettyservice implements inettyservice {
    @override
    public string getstring() {
        return "welcome to use netty rpc.";
    }
}

  

接着,定义一个注解用来扫描指定包名下的service注解。

@retention(retentionpolicy.runtime)
@target({elementtype.type})
@documented
@import({nettyserverscannerregistrar.class, nettyserverapplicationcontextaware.class})
public @interface nettyserverscan {

    string[] basepackages();
}

  

该注解用于spring boot启动类上,参数basepackages指定服务所在的包路径。

@springbootapplication
@nettyserverscan(basepackages = {
        "com.braska.grave.netty.server.service"
})
public class gravenettyserverapplication {

    public static void main(string[] args) {
        springapplication.run(gravenettyserverapplication.class, args);
    }

}

  

nettyserverscannerregistrar类处理服务的spring bean注册。

public class nettyserverscannerregistrar implements beanfactoryaware, importbeandefinitionregistrar, resourceloaderaware {
    

    @override
    public void registerbeandefinitions(annotationmetadata importingclassmetadata, beandefinitionregistry registry) {
     
    // 创建扫描器实例 nettyserverinterfacescanner scanner = new nettyserverinterfacescanner(registry); if (this.resourceloader != null) { scanner.setresourceloader(this.resourceloader); } annotationattributes annoattrs = annotationattributes.frommap(importingclassmetadata.getannotationattributes(nettyserverscan.class.getname())); list<string> basepackages = new arraylist<string>(); for (string pkg : annoattrs.getstringarray("basepackages")) { if (stringutils.hastext(pkg)) { basepackages.add(pkg); } }      // 只扫描指定的注解。 scanner.setannotationclass(service.class); scanner.registerfilters();

     // 将basepackages里面的通过@service注解的类注册成spring bean。 scanner.doscan(stringutils.tostringarray(basepackages)); } }

  

nettyserverapplicationcontextaware类,暴露socket server端口。

public class nettyserverapplicationcontextaware implements applicationcontextaware, initializingbean {
    private static final logger logger = logger.getlogger(nettyserverapplicationcontextaware.class.getname());

   // 存储接口与实现类的映射,其中key是接口名。value是实现类的bean。 private map<string, object> servicemap = new hashmap<>();

   // 服务worker。包含netty socket服务端生命周期及读写。 serverworker runner; @override public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception { string address = applicationcontext.getenvironment().getproperty("remoteaddress"); map<string, object> beans = applicationcontext.getbeanswithannotation(service.class); for (object servicebean : beans.values()) { class<?> clazz = servicebean.getclass(); class<?>[] interfaces = clazz.getinterfaces(); for (class<?> inter : interfaces) { string interfacename = inter.getname(); servicemap.put(interfacename, servicebean); } }

     // 创建netty worker对象 runner = new serverworker(address, servicemap); } @override public void afterpropertiesset() throws exception {

     // 创建netty socketserver及通道处理器 runner.open(); } }

  

serverworker类的open方法。

public class serverworker extends channelinitializer {

   // socket ip:port private string remoteaddress;

// 实现类的beanmap private map<string, object> servicemap;
// netty channel处理器 nettyserverhandler handler;public void open() { try { int parallel = runtime.getruntime().availableprocessors() * 2; serverbootstrap bootstrap = new serverbootstrap(); this.bossgroup = new nioeventloopgroup(); // todo 使用线程池,提高并发能力 this.workergroup = new nioeventloopgroup(parallel); bootstrap.group(bossgroup, workergroup) .channel(nioserversocketchannel.class) .option(channeloption.so_backlog, 1024) .childoption(channeloption.so_keepalive, true) .childoption(channeloption.tcp_nodelay, true) .childhandler(this); string[] hostandport = this.remoteaddress.split(":"); if (hostandport == null || hostandport.length != 2) { throw new runtimeexception("remoteaddress is error."); } channelfuture cf = bootstrap.bind(hostandport[0], integer.parseint(hostandport[1])).sync(); // todo 信息写入注册中心 // registry.register(serveraddress); logger.info("netty 服务器启动.监听端口:" + hostandport[1]); // 等待服务端监听端口关闭 cf.channel().closefuture().sync(); } catch (exception e) { logger.log(level.severe, "netty server open failed.", e); this.bossgroup.shutdowngracefully(); this.workergroup.shutdowngracefully(); } } @override protected void initchannel(channel channel) throws exception { channelpipeline pipeline = channel.pipeline(); pipeline.addlast(new idlestatehandler(0, 0, 60)); pipeline.addlast(new jsonencoder()); pipeline.addlast(new jsondecoder()); pipeline.addlast(this.handler); } }

 

nettyserverhandler服务端channel处理器,继承channelinboundhandleradapter。

@channelhandler.sharable
public class nettyserverhandler extends channelinboundhandleradapter {
    private map<string, object> servicemap;

    public nettyserverhandler(map<string, object> servicemap) {
        this.servicemap = servicemap;
    }

    @override
    public void channelread(channelhandlercontext ctx, object msg) {

     // 解析客户端发送过来的数据。包含类名、方法名、入参等信息。 request request = json.parseobject(msg.tostring(), request.class); response response = new response(); response.setrequestid(request.getid()); try {

       // 调用本地实现类 object res = this.handler(request); response.setdata(res); } catch (exception e) { response.setcode(-1); response.seterror(e.getmessage()); logger.log(level.severe, "请求调用失败", e); }

     // 返回处理结果给客户端 ctx.writeandflush(response); } private object handler(request request) throws exception { string classname = request.getclassname();

     // 通过classname从beanmap映射中找到托管给spring的bean实现类。 object servicebean = servicemap.get(classname); string methodname = request.getmethodname(); object[] parameters = request.getparameters();

     // 通过反射机制调用实现类。并返回调用结果。 return methodutils.invokemethod(servicebean, methodname, parameters); } }

  

至此,rpc服务端的实现就完成了。

一路看下来,服务端的代码实现还是比较简单的。核心代码只有两个类:serverworker和nettyserverhandler。其余的都是对spring bean注册的支持。