基于Netty自定义RPC
一、基于Netty自定义RPC
RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主:
- 是基于HTTP的Restful形式的广义远程调用,以Spring Cloud的Feign和RestTemplate为代表,采用的协议是HTTP的7层调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。
- 是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过Netty来实现4层网络协议,NIO来异步传输,序列化也可以是JSON或者hessian2以及java自带的序列化等,可以配置。
接下来我们主要以第二种的RPC远程调用来自己实现
需求:
模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty
步骤:
- 创建一个公共的接口项目以及创建接口及方法,用于消费者和提供者之间的约定。
- 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
- 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据
1.1 公共模块
首先,在公共模块中添加netty的maven依赖
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
提供者及消费者工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者直接通过接口来进行TCP通信及一定的协议定制获取提供者的实现返回值。
接口的定义
public interface IUserService {
String sayHello(String msg);
}
只是一个普通的接口,参数是支持序列化的String类型,返回值同理
1.2 提供者的实现
1、首先是接口的实现,这一点和普通接口实现是一样的
package com.cyd.service;
import com.cyd.handler.UserServiceHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
public class UserServiceImpl implements IUserService {
public String sayHello(String msg) {
System.out.println("调用成功--参数:" + msg);
return "调用成功--参数:" + msg;
}
/**
* 启动服务器
*
* @param ip 服务器IP
* @param port 服务器端口
* @throws InterruptedException
*/
public static void startServer(String ip, int port) throws InterruptedException {
// 1、创建 NioEventLoopGroup的两个实例:bossGroup workerGroup
// bossGroup接收客户端传过来的请求
// workerGroup处理请求
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
// 2、创建服务启动引导类:组装一些必要的组件
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 3、配置启动引导类
// 设置组,第一个bossGroup负责连接,workerGroup负责连接之后的io处理
serverBootstrap.group(bossGroup, workGroup)
// channel方法指定服务器监听的通道类型
.channel(NioServerSocketChannel.class)
// 设置channel handler,每一个客户端连接后,给定一个监听器进行处理
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 获取管道对象
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 给管道对象pipeline设置编码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
// 把自定义的ChannelHandler添加到通道中
pipeline.addLast(new UserServiceHandler());
}
});
// 4、绑定IP地址以及端口
serverBootstrap.bind(ip, port).sync();
}
}
在实现中加入了netty的服务器启动程序,上面的代码中添加了 String类型的编解码 handler,添加了一个自定义 handler
2、自定义 handler 逻辑如下:
package com.cyd.handler;
import com.cyd.service.UserServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* 自定义业务处理器
*/
public class UserServiceHandler extends ChannelInboundHandlerAdapter {
/**
* 当客户端读取数据时,该方法会被调用
*
* @param ctx
* @param msg 客户端发送请求的时候传递一个参数
* @throws Exception
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 注意:客户端将来发送请求的时候传递一个参数:UserService#sayHello#are you ok
// 1、判断请求是否符合规则
if (msg.toString().startsWith("UserService")) {
// 2、如果符合规则,则调用方法并获取result
UserServiceImpl userService = new UserServiceImpl();
String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
// 3、把结果result返回到客户端
ctx.writeAndFlush(result);
}
}
}
这里显示判断了是否符合约定(并没有使用复杂的协议,只是一个字符串判断),然后创建一个具体实现类,并调用方法写回客户端。
3、还需要一个启动类:
public class ServerBoot {
public static void main(String[] args) {
try {
UserServiceImpl.startServer("127.0.0.1", 9999);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
关于提供者的代码就写完了,主要就是创建一个 netty 服务端,实现一个自定义的 handler,自定义 handler 判断是否符合之间的约定(协议),如果符合,就创建一个接口的实现类,并调用他的方法返回字符串。
1.3 消费者相关实现
消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK 的动态代理来实现这个目的。
思路:客户端调用代理方法,返回一个实现了 IUserService 接口的代理对象,调用代理对象的方法,返回结果。
我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。
首先创建代理相关的类:
package com.cyd.client;
import com.cyd.handler.UserClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
public class RpcConsumer {
// 1、创建一个线程池
public static ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// 2、声明一个自定义事件处理器 UserClientHandler
private static UserClientHandler userClientHandler;
// 3、初始化客户端(创建连接池、创建启动引导类、配置引导类、连接服务器)
public static void initClient() throws InterruptedException {
// 1) 初始化UserClientHandler
userClientHandler = new UserClientHandler();
// 2) 创建连接池对象
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
// 3) 创建启动引导类
Bootstrap bootstrap = new Bootstrap();
// 4) 配置启动引导类
bootstrap.group(bossGroup)
// 设置通道类型为NIO
.channel(NioSocketChannel.class)
// 设置请求协议为TCP
.option(ChannelOption.TCP_NODELAY, true)
// 监听channel,并初始化
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 获取管道Pipeline
ChannelPipeline pipeline = socketChannel.pipeline();
// 设置编码
pipeline.addLast(new StringEncoder());
pipeline.addLast(new StringDecoder());
// 设置自定义事件处理器
pipeline.addLast(userClientHandler);
}
});
// 5) 连接服务器
bootstrap.connect("127.0.0.1", 9999).sync();
}
/**
* 4、编写一个方法,使用JDK动态代理创建对象
*
* @param serviceClass 接口类型,根据哪个接口生成子类代理对象
* @param providerParam "UserService#sayHello#"
* @return
*/
public static Object createProxy(Class<?> serviceClass, final String providerParam) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serviceClass}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1) 初始化客户端
if (userClientHandler == null) {
initClient();
}
// 2) 给UserClientHandler设置参数
userClientHandler.setParams(providerParam + args[0]);
// 3) 使用线程池执行UserClientHandler任务,并返回结果
Future future = executorService.submit(userClientHandler);
Object result = future.get();
// 4) 返回结果
return result;
}
});
}
}
该类有 2 个方法,创建代理和初始化客户端。
创建代理逻辑:使用 JDK 的动态代理技术,代理对象中的 invoke 方法实现如下: 如果 client 没有初始化,则初始化 client,这个 client 既是 handler ,也是一个 Callable 。将参数设置进 client ,使用线程池调用 client 的 call 方法并阻塞等待数据返回。
初始化客户端逻辑: 创建一个 Netty 的客户端,并连接提供者,并设置一个自定义 handler,和一些 String 类型的序列化方式。
UserClientHandler 的实现:
package com.cyd.handler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;
/**
* 自定义事件处理器
*/
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {
// 1、定义成员变量
// 事件处理器上下文对象(存储handler信息,进行写操作)
private ChannelHandlerContext channelHandlerContext;
// 记录服务器返回的数据
private String result;
// 记录将要发送给服务器的数据
private String params;
// 2、实现channelActive方法,客户端和服务器建立连接时该方法自动执行
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
// 初始化ChannelHandlerContext
channelHandlerContext = ctx;
}
// 3、实现channelRead方法,当我们读取到服务器数据,该方法自动执行
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
result = msg.toString();
notify();
}
// 4、将客户端的数据写到服务器
public synchronized Object call() throws Exception {
// channelHandlerContext给服务器写数据
channelHandlerContext.writeAndFlush(params);
wait();
return result;
}
// 5、设置客户端请求参数
public void setParams(String params) {
this.params = params;
}
}
该类缓存了 ChannelHandlerContext,用于下次使用,有两个属性:返回结果和请求参数。
当成功连接后,缓存 ChannelHandlerContext,当调用 call 方法的时候,将请求参数发送到服务端,等待。当服务端收到并返回数据后,调用 channelRead 方法,将返回值赋值个 result,并唤醒等待在 call 方法上的线程。此时,代理对象返回数据。
再看看消费者调用方式,一般的TCP的RPC只需要这样调用即可,无需关心具体的协议和通信方式:
package com.cyd.boot;
import com.cyd.client.RpcConsumer;
import com.cyd.service.IUserService;
public class ConsumerBoot {
private static final String PROVIDER_PARAM = "UserService#sayHello#";
public static void main(String[] args) throws InterruptedException {
// 1、创建代理对象
IUserService userService = (IUserService) RpcConsumer.createProxy(IUserService.class, PROVIDER_PARAM);
// 2、循环给服务器写数据
while (true) {
String result = userService.sayHello("are you ok!");
System.out.println("服务器应答------->>>" + result);
Thread.sleep(2000);
}
}
}
调用者首先创建了一个代理对象,然后每隔一秒钟调用代理的 sayHello 方法,并打印服务端返回的结果
可以看到,消费者无需通过jar包的形式引入具体的实现项目,而是通过远程TCP通信的形式,以一定的协议和代理通过接口直接调用了方法,实现远程service间的调用,是分布式服务的基础。
本文地址:https://blog.csdn.net/cyd_0619/article/details/110082941