欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于Netty自定义RPC 客户端模块rpc-consumer

程序员文章站 2022-07-09 19:16:55
消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK 的动态代理来实现这个目的。思路:客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。首先创建代理相关的类:RPCConsumerpackage com.lwl.client;import com.lwl.com....

基于Netty自定义RPC 客户端模块rpc-consumer
消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK 的动态代理来实现这个目的。
思路:客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。首先创建代理相关的类:

RPCConsumer

package com.lwl.client; import com.lwl.common.JSONSerializer; import com.lwl.common.RpcEncoder; import com.lwl.common.RpcRequest; import com.lwl.handler.UserClientHandler; import io.netty.bootstrap.Bootstrap; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelOption; import io.netty.channel.ChannelPipeline; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.string.StringDecoder; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; /**
 * 消费者
 */ public class RPCConsumer { //1、创建线程池对象   处理自定义事件 //并且当前线程池的线程数以当前系统的cpu核数 private static ExecutorService executorService = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); //2、声明一个自定义事件处理器  UserClientHandler public static UserClientHandler userClientHandler; //初始化客户端  创建连接池   bootStrap  甚至bootStrap  连接服务器 public static void initClient() throws InterruptedException { //初始化 UserClientHandler userClientHandler = new UserClientHandler(); //创建连接池对象 NioEventLoopGroup group = new NioEventLoopGroup(); //创建客户端引导对象 Bootstrap bootstrap = new Bootstrap(); //配置引导对象 bootstrap.group(group) //设置对象为nio .channel(NioSocketChannel.class) //设置请求协议为tcp .option(ChannelOption.TCP_NODELAY,true) //监听handler 并初始化 .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel socketChannel) throws Exception { //获取channelPipeline ChannelPipeline pipeline = socketChannel.pipeline(); //设置编码 pipeline.addLast(new StringDecoder()); pipeline.addLast( new RpcEncoder(RpcRequest.class, new JSONSerializer())); //添加自定义事件处理器 pipeline.addLast(userClientHandler); } }); bootstrap.connect("127.0.0.1",8999).sync(); } //定义一个方法  使用jdk动态代理创建对象 //serverClass 接口类型  根据哪个接口生成子类代理对象 public static Object createProxy(final Class<?> serviceClass){ return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(), new Class[]{serviceClass}, new InvocationHandler() { public Object invoke(Object o, Method method, Object[] objects) throws Throwable { //1、初始化客户端client if (null == userClientHandler){ initClient(); } //2、给userClientHandler 设置参数 RpcRequest rpcRequest = new RpcRequest(); rpcRequest.setClassName(serviceClass.getName()); rpcRequest.setMethodName(method.getName()); rpcRequest.setParameters(objects); rpcRequest.setParameterTypes(method.getParameterTypes()); rpcRequest.setRequestId(UUID.randomUUID().toString()); userClientHandler.setParam(rpcRequest); //3、使用线程池  开启一个线程处理call()  写操作 并返回结果 Object result = executorService.submit(userClientHandler).get(); //返回结果 return result; } } ); } } 

自定义事件处理器 UserClientHandler

package com.lwl.handler; import com.lwl.common.RpcRequest; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.util.concurrent.Callable; /**
 * 自定义事件处理器
 */ public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable { //定义成员变量 //事件处理器上下文对象  (存储handler信息  写操作) private ChannelHandlerContext context; //记录服务器返回的数据 private String result; //记录发送给服务器的数据 private RpcRequest rpcRequest; //事件channelActive   客户端和服务器连接时,该方法自动执行 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { //初始化 this.context = ctx; } //实现channelRead  当服务器督导数据时 方法自动执行 @Override public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { //将读到的数据msg 设置为成员变量的值 result = msg.toString(); //唤醒 写操作 notify(); } //将客户端的数据写到服务器 public synchronized Object call() throws Exception { context.writeAndFlush(rpcRequest); wait(); return result; } //设置参数的方法 public void setParam(RpcRequest rpcRequest){ this.rpcRequest =rpcRequest; } } 

启动引导类ConsumerBoot

package com.lwl.boot; import com.lwl.client.RPCConsumer; import com.lwl.service.IUserService; public class ConsumerBoot { //参数定义 private static final String PROVIDER_NAME = "UserService#sayHello#"; public static void main(String[] args) throws InterruptedException { //1、创建代理对象 IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class); //2、循环给服务器写数据 while (true){ String result = service.sayHello("are you ok ----"); System.out.println("-->{}  "+result); Thread.sleep(2000); } } } 

本文地址:https://blog.csdn.net/qq_36194388/article/details/108044368

相关标签: Netty 分布式