Netty实现远程调用RPC功能
程序员文章站
2022-03-07 16:01:55
添加依赖 io.netty netty-all 4.1.2.Final or ......
添加依赖
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.2.Final</version>
</dependency>
<dependency>
    <groupId>org.reflections</groupId>
    <artifactId>reflections</artifactId>
    <version>0.9.10</version>
</dependency>
组织架构
服务端
封装类信息
/**
 * Serializable description of a single remote method call.
 *
 * <p>The client fills in the target type name, the method name, the parameter types and the
 * argument values; the server reads them back to locate and invoke the method reflectively.
 * Arrays are stored and returned by reference (no defensive copies).
 */
public class ClassInfo implements Serializable {
    private static final long serialVersionUID = 1L;

    /** Name of the class (service interface) the call is addressed to. */
    private String className;
    /** Name of the method to invoke. */
    private String methodName;
    /** Parameter types of the method, used to select the right overload. */
    private Class<?>[] types;
    /** Argument values passed to the method. */
    private Object[] objects;

    /** @return the name of the class the call is addressed to */
    public String getClassName() {
        return className;
    }

    /** @param className the name of the class the call is addressed to */
    public void setClassName(String className) {
        this.className = className;
    }

    /** @return the name of the method to invoke */
    public String getMethodName() {
        return methodName;
    }

    /** @param methodName the name of the method to invoke */
    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    /** @return the parameter types of the method to invoke */
    public Class<?>[] getTypes() {
        return types;
    }

    /** @param types the parameter types of the method to invoke */
    public void setTypes(Class<?>[] types) {
        this.types = types;
    }

    /** @return the argument values for the call */
    public Object[] getObjects() {
        return objects;
    }

    /** @param objects the argument values for the call */
    public void setObjects(Object[] objects) {
        this.objects = objects;
    }
}
服务端网络处理服务器
public class nettyrpcserver {
private int port;
public nettyrpcserver(int port) {
this.port = port;
}
public void start() {
eventloopgroup bossgroup = new nioeventloopgroup();
eventloopgroup workergroup = new nioeventloopgroup();
try {
serverbootstrap serverbootstrap = new serverbootstrap();
serverbootstrap.group(bossgroup, workergroup)
.channel(nioserversocketchannel.class)
.option(channeloption.so_backlog, 128)
.childoption(channeloption.so_keepalive, true)
.localaddress(port).childhandler(
new channelinitializer<socketchannel>() {
@override
protected void initchannel(socketchannel ch) throws exception {
channelpipeline pipeline = ch.pipeline();
//编码器
pipeline.addlast("encoder", new objectencoder());
//解码器
pipeline.addlast("decoder", new objectdecoder(integer.max_value, classresolvers.cachedisabled(null)));
//服务器端业务处理类
pipeline.addlast(new invokehandler());
}
});
channelfuture future = serverbootstrap.bind(port).sync();
system.out.println("......server is ready......");
future.channel().closefuture().sync();
} catch (exception e) {
bossgroup.shutdowngracefully();
workergroup.shutdowngracefully();
}
}
public static void main(string[] args) throws exception {
new nettyrpcserver(9999).start();
}
}
服务器端业务处理类
public class invokehandler extends channelinboundhandleradapter {
//得到某接口下某个实现类的名字
private string getimplclassname(classinfo classinfo) throws exception{
//服务方接口和实现类所在的包路径
string interfacepath="com.lyz.server";
int lastdot = classinfo.getclassname().lastindexof(".");
string interfacename=classinfo.getclassname().substring(lastdot);
class superclass=class.forname(interfacepath+interfacename);
reflections reflections = new reflections(interfacepath);
//得到某接口下的所有实现类
set<class> implclassset=reflections.getsubtypesof(superclass);
if(implclassset.size()==0){
system.out.println("未找到实现类");
return null;
}else if(implclassset.size()>1){
system.out.println("找到多个实现类,未明确使用哪一个");
return null;
}else {
//把集合转换为数组
class[] classes=implclassset.toarray(new class[0]);
return classes[0].getname(); //得到实现类的名字
}
}
@override //读取客户端发来的数据并通过反射调用实现类的方法
public void channelread(channelhandlercontext ctx, object msg) throws exception {
classinfo classinfo = (classinfo) msg;
system.out.println(classinfo);
object clazz = class.forname(getimplclassname(classinfo)).newinstance();
method method = clazz.getclass().getmethod(classinfo.getmethodname(), classinfo.gettypes());
//通过反射调用实现类的方法
object result = method.invoke(clazz, classinfo.getobjects());
ctx.writeandflush(result);
}
}
服务端接口及实现类
/** Service interface whose single method takes no arguments. */
public interface HelloNetty {
    /**
     * @return a greeting
     */
    String hello();
}
// 实现类
public class hellonettyimpl implements hellonetty {
@override
public string hello() {
return "hello,netty";
}
}
/** Service interface whose single method takes one argument. */
public interface HelloRpc {
    /**
     * @param name the name to greet
     * @return a greeting for {@code name}
     */
    String hello(String name);
}
// 实现类
public class hellorpcimpl implements hellorpc {
@override
public string hello(string name) {
return "hello," + name;
}
}
客户端
代理类
public class nettyrpcproxy {
//根据接口创建代理对象
public static object create(class target) {
return proxy.newproxyinstance(target.getclassloader(), new class[]{target}, new invocationhandler() {
@override
public object invoke(object proxy, method method, object[] args)
throws throwable {
//封装classinfo
classinfo classinfo = new classinfo();
classinfo.setclassname(target.getname());
classinfo.setmethodname(method.getname());
classinfo.setobjects(args);
classinfo.settypes(method.getparametertypes());
//开始用netty发送数据
eventloopgroup group = new nioeventloopgroup();
resulthandler resulthandler = new resulthandler();
try {
bootstrap b = new bootstrap();
b.group(group)
.channel(niosocketchannel.class)
.handler(new channelinitializer<socketchannel>() {
@override
public void initchannel(socketchannel ch) throws exception {
channelpipeline pipeline = ch.pipeline();
//编码器
pipeline.addlast("encoder", new objectencoder());
//解码器 构造方法第一个参数设置二进制数据的最大字节数 第二个参数设置具体使用哪个类解析器
pipeline.addlast("decoder", new objectdecoder(integer.max_value, classresolvers.cachedisabled(null)));
//客户端业务处理类
pipeline.addlast("handler", resulthandler);
}
});
channelfuture future = b.connect("127.0.0.1", 9999).sync();
future.channel().writeandflush(classinfo).sync();
future.channel().closefuture().sync();
} finally {
group.shutdowngracefully();
}
return resulthandler.getresponse();
}
});
}
}
客户端业务处理类
public class resulthandler extends channelinboundhandleradapter {
private object response;
public object getresponse() {
return response;
}
@override //读取服务器端返回的数据(远程调用的结果)
public void channelread(channelhandlercontext ctx, object msg) throws exception {
response = msg;
ctx.close();
}
}
客户端接口
/** Client-side copy of the service interface whose single method takes no arguments. */
public interface HelloNetty {
    /**
     * @return a greeting
     */
    String hello();
}
/** Client-side copy of the service interface whose single method takes one argument. */
public interface HelloRpc {
    /**
     * @param name the name to greet
     * @return a greeting for {@code name}
     */
    String hello(String name);
}
测试类 服务调用方
/** Service consumer: performs two remote calls through proxies and prints the results. */
public class TestNettyRpc {
    /**
     * @param args ignored
     */
    public static void main(String[] args) {
        // First remote call: method without arguments.
        HelloNetty helloNetty = (HelloNetty) NettyRpcProxy.create(HelloNetty.class);
        System.out.println(helloNetty.hello());
        // Second remote call: method with one argument.
        HelloRpc helloRpc = (HelloRpc) NettyRpcProxy.create(HelloRpc.class);
        System.out.println(helloRpc.hello("rpc"));
    }
}
输出结果
服务端
......server is ready......
com.lyz.serverstub.classinfo@2b894733
com.lyz.serverstub.classinfo@167bfa9
客户端
hello,netty
hello,rpc
下一篇通过netty实现线上聊天功能