欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

荐 我的架构梦:(二十)基于Netty手写RPC框架

程序员文章站 2022-04-02 11:24:50
基于Netty手写RPC框架一、前言二、需求与步骤三、代码实现四、结果测试五、代码仓库一、前言RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主:是基于HTTP的restful形式的广义远程调用,以spring could的feign和restTemplate为代表,采用的协议是HTTP 的7层调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过netty来实现4层网...

一、前言

RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主:

  • 是基于HTTPrestful形式的广义远程调用,以spring couldfeignrestTemplate为代表,采用的协议是HTTP7层调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。
  • 是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过netty来实现4层网络协议,NIO来异步传输,序列化也可以是JSON或者hessian2以及java自带的序列化等,可以配置。

接下来我们主要以第二种的RPC远程调用来自己实现

二、需求与步骤

模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty

1、创建一个公共的接口项目以及创建接口及方法,用于消费者和提供者之间的约定。
2、创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
3、创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据。

三、代码实现

1、maven聚合工程

rpc-netty父pom依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.1.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.riemann</groupId>
    <artifactId>rpc-netty</artifactId>
    <packaging>pom</packaging>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>1.8</java.version>
    </properties>

    <modules>
        <module>rpc-common</module>
        <module>rpc-provider</module>
        <module>rpc-consumer</module>
    </modules>

    <dependencies>
        <dependency>
            <groupId>io.netty</groupId>
            <artifactId>netty-all</artifactId>
            <version>4.1.16.Final</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.12</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

rpc-common pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rpc-netty</artifactId>
        <groupId>com.riemann</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-common</artifactId>
    
</project>

rpc-provider pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rpc-netty</artifactId>
        <groupId>com.riemann</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-provider</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.riemann</groupId>
            <artifactId>rpc-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

rpc-consumer pom依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>rpc-netty</artifactId>
        <groupId>com.riemann</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>rpc-consumer</artifactId>

    <dependencies>
        <dependency>
            <groupId>com.riemann</groupId>
            <artifactId>rpc-common</artifactId>
            <version>1.0-SNAPSHOT</version>
        </dependency>
    </dependencies>

</project>

2、rpc-common

提供者及消费者工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者直接通过接口来进行TCP通信及一定的协议定制获取提供者的实现返回值。

public interface IUserService {

    String sayHello(String msg);

}

根据RpcRequest实体作为通信协议

@Data
public class RpcRequest {

    /**
     * 请求对象的ID
     */
    private String requestId;

    /**
     * 类名
     */
    private String className;

    /**
     * 方法名
     */
    private String methodName;

    /**
     * 参数类型
     */
    private Class<?>[] parameterTypes;

    /**
     * 入参
     */
    private Object[] parameters;

}

采用JSON的方式,定义JSONSerializer的实现类。

public class JSONSerializer implements Serializer {

    public byte[] serialize(Object object) throws IOException {
        return JSON.toJSONBytes(object);
    }

    public <T> T deserialize(Class<T> clazz, byte[] bytes) throws IOException {
        return JSON.parseObject(bytes, clazz);
    }

}

3、rpc-provider

首先是接口的实现,这一点和普通接口实现是一样的

@Service
public class UserServiceImpl implements IUserService {

    // 将来客户端要远程调用的方法
    public String sayHello(String msg) {
        System.out.println("hello " + msg);
        return "success";
    }

    // 创建一个方法启动服务器
    public static void startServer(String ip, int port) throws InterruptedException {
        // 1.创建两个线程池对象
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        // 2.创建服务端的启动引导对象
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3.配置启动引导对象
        serverBootstrap.group(bossGroup, workGroup)
                // 设置通道为NIO
                .channel(NioServerSocketChannel.class)
                // 创建监听channel
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 获取管道对象
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        // 给管道对象pipLine 设置编码
                        // pipeline.addLast(new StringEncoder());
                        // pipeline.addLast(new StringDecoder());

                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new RpcDecoder(RpcRequest.class, new JSONSerializer()));

                        // 把我们自定义的一个ChannelHandler添加到通道中
                        pipeline.addLast(new UserServiceHandler());
                    }
                });

        // 4.绑定端口
        serverBootstrap.bind(ip, port).sync();

        System.out.println("rpc-provider已启动...");
    }

}

在实现中加入了netty的服务器启动程序,上面的代码中添加了String类型的编解码 handler,添加了一个自定义 handler

自定义 handler 逻辑如下:

/**
 * 自定义的业务处理器
 */
@Component
public class UserServiceHandler extends ChannelInboundHandlerAdapter {

    // 当客户端读取数据时,该方法会被调用
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("接收到客户端信息:" + JSON.toJSON(msg).toString());
        // 将读到的msg对象,强转成RpcRequest对象
        RpcRequest rpcRequest = (RpcRequest) msg;
        // 加载class文件
        Class<?> clazz = Class.forName(rpcRequest.getClassName());
        // 通过class获取服务器spring容器中service类的实例化bean对象
        Object serviceBean = SpringContextUtil.getBean(clazz);
        Method method = clazz.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
        method.invoke(serviceBean, rpcRequest.getParameters());

        //服务器写入数据,将结果返回给客户端
        ctx.writeAndFlush("success");

        // 注意:客户端将来发送请求的时候会传递一个参数:UserService#sayHello#riemann

        // 1.判断当前的请求是否符合规则
        /*if (msg.toString().startsWith("UserService")) {
            // 2.如何符合规则,调用实现类获取到一个result
            UserServiceImpl userService = new UserServiceImpl();
            String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            // 3.将调用实现类的方法获得的结果写到客户端
            ctx.writeAndFlush(result);
        }*/
    }

}

还需要一个启动类:

@SpringBootApplication
public class ServerBoot {

    public static void main(String[] args) throws InterruptedException {
        SpringApplication.run(ServerBoot.class, args);
        // 启动服务器
        UserServiceImpl.startServer("127.0.0.1", 8999);
    }

}

4、rpc-consumer

消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK动态代理来实现这个目的。

思路:客户端调用代理方法,返回一个实现了 HelloService 接口的代理对象,调用代理对象的方法,返回结果。

我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。

首先创建代理相关的类:

/**
 * 消费者
 */
public class RPCConsumer {

    // 1.创建一个线程池对象  -- 它要处理我们自定义事件
    private static ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // 2.声明一个自定义事件处理器  UserClientHandler
    private static UserClientHandler userClientHandler;

    // 3.编写方法,初始化客户端 (创建连接池、bootStrap、设置bootStrap、连接服务器)
    public static void initClient() throws InterruptedException {
        // 1) 初始化 userClientHandler
        userClientHandler = new UserClientHandler();
        // 2) 创建连接池对象
        EventLoopGroup group = new NioEventLoopGroup();
        // 3) 创建客户端的引导对象
        Bootstrap bootstrap = new Bootstrap();
        // 4) 配置启动引导对象
        bootstrap.group(group)
                // 设置通道为NIO
                .channel(NioSocketChannel.class)
                // 设置请求协议为TCP
                .option(ChannelOption.TCP_NODELAY, true)
                // 监听channel 并初始化
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 获取 ChannelPipeline
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 设置编码
                        // pipeline.addLast(new StringEncoder());
                        // pipeline.addLast(new StringDecoder());

                        pipeline.addLast(new RpcEncoder(RpcRequest.class, new JSONSerializer()));
                        pipeline.addLast(new StringDecoder());
                        // 添加自定义事件处理器
                        pipeline.addLast(userClientHandler);
                    }
                });
        // 5) 连接服务端
        bootstrap.connect("127.0.0.1", 8999).sync();
        System.out.println("rpc-consumer已启动...");
    }

    // 4.编写一个方法,使用JDK的动态代理创建对象
    /**
     *
     * @param serviceClass   接口类型,根据哪个接口生成子类代理对象
     * @param providerParam  "UserService#sayHello#riemann"
     * @return
     */
    public static Object createProxy(Class<?> serviceClass, final String providerParam) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{serviceClass}, new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // 1.初始化客户端client
                        if (userClientHandler == null) {
                            initClient();
                        }

                        String requestId = UUID.randomUUID().toString();
                        String className = method.getDeclaringClass().getName();
                        String methodName = method.getName();
                        Class<?>[] parameterTypes = method.getParameterTypes();

                        RpcRequest rpcRequest = new RpcRequest();
                        rpcRequest.setRequestId(requestId);
                        rpcRequest.setClassName(className);
                        rpcRequest.setMethodName(methodName);
                        rpcRequest.setParameterTypes(parameterTypes);
                        rpcRequest.setParameters(args);

                        // 2.给 userClientHandler 设置param参数
                        // userClientHandler.setParam(providerParam + args[0]);
                        userClientHandler.setParam(rpcRequest);

                        // 3.使用线程池,开启一个线程处理call()写操作,并返回结果。
                        Object result = executorService.submit(userClientHandler).get();

                        // 4.return 结果
                        return result;
                    }
                });
    }

}

该类有 2 个方法,创建代理和初始化客户端。

创建代理逻辑:使用 JDK 的动态代理技术,代理对象中的 invoke 方法实现如下: 如果 client 没有初始化,则初始化 client,这个 client 既是 handler ,也是一个 Callback。将参数设置进 client ,使用线程池调用 clientcall 方法并阻塞等待数据返回。

初始化客户端逻辑: 创建一个 Netty 的客户端,并连接提供者,并设置一个自定义 handler,和一些 String 类型的序列化方式。

UserClientHandler 的实现:

/**
 * 自定义事件处理器
 */
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    // 1.定义成员变量
    private ChannelHandlerContext context; // 事件处理器上下文对象(存储handler写信息,写操作)
    private String result; // 记录服务器返回的数据
    private Object param; // 记录将要返回给服务器的数据

    // 2.实现 channelActive 客户端和服务器连接时,该方法就自动执行。
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        System.out.println("服务器已连接...");
        // 初始化 ChannelHandlerContext
        this.context = ctx;
    }


    // 3.实现 channelRead 当我们读到服务器数据,该方法自动执行。
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 将读到的服务器的数据msg,设置为成员变量的值
        result = msg.toString();
        notify();
    }

    // 4.将客户端的数据写到服务器
    public synchronized Object call() throws Exception {
        // context 给服务器写数据
        context.writeAndFlush(param);
        wait();
        return result;
    }

    // 5.设置参数的方法
    public void setParam(Object param) {
        this.param = param;
    }
}

该类缓存了 ChannelHandlerContext,用于下次使用,有两个属性:返回结果和请求参数。

当成功连接后,缓存 ChannelHandlerContext,当调用 call 方法的时候,将请求参数发送到服务端,等待。当服务端收到并返回数据后,调用 channelRead 方法,将返回值赋值个 result,并唤醒等待在 call 方法上的线程。此时,代理对象返回数据。

再看看消费者调用方式,一般的TCPRPC只需要这样调用即可,无需关心具体的协议和通信方式:

public class ConsumeBoot {

    // 参数定义
    private static final String PROVIDE_NAME = "UserService#sayHello#";

    public static void main(String[] args) throws InterruptedException {
        // 1.创建代理对象
        IUserService service = (IUserService) RPCConsumer.createProxy(IUserService.class, PROVIDE_NAME);

        // 2.循环给服务器写数据
        while (true) {
            String result = service.sayHello("riemann");
            System.out.println(result);
            Thread.sleep(2000);
        }
    }

}

四、结果测试

调用者首先创建了一个代理对象,然后每隔一秒钟调用代理的 sayHello 方法,并打印服务端返回的结果。

服务提供者:
荐
                                                        我的架构梦:(二十)基于Netty手写RPC框架
服务消费者:
荐
                                                        我的架构梦:(二十)基于Netty手写RPC框架

可以看到,消费者无需通过jar包的形式引入具体的实现项目,而是通过远程TCP通信的形式,以一定的协议和代理通过接口直接调用了方法,实现远程service间的调用,是分布式服务的基础。

五、代码仓库

https://github.com/riemannChow/perseverance/tree/master/handwriting-framework/rpc-netty

本文地址:https://blog.csdn.net/riemann_/article/details/107350656