荐 我的架构梦:(二十)基于Netty手写RPC框架
一、前言
RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主:
- 是基于
HTTP
的restful
形式的广义远程调用,以spring could
的feign
和restTemplate
为代表,采用的协议是HTTP
的7
层调用协议,并且协议的参数和响应序列化基本以JSON
格式和XML
格式为主。 - 是基于
TCP
的狭义的RPC
远程调用,以阿里的Dubbo
为代表,主要通过netty
来实现4
层网络协议,NIO
来异步传输,序列化也可以是JSON
或者hessian2
以及java
自带的序列化等,可以配置。
接下来我们主要以第二种的RPC远程调用来自己实现
二、需求与步骤
模仿 dubbo
,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty
。
1、创建一个公共的接口项目以及创建接口及方法,用于消费者和提供者之间的约定。
2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
3、创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty
请求提供者返回数据。
三、代码实现
1、maven聚合工程
rpc-netty父pom依赖:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.1.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.riemann</groupId>
<artifactId>rpc-netty</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<properties>
<java.version>1.8</java.version>
</properties>
<modules>
<module>rpc-common</module>
<module>rpc-provider</module>
<module>rpc-consumer</module>
</modules>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.16.Final</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.41</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
rpc-common pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rpc-netty</artifactId>
<groupId>com.riemann</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-common</artifactId>
</project>
rpc-provider pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rpc-netty</artifactId>
<groupId>com.riemann</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-provider</artifactId>
<dependencies>
<dependency>
<groupId>com.riemann</groupId>
<artifactId>rpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
rpc-consumer pom依赖
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>rpc-netty</artifactId>
<groupId>com.riemann</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>rpc-consumer</artifactId>
<dependencies>
<dependency>
<groupId>com.riemann</groupId>
<artifactId>rpc-common</artifactId>
<version>1.0-SNAPSHOT</version>
</dependency>
</dependencies>
</project>
2、rpc-common
提供者及消费者工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者直接通过接口来进行TCP通信及一定的协议定制获取提供者的实现返回值。
public interface IUserService {
String sayHello(String msg);
}
根据RpcRequest
实体作为通信协议
@Data
public class RpcRequest {
/**
* 请求对象的ID
*/
private String requestId;
/**
* 类名
*/
private String className;
/**
* 方法名
*/
private String methodName;
/**
* 参数类型
*/
private Class<?>[] parameterTypes;
/**
* 入参
*/
private Object[] parameters;
}
采用JSON
的方式,定义JSONSerializer
的实现类。
public class JSONSerializer implements Serializer {
public byte[] serialize(Object object) throws IOException {
return JSON.toJSONBytes(object);
}
public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {
return JSON.parseObject(bytes, clazz);
}
}
3、rpc-provider
首先是接口的实现,这一点和普通接口实现是一样的
@Service
public class UserServiceImpl implements IUserService {
// 将来客户端要远程调用的方法
public String sayHello(String msg) {
System.out.println("hello " + msg);
return "success";
}
// 创建一个方法启动服务器
public static void startServer(String ip, int port) throws InterruptedException {
// 1.创建两个线程池对象
NioEventLoopGroup bossGroup = new NioEventLoopGroup();
NioEventLoopGroup workGroup = new NioEventLoopGroup();
// 2.创建服务端的启动引导对象
ServerBootstrap serverBootstrap = new ServerBootstrap();
// 3.配置启动引导对象
serverBootstrap.group(bossGroup, workGroup)
// 设置通道为NIO
.channel(NioServerSocketChannel.class)
// 创建监听channel
.childHandler(new ChannelInitializer<NioSocketChannel>() {
protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
// 获取管道对象
ChannelPipeline pipeline = nioSocketChannel.pipeline();
// 给管道对象pipLine 设置编码
// pipeline.addLast(new StringEncoder());
// pipeline.addLast(new StringDecoder());
pipeline.addLast(new StringEncoder());
pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer()));
// 把我们自定义的一个ChannelHandler添加到通道中
pipeline.addLast(new UserServiceHandler());
}
});
// 4.绑定端口
serverBootstrap.bind(ip, port).sync();
System.out.println("rpc-provider已启动...");
}
}
在实现中加入了netty
的服务器启动程序,上面的代码中添加了String
类型的编解码 handler
,添加了一个自定义 handler
。
自定义 handler
逻辑如下:
/**
* 自定义的业务处理器
*/
@Component
public class UserServiceHandler extends ChannelInboundHandlerAdapter {
// 当客户端读取数据时,该方法会被调用
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("接收到客户端信息:" + JSON.toJSON(msg).toString());
// 将读到的msg对象,强转成RpcRequest对象
RpcRequest rpcRequest = (RpcRequest) msg;
// 加载class文件
Class<?> clazz = Class.forName(rpcRequest.getClassName());
// 通过class获取服务器spring容器中service类的实例化bean对象
Object serviceBean = SpringContextUtil.getBean(clazz);
Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
method.invoke(serviceBean, rpcRequest.getParameters());
//服务器写入数据,将结果返回给客户端
ctx.writeAndFlush("success");
// 注意:客户端将来发送请求的时候会传递一个参数:UserService#sayHello#riemann
// 1.判断当前的请求是否符合规则
/*if (msg.toString().startsWith("UserService")) {
// 2.如何符合规则,调用实现类获取到一个result
UserServiceImpl userService = new UserServiceImpl();
String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
// 3.将调用实现类的方法获得的结果写到客户端
ctx.writeAndFlush(result);
}*/
}
}
还需要一个启动类:
@SpringBootApplication
public class ServerBoot {
public static void main(String[] args) throws InterruptedException {
SpringApplication.run(ServerBoot.class, args);
// 启动服务器
UserServiceImpl.startServer("127.0.0.1", 8999);
}
}
4、rpc-consumer
消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK
的动态代理
来实现这个目的。
思路:客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。
我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。
首先创建代理相关的类:
/**
* 消费者
*/
public class RPCConsumer {
// 1.创建一个线程池对象 -- 它要处理我们自定义事件
private static ExecutorService executorService =
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
// 2.声明一个自定义事件处理器 UserClientHandler
private static UserClientHandler userClientHandler;
// 3.编写方法,初始化客户端 (创建连接池、bootStrap、设置bootStrap、连接服务器)
public static void initClient() throws InterruptedException {
// 1) 初始化 userClientHandler
userClientHandler = new UserClientHandler();
// 2) 创建连接池对象
EventLoopGroup group = new NioEventLoopGroup();
// 3) 创建客户端的引导对象
Bootstrap bootstrap = new Bootstrap();
// 4) 配置启动引导对象
bootstrap.group(group)
// 设置通道为NIO
.channel(NioSocketChannel.class)
// 设置请求协议为TCP
.option(ChannelOption.TCP_NODELAY, true)
// 监听channel 并初始化
.handler(new ChannelInitializer<SocketChannel>() {
protected void initChannel(SocketChannel socketChannel) throws Exception {
// 获取 ChannelPipeline
ChannelPipeline pipeline = socketChannel.pipeline();
// 设置编码
// pipeline.addLast(new StringEncoder());
// pipeline.addLast(new StringDecoder());
pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
pipeline.addLast(new StringDecoder());
// 添加自定义事件处理器
pipeline.addLast(userClientHandler);
}
});
// 5) 连接服务端
bootstrap.connect("127.0.0.1", 8999).sync();
System.out.println("rpc-consumer已启动...");
}
// 4.编写一个方法,使用JDK的动态代理创建对象
/**
*
* @param serviceClass 接口类型,根据哪个接口生成子类代理对象
* @param providerParam "UserService#sayHello#riemann"
* @return
*/
public static Object createProxy(Class<?> serviceClass, final String providerParam) {
return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
new Class[]{serviceClass}, new InvocationHandler() {
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
// 1.初始化客户端client
if (userClientHandler == null) {
initClient();
}
String requestId = UUID.randomUUID().toString();
String className = method.getDeclaringClass().getName();
String methodName = method.getName();
Class<?>[] parameterTypes = method.getParameterTypes();
RpcRequest rpcRequest = new RpcRequest();
rpcRequest.setRequestId(requestId);
rpcRequest.setClassName(className);
rpcRequest.setMethodName(methodName);
rpcRequest.setParameterTypes(parameterTypes);
rpcRequest.setParameters(args);
// 2.给 userClientHandler 设置param参数
// userClientHandler.setParam(providerParam + args[0]);
userClientHandler.setParam(rpcRequest);
// 3.使用线程池,开启一个线程处理call()写操作,并返回结果。
Object result = executorService.submit(userClientHandler).get();
// 4.return 结果
return result;
}
});
}
}
该类有 2 个方法,创建代理和初始化客户端。
创建代理逻辑
:使用 JDK
的动态代理技术,代理对象中的 invoke
方法实现如下: 如果 client
没有初始化,则初始化 client
,这个 client
既是 handler
,也是一个 Callback
。将参数设置进 client
,使用线程池调用 client
的 call
方法并阻塞等待数据返回。
初始化客户端逻辑
: 创建一个 Netty
的客户端,并连接提供者,并设置一个自定义 handler
,和一些 String
类型的序列化方式。
UserClientHandler
的实现:
/**
* 自定义事件处理器
*/
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {
// 1.定义成员变量
private ChannelHandlerContext context; // 事件处理器上下文对象(存储handler写信息,写操作)
private String result; // 记录服务器返回的数据
private Object param; // 记录将要返回给服务器的数据
// 2.实现 channelActive 客户端和服务器连接时,该方法就自动执行。
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("服务器已连接...");
// 初始化 ChannelHandlerContext
this.context = ctx;
}
// 3.实现 channelRead 当我们读到服务器数据,该方法自动执行。
@Override
public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 将读到的服务器的数据msg,设置为成员变量的值
result = msg.toString();
notify();
}
// 4.将客户端的数据写到服务器
public synchronized Object call() throws Exception {
// context 给服务器写数据
context.writeAndFlush(param);
wait();
return result;
}
// 5.设置参数的方法
public void setParam(Object param) {
this.param = param;
}
}
该类缓存了 ChannelHandlerContext
,用于下次使用,有两个属性:返回结果和请求参数。
当成功连接后,缓存 ChannelHandlerContext
,当调用 call
方法的时候,将请求参数发送到服务端,等待。当服务端收到并返回数据后,调用 channelRead
方法,将返回值赋值个 result
,并唤醒等待在 call
方法上的线程。此时,代理对象返回数据。
再看看消费者调用方式,一般的TCP
的RPC
只需要这样调用即可,无需关心具体的协议和通信方式:
public class ConsumeBoot {
// 参数定义
private static final String PROVIDE_NAME = "UserService#sayHello#";
public static void main(String[] args) throws InterruptedException {
// 1.创建代理对象
IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDE_NAME);
// 2.循环给服务器写数据
while (true) {
String result = service.sayHello("riemann");
System.out.println(result);
Thread.sleep(2000);
}
}
}
四、结果测试
调用者首先创建了一个代理对象,然后每隔一秒钟调用代理的 sayHello
方法,并打印服务端返回的结果。
服务提供者:
服务消费者:
可以看到,消费者无需通过jar
包的形式引入具体的实现项目,而是通过远程TCP
通信的形式,以一定的协议和代理通过接口直接调用了方法,实现远程service
间的调用,是分布式服务的基础。
五、代码仓库
https://github.com/riemannChow/perseverance/tree/master/handwriting-framework/rpc-netty
本文地址:https://blog.csdn.net/riemann_/article/details/107350656
上一篇: Linux前世今生