欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Netty实现远程调用RPC功能

程序员文章站 2022-03-07 16:01:55
添加依赖 io.netty netty-all 4.1.2.Final or ......

添加依赖

<dependency>
    <groupid>io.netty</groupid>
    <artifactid>netty-all</artifactid>
    <version>4.1.2.final</version>
</dependency>

<dependency>
    <groupid>org.reflections</groupid>
    <artifactid>reflections</artifactid>
    <version>0.9.10</version>
</dependency>

组织架构

Netty实现远程调用RPC功能

服务端

封装类信息

public class classinfo implements serializable {

    private static final long serialversionuid = 1l;

    private string classname;  //类名
    private string methodname;//方法名
    private class<?>[] types; //参数类型
    private object[] objects;//参数列表

    public string getclassname() {
        return classname;
    }

    public void setclassname(string classname) {
        this.classname = classname;
    }

    public string getmethodname() {
        return methodname;
    }

    public void setmethodname(string methodname) {
        this.methodname = methodname;
    }

    public class<?>[] gettypes() {
        return types;
    }

    public void settypes(class<?>[] types) {
        this.types = types;
    }

    public object[] getobjects() {
        return objects;
    }

    public void setobjects(object[] objects) {
        this.objects = objects;
    }
}

服务端网络处理服务器

public class nettyrpcserver {
    private int port;
    public nettyrpcserver(int port) {
        this.port = port;
    }

    public void start() {
        eventloopgroup bossgroup = new nioeventloopgroup();
        eventloopgroup workergroup = new nioeventloopgroup();
        try {
            serverbootstrap serverbootstrap = new serverbootstrap();
            serverbootstrap.group(bossgroup, workergroup)
                    .channel(nioserversocketchannel.class)
                    .option(channeloption.so_backlog, 128)
                    .childoption(channeloption.so_keepalive, true)
                    .localaddress(port).childhandler(
                            new channelinitializer<socketchannel>() {
                                @override
                                protected void initchannel(socketchannel ch) throws exception {
                                    channelpipeline pipeline = ch.pipeline();
                                    //编码器
                                    pipeline.addlast("encoder", new objectencoder());
                                    //解码器
                                    pipeline.addlast("decoder", new objectdecoder(integer.max_value, classresolvers.cachedisabled(null)));
                                    //服务器端业务处理类
                                    pipeline.addlast(new invokehandler());
                                }
                            });
            channelfuture future = serverbootstrap.bind(port).sync();
            system.out.println("......server is ready......");
            future.channel().closefuture().sync();
        } catch (exception e) {
            bossgroup.shutdowngracefully();
            workergroup.shutdowngracefully();
        }
    }

    public static void main(string[] args) throws exception {
        new nettyrpcserver(9999).start();
    }
}

服务器端业务处理类

public class invokehandler extends channelinboundhandleradapter {
    //得到某接口下某个实现类的名字
    private string getimplclassname(classinfo classinfo) throws exception{
        //服务方接口和实现类所在的包路径
        string interfacepath="com.lyz.server";
        int lastdot = classinfo.getclassname().lastindexof(".");
        string interfacename=classinfo.getclassname().substring(lastdot);
        class superclass=class.forname(interfacepath+interfacename);
        reflections reflections = new reflections(interfacepath);
        //得到某接口下的所有实现类
        set<class> implclassset=reflections.getsubtypesof(superclass);
        if(implclassset.size()==0){
            system.out.println("未找到实现类");
            return null;
        }else if(implclassset.size()>1){
            system.out.println("找到多个实现类,未明确使用哪一个");
            return null;
        }else {
            //把集合转换为数组
            class[] classes=implclassset.toarray(new class[0]);
            return classes[0].getname(); //得到实现类的名字
        }
    }

    @override  //读取客户端发来的数据并通过反射调用实现类的方法
    public void channelread(channelhandlercontext ctx, object msg) throws exception {
        classinfo classinfo = (classinfo) msg;
        system.out.println(classinfo);
        object clazz = class.forname(getimplclassname(classinfo)).newinstance();
        method method = clazz.getclass().getmethod(classinfo.getmethodname(), classinfo.gettypes());
        //通过反射调用实现类的方法
        object result = method.invoke(clazz, classinfo.getobjects());
        ctx.writeandflush(result);
    }
}

服务端接口及实现类

// 无参接口
public interface hellonetty {
    string hello();
}

// 实现类
public class hellonettyimpl implements hellonetty {
    @override
    public string hello() {
        return "hello,netty";
    }
}

// 带参接口
public interface hellorpc {
    string hello(string name);
}

// 实现类
public class hellorpcimpl implements hellorpc {
    @override
    public string hello(string name) {
        return "hello," + name;
    }
}

客户端

代理类

public class nettyrpcproxy {
    //根据接口创建代理对象
    public static object create(class target) {
        return proxy.newproxyinstance(target.getclassloader(), new class[]{target}, new invocationhandler() {
            @override
            public object invoke(object proxy, method method, object[] args)
                    throws throwable {
                //封装classinfo
                classinfo classinfo = new classinfo();
                classinfo.setclassname(target.getname());
                classinfo.setmethodname(method.getname());
                classinfo.setobjects(args);
                classinfo.settypes(method.getparametertypes());

                //开始用netty发送数据
                eventloopgroup group = new nioeventloopgroup();
                resulthandler resulthandler = new resulthandler();
                try {
                    bootstrap b = new bootstrap();
                    b.group(group)
                            .channel(niosocketchannel.class)
                            .handler(new channelinitializer<socketchannel>() {
                                @override
                                public void initchannel(socketchannel ch) throws exception {
                                    channelpipeline pipeline = ch.pipeline();
                                    //编码器
                                    pipeline.addlast("encoder", new objectencoder());
                                    //解码器  构造方法第一个参数设置二进制数据的最大字节数  第二个参数设置具体使用哪个类解析器
                                    pipeline.addlast("decoder", new objectdecoder(integer.max_value, classresolvers.cachedisabled(null)));
                                    //客户端业务处理类
                                    pipeline.addlast("handler", resulthandler);
                                }
                            });
                    channelfuture future = b.connect("127.0.0.1", 9999).sync();
                    future.channel().writeandflush(classinfo).sync();
                    future.channel().closefuture().sync();
                } finally {
                    group.shutdowngracefully();
                }
                return resulthandler.getresponse();
            }
        });
    }
}

客户端业务处理类

public class resulthandler extends channelinboundhandleradapter {

    private object response;
    public object getresponse() {
        return response;
    }

    @override //读取服务器端返回的数据(远程调用的结果)
    public void channelread(channelhandlercontext ctx, object msg) throws exception {
        response = msg;
        ctx.close();
    }
}

客户端接口

// 无参接口
public interface hellonetty {
    string hello();
}

// 带参接口
public interface hellorpc {
    string hello(string name);
}

测试类 服务调用方

public class testnettyrpc {
    public static void main(string [] args){

        //第1次远程调用
        hellonetty hellonetty=(hellonetty) nettyrpcproxy.create(hellonetty.class);
        system.out.println(hellonetty.hello());

        //第2次远程调用
        hellorpc hellorpc =  (hellorpc) nettyrpcproxy.create(hellorpc.class);
        system.out.println(hellorpc.hello("rpc"));

    }
}

输出结果

服务端

......server is ready......
com.lyz.serverstub.classinfo@2b894733
com.lyz.serverstub.classinfo@167bfa9

客户端

hello,netty
hello,rpc

下一篇通过netty实现线上聊天功能