RPC调用的简单实现
程序员文章站
2022-07-03 19:57:10
RPC调用流程流程描述:1.服务调用者发送请求(interface#method#args)2.客户端进行StringEncode编码3.数据写到服务提供者4.服务提供者接受请求5.将接收的包进行StringDecode解码6.服务提供方调用对应api7.服务提供方响应方法调用结果8.服务提供方将结果集进行StringEncode编码9.服务提供方发送结果集包到服务调用者10.服务调用者接受数据包11.服务调用者将数据包进行StringDecode解码...
RPC调用流程
流程描述:
1.服务调用者发送请求(interface#method#args)
2.客户端进行StringEncode编码
3.数据写到服务提供者
4.服务提供者接受请求
5.将接收的包进行StringDecode解码
6.服务提供方调用对应api
7.服务提供方响应方法调用结果
8.服务提供方将结果集进行StringEncode编码
9.服务提供方发送结果集包到服务调用者
10.服务调用者接受数据包
11.服务调用者将数据包进行StringDecode解码
12.服务调用者输出方法调用结果集
代码流程:
服务端接口:
package com.hx.zbhuang.netty.dubboCall;
/**
* 远程服务接口
*/
public interface HelloService {
String hello(String mes);
String say(String msg);
}
服务端接口实现类:
package com.hx.zbhuang.netty.dubboCall;
/**
* 远程服务实现类
*/
public class HelloServiceImpl implements HelloService {
@Override
public String hello(String msg) {
if(msg!=null) {
return "hello 豆腐蛋,i accept you msg:["+msg+"]";
} else {
return "hello 豆腐蛋,i accept you msg";
}
}
@Override
public String say(String msg) {
return "hello 豆腐蛋" + "==msg:"+msg;
}
}
服务端初始化:
package com.hx.zbhuang.netty.dubboCall;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
* 服务端初始化
*/
public class NettyServer {
public static void main(String[] args) {
EventLoopGroup bossGroup = null;
EventLoopGroup workerGroup = null;
try {
workerGroup = new NioEventLoopGroup();
bossGroup = new NioEventLoopGroup(1);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 服务端拦水坝处理
pipeline.addLast(new NettyServerHandler());
}
});
ChannelFuture channelFuture = serverBootstrap.bind(7766).sync();
channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
服务端数据处理handler
package com.hx.zbhuang.netty.dubboCall;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.lang.reflect.Method;
public class NettyServerHandler extends SimpleChannelInboundHandler {
/**
* 读取客户端需要调用的方法进行处理返回结果集
* @param ctx
* @param msg
* @throws Exception
*/
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("msg="+msg);
String methodName = msg.toString().split("#")[1];
Method[] methods = HelloService.class.getMethods();
for (Method method:methods) {
if(method.getName().equals(methodName)) {
Object obj = method.invoke(new HelloServiceImpl(),msg.toString().substring(msg.toString().lastIndexOf("#")+1));
ctx.writeAndFlush(obj.toString());
}
}
}
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
}
客户端接口调用初始化
package com.hx.zbhuang.netty.dubboCall;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class NettyClient {
//线程池管理客户端请求
private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
private static NettyClientHandler client;
/**
* 返回服务端接口方法调用结果集
* @param serviceClass
* @param interfaceName
* @return
*/
Object getBean(final Class<?> serviceClass, final String interfaceName){
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},(proxy,method,args) -> {
if (client ==null) {
initClient();
}
client.setPara(interfaceName+"#"+method.getName()+"#"+args[0]);
Object obj = executor.submit(client).get();
return obj;
});
}
/**
* 客户端初始化
*/
private static void initClient() {
client = new NettyClientHandler();
NioEventLoopGroup group =null;
try {
group = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY,true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline=socketChannel.pipeline();
pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
// 客户端拦水坝处理
pipeline.addLast(client);
}
});
bootstrap.connect("127.0.0.1",7766).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
客户端数据处理handler
package com.hx.zbhuang.netty.dubboCall;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
// channelHandler上下文
private ChannelHandlerContext context;
// 远程服务获取方法调用结果
private String result;
// 接口名#方法名#参数
private String para;
/**
* 将请求写到服务端
* @return
* @throws Exception
*/
@Override
public synchronized Object call() throws Exception {
context.writeAndFlush(para);
wait();
return result;
}
/**
* 获取上下文信息
* @param ctx
* @throws Exception
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
context = ctx;
}
/**
* 获取服务端调用结果
* @param ctx
* @param msg
* @throws Exception
*/
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}
/**
* 异常处理
* @param ctx
* @param cause
* @throws Exception
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
ctx.close();
}
void setPara(String para) {
this.para = para;
}
}
模拟接口调用:
package com.hx.zbhuang.netty.dubboCall;
public class ClientBootstrap {
// 暴露的接口名
public static final String interfaceName = "HelloService";
public static void main(String[] args) throws InterruptedException {
NettyClient customer = new NettyClient();
// 调用远程服务接口
HelloService service = (HelloService)customer.getBean(HelloService.class, interfaceName);
String msg = service.say("来打王者");
System.out.println("调用结果"+ msg);
}
}
客户端远程调用服务端接口响应:
本文地址:https://blog.csdn.net/qq_33554285/article/details/110457528
下一篇: python路径及运行方式相关说明