欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于netty实现rpc框架-spring boot客户端

程序员文章站 2022-11-17 17:06:15
上篇讲了RPC服务端的实现。原理就是解析netty通道数据拿到类、方法及入参等信息,然后通过java反射机制调用本地接口返回结果。没有用到很复杂的技术。 这篇我们将客户端的实现。说白了客户端的任务很简单:一是建立socket长连接。二是封装发送服务端需要的数据包。三是处理返回结果。 demo地址 h ......

上篇讲了rpc服务端的实现。原理就是解析netty通道数据拿到类、方法及入参等信息,然后通过java反射机制调用本地接口返回结果。没有用到很复杂的技术。

这篇我们将客户端的实现。说白了客户端的任务很简单:一是建立socket长连接。二是封装发送服务端需要的数据包。三是处理返回结果。

demo地址

rpc实现

同样定义注解扫描service接口。

@retention(retentionpolicy.runtime)
@target({elementtype.type})
@documented
@import({nettyclientscannerregistrar.class, nettyclientapplicationcontextaware.class})
public @interface nettyclientscan {

    string[] basepackages();

    class<? extends nettyfactorybean> factorybean() default nettyfactorybean.class;
}

  

该注解用于spring boot启动类上,参数basepackages指定接口所在的包路径。

@springbootapplication
@nettyclientscan(basepackages = {
        "com.braska.grave.netty.api.service"
})
public class gravenettyclientapplication {

    public static void main(string[] args) {
        springapplication.run(gravenettyclientapplication.class, args);
    }

}

  

nettyserverscannerregistrar类注册bean。

public class nettyclientscannerregistrar implements importbeandefinitionregistrar, resourceloaderaware {

    @override
    public void registerbeandefinitions(annotationmetadata importingclassmetadata, beandefinitionregistry registry) {
        // spring bean注册
        nettyclientinterfacescanner scanner = new nettyclientinterfacescanner(registry);

        annotationattributes annoattrs =
                annotationattributes.frommap(importingclassmetadata.getannotationattributes(nettyclientscan.class.getname()));

        class<? extends nettyfactorybean> nettyfactorybeanclass = annoattrs.getclass("factorybean");
        if (!nettyfactorybean.class.equals(nettyfactorybeanclass)) {
            scanner.setnettyfactorybean(beanutils.instantiateclass(nettyfactorybeanclass));
        }

        list<string> basepackages = new arraylist<string>();
        for (string pkg : annoattrs.getstringarray("basepackages")) {
            if (stringutils.hastext(pkg)) {
                basepackages.add(pkg);
            }
        }

        scanner.doscan(stringutils.tostringarray(basepackages));
    }
}

  

nettyclientinterfacescanner类使用jdk动态代理basepackages路径下的接口。

public class nettyclientinterfacescanner extends classpathbeandefinitionscanner {
    private nettyfactorybean nettyfactorybean = new nettyfactorybean();


    @override
    public set<beandefinitionholder> doscan(string... basepackages) {
        set<beandefinitionholder> beandefinitions = super.doscan(basepackages);

        if (beandefinitions.isempty()) {
        } else {
            processbeandefinitions(beandefinitions);
        }

        return beandefinitions;
    }

    private void processbeandefinitions(
            set<beandefinitionholder> beandefinitions) {

        genericbeandefinition definition;

        for (beandefinitionholder holder : beandefinitions) {

            definition = (genericbeandefinition) holder.getbeandefinition();
            // 为对象属性赋值(这一块我也还不太明白)
       definition.getconstructorargumentvalues().addgenericargumentvalue(definition.getbeanclassname());
            // 这里的nettyfactorybean是生成bean实例的工厂,不是bean本身
            definition.setbeanclass(this.nettyfactorybean.getclass());

            definition.setautowiremode(abstractbeandefinition.autowire_by_type);
        }
    }
}

  

nettyfactorybean 

public class nettyfactorybean<t> implements factorybean<t> {
    private class<t> nettyinterface;

    public nettyfactorybean() {}

    public nettyfactorybean(class<t> nettyinterface) {
        this.nettyinterface = nettyinterface;
    }

    @override
    public t getobject() throws exception {
        // 通过jdk动态代理创建实例
        return (t) proxy.newproxyinstance(nettyinterface.getclassloader(), new class[]{nettyinterface}, c.getinstance());
    }

    @override
    public class<?> getobjecttype() {
        return this.nettyinterface;
    }

    @override
    public boolean issingleton() {
        return true;
    }
}

  

关键来了,nettyinterfaceinvoker类负责数据包封装及发送。

public class nettyinterfaceinvoker implements invocationhandler {

    private requestsender sender;
    // 静态内部类做单例模式 
    private static class singleton {
        private static final nettyinterfaceinvoker invoker = new nettyinterfaceinvoker();

        private static nettyinterfaceinvoker setsender(requestsender sender) {
            invoker.sender = sender;
            return invoker;
        }
    }

    public static nettyinterfaceinvoker getinstance() {
        return singleton.invoker;
    }

    public static nettyinterfaceinvoker setsender(requestsender sender) {
        return singleton.setsender(sender);
    }

    @override
    public object invoke(object proxy, method method, object[] args) throws throwable {
        // 数据包封装,包含类名、方法名及参数等信息。
        request request = new request();
        request.setclassname(method.getdeclaringclass().getname());
        request.setmethodname(method.getname());
        request.setparameters(args);
        request.setparametertypes(method.getparametertypes());
        request.setid(uuid.randomuuid().tostring());
        // 数据发送
        object result = sender.send(request);
        class<?> returntype = method.getreturntype();
        // 处理返回数据
        response response = json.parseobject(result.tostring(), response.class);
        if (response.getcode() == 1) {
            throw new exception(response.geterror());
        }
        if (returntype.isprimitive() || string.class.isassignablefrom(returntype)) {
            return response.getdata();
        } else if (collection.class.isassignablefrom(returntype)) {
            return jsonarray.parsearray(response.getdata().tostring(), object.class);
        } else if (map.class.isassignablefrom(returntype)) {
            return json.parseobject(response.getdata().tostring(), map.class);
        } else {
            object data = response.getdata();
            return jsonobject.parseobject(data.tostring(), returntype);
        }
    }
}

  

接着我们来看看requestsender怎么处理数据的。

public interface requestsender {
    channel connect(socketaddress address) throws interruptedexception;

    object send(request request) throws interruptedexception;
}

  

requestsender本身只是一个接口。他的实现类有:

public class nettyclientapplicationcontextaware extends channelinitializer<socketchannel>
        implements requestsender, applicationcontextaware, initializingbean {
    private static final logger logger = logger.getlogger(nettyclientapplicationcontextaware.class.getname());

    private string remoteaddress;
    private bootstrap bootstrap;
    private eventloopgroup group;
    private nettychannelmanager manager;
    private nettyclienthandler handler;

    @override
    public void setapplicationcontext(applicationcontext applicationcontext) throws beansexception {
        this.remoteaddress = applicationcontext.getenvironment().getproperty("remoteaddress");
        this.bootstrap = new bootstrap();
        this.group = new nioeventloopgroup(1);
        this.bootstrap.group(group).
                channel(niosocketchannel.class).
                option(channeloption.tcp_nodelay, true).
                option(channeloption.so_keepalive, true).
                handler(this);
        this.manager = new nettychannelmanager(this);
        this.handler = new nettyclienthandler(manager, remoteaddress);
    }

    @override
    public void afterpropertiesset() throws exception {
        // socket连接入口。
        this.manager.refresh(lists.newarraylist(remoteaddress));
    }

    @override
    public object send(request request) throws interruptedexception {
        channel channel = manager.take();
        if (channel != null && channel.isactive()) {
            synchronousqueue<object> queue = this.handler.sendrequest(request, channel);
            object result = queue.take();
            return jsonarray.tojsonstring(result);
        } else {
            response res = new response();
            res.setcode(1);
            res.seterror("未正确连接到服务器.请检查相关配置信息!");
            return jsonarray.tojsonstring(res);
        }
    }

    @override
    protected void initchannel(socketchannel channel) throws exception {
        channelpipeline pipeline = channel.pipeline();
        pipeline.addlast(new idlestatehandler(0, 0, 30));
        pipeline.addlast(new jsonencoder());
        pipeline.addlast(new jsondecoder());
        // 管道处理器
        pipeline.addlast(this.handler);
    }

    @override
    public channel connect(socketaddress address) throws interruptedexception {
        channelfuture future = bootstrap.connect(address);
        // 建立长连接,提供失败重连。
        future.addlistener(new connectionlistener(this.manager, this.remoteaddress));
        channel channel = future.channel();//future.sync().channel();
        return channel;
    }

    public void destroy() {
        this.group.shutdowngracefully();
    }
}

  

nettyclienthandler类处理管道事件。与服务端不通,这个管道处理器是继承channelinboundhandleradapter类。

@channelhandler.sharable
public class nettyclienthandler extends channelinboundhandleradapter {
    private static final logger logger = logger.getlogger(nettyserverhandler.class.getname());

    private concurrenthashmap<string, synchronousqueue<object>> queuemap = new concurrenthashmap<>();
    private nettychannelmanager manager;
    private string remoteaddress;

    public nettyclienthandler(nettychannelmanager manager, string remoteaddress) {
        this.manager = manager;
        this.remoteaddress = remoteaddress;
    }

    @override
    public void channelinactive(channelhandlercontext ctx) {
        inetsocketaddress address = (inetsocketaddress) ctx.channel().remoteaddress();
        logger.info("与netty服务器断开连接." + address);
        ctx.channel().close();
        manager.remove(ctx.channel());
        // 掉线重连
        final eventloop eventloop = ctx.channel().eventloop();
        eventloop.schedule(() -> {
            manager.refresh(lists.newarraylist(remoteaddress));
        }, 1l, timeunit.seconds);
    }

    @override
    public void channelread(channelhandlercontext ctx, object msg) throws exception {
        // 处理服务端返回的数据
        response response = json.parseobject(msg.tostring(), response.class);
        string requestid = response.getrequestid();
        synchronousqueue<object> queue = queuemap.get(requestid);
        queue.put(response);
        queuemap.remove(requestid);
    }

    public synchronousqueue<object> sendrequest(request request, channel channel) {
        // 使用阻塞队列处理客户端请求
        synchronousqueue<object> queue = new synchronousqueue<>();
        queuemap.put(request.getid(), queue);
        channel.writeandflush(request);
        return queue;
    }

    public void usereventtriggered(channelhandlercontext ctx, object evt) throws exception {
        logger.info("发送心跳消息...");
        if (evt instanceof idlestateevent) {
            idlestateevent event = (idlestateevent) evt;
            if (event.state() == idlestate.all_idle) {
                request request = new request();
                request.setmethodname("heartbeat");
                ctx.channel().writeandflush(request);
            }
        } else {
            super.usereventtriggered(ctx, evt);
        }
    }
}

  

这样,rpc的客户端就写好了,其中主要涉及到的关键内容就是netty实例及管道处理器、jdk动态代理、还有一个阻塞队列。

结合上篇rpc服务端。一个完整的rpc框架就搭建完了。

当然,有些地方处理的还是比较粗糙。后续有修改以git代码为准。