欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

基于Netty自定义RPC

程序员文章站 2022-06-28 08:19:56
一、基于Netty自定义RPCRPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主:是基于HTTP的restful形式的广义远程调用,以spring could的feign和restTemplate为代表,采用的协议是HTTP的7层调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过netty来实现4层网络协议,NIO来异步传输,序列化也可以是JSON或者hessian2...

一、基于Netty自定义RPC

RPC又称远程过程调用,我们所知的远程调用分为两种,现在在服务间通信的方式也基本以这两种为主:

  1. 是基于HTTP的Restful形式的广义远程调用,以Spring Cloud的Feign和RestTemplate为代表,采用的协议是HTTP的7层调用协议,并且协议的参数和响应序列化基本以JSON格式和XML格式为主。
  2. 是基于TCP的狭义的RPC远程调用,以阿里的Dubbo为代表,主要通过Netty来实现4层网络协议,NIO来异步传输,序列化也可以是JSON或者hessian2以及java自带的序列化等,可以配置。

接下来我们主要以第二种的RPC远程调用来自己实现

需求:
  模仿 dubbo,消费者和提供者约定接口和协议,消费者远程调用提供者,提供者返回一个字符串,消费者打印提供者返回的数据。底层网络通信使用 Netty

步骤:

  1. 创建一个公共的接口项目以及创建接口及方法,用于消费者和提供者之间的约定。
  2. 创建一个提供者,该类需要监听消费者的请求,并按照约定返回数据。
  3. 创建一个消费者,该类需要透明的调用自己不存在的方法,内部需要使用 Netty 请求提供者返回数据

1.1 公共模块

首先,在公共模块中添加netty的maven依赖

<dependency>
	<groupId>io.netty</groupId>
	<artifactId>netty-all</artifactId>
	<version>4.1.16.Final</version>
</dependency>

  提供者及消费者工程都需依赖公共模块,这样提供者来实现接口并且提供网络调用,消费者直接通过接口来进行TCP通信及一定的协议定制获取提供者的实现返回值。

接口的定义

public interface IUserService {
    String sayHello(String msg);
}

  只是一个普通的接口,参数是支持序列化的String类型,返回值同理

1.2 提供者的实现

1、首先是接口的实现,这一点和普通接口实现是一样的

package com.cyd.service;

import com.cyd.handler.UserServiceHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

public class UserServiceImpl implements IUserService {

    public String sayHello(String msg) {
        System.out.println("调用成功--参数:" + msg);
        return "调用成功--参数:" + msg;
    }

    /**
     * 启动服务器
     *
     * @param ip   服务器IP
     * @param port 服务器端口
     * @throws InterruptedException
     */
    public static void startServer(String ip, int port) throws InterruptedException {
        // 1、创建 NioEventLoopGroup的两个实例:bossGroup workerGroup
        // bossGroup接收客户端传过来的请求
        // workerGroup处理请求
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        NioEventLoopGroup workGroup = new NioEventLoopGroup();

        // 2、创建服务启动引导类:组装一些必要的组件
        ServerBootstrap serverBootstrap = new ServerBootstrap();

        // 3、配置启动引导类
        // 设置组,第一个bossGroup负责连接,workerGroup负责连接之后的io处理
        serverBootstrap.group(bossGroup, workGroup)
                // channel方法指定服务器监听的通道类型
                .channel(NioServerSocketChannel.class)
                // 设置channel handler,每一个客户端连接后,给定一个监听器进行处理
                .childHandler(new ChannelInitializer<NioSocketChannel>() {
                    protected void initChannel(NioSocketChannel nioSocketChannel) throws Exception {
                        // 获取管道对象
                        ChannelPipeline pipeline = nioSocketChannel.pipeline();
                        // 给管道对象pipeline设置编码
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        // 把自定义的ChannelHandler添加到通道中
                        pipeline.addLast(new UserServiceHandler());
                    }
                });

        // 4、绑定IP地址以及端口
        serverBootstrap.bind(ip, port).sync();
    }
}

  在实现中加入了netty的服务器启动程序,上面的代码中添加了 String类型的编解码 handler,添加了一个自定义 handler

2、自定义 handler 逻辑如下:

package com.cyd.handler;

import com.cyd.service.UserServiceImpl;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * 自定义业务处理器
 */
public class UserServiceHandler extends ChannelInboundHandlerAdapter {

    /**
     * 当客户端读取数据时,该方法会被调用
     *
     * @param ctx
     * @param msg 客户端发送请求的时候传递一个参数
     * @throws Exception
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        // 注意:客户端将来发送请求的时候传递一个参数:UserService#sayHello#are you ok
        // 1、判断请求是否符合规则
        if (msg.toString().startsWith("UserService")) {
            // 2、如果符合规则,则调用方法并获取result
            UserServiceImpl userService = new UserServiceImpl();
            String result = userService.sayHello(msg.toString().substring(msg.toString().lastIndexOf("#") + 1));
            // 3、把结果result返回到客户端
            ctx.writeAndFlush(result);
        }
    }
}

  这里显示判断了是否符合约定(并没有使用复杂的协议,只是一个字符串判断),然后创建一个具体实现类,并调用方法写回客户端。

3、还需要一个启动类:

public class ServerBoot {

    public static void main(String[] args) {
        try {
            UserServiceImpl.startServer("127.0.0.1", 9999);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }  
}

  关于提供者的代码就写完了,主要就是创建一个 netty 服务端,实现一个自定义的 handler,自定义 handler 判断是否符合之间的约定(协议),如果符合,就创建一个接口的实现类,并调用他的方法返回字符串。

1.3 消费者相关实现

  消费者有一个需要注意的地方,就是调用需要透明,也就是说,框架使用者不用关心底层的网络实现。这里我们可以使用 JDK 的动态代理来实现这个目的。

  思路:客户端调用代理方法,返回一个实现了 IUserService 接口的代理对象,调用代理对象的方法,返回结果。

  我们需要在代理中做手脚,当调用代理方法的时候,我们需要初始化 Netty 客户端,还需要向服务端请求数据,并返回数据。

首先创建代理相关的类:

package com.cyd.client;

import com.cyd.handler.UserClientHandler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class RpcConsumer {

    // 1、创建一个线程池
    public static ExecutorService executorService =
            Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());

    // 2、声明一个自定义事件处理器 UserClientHandler
    private static UserClientHandler userClientHandler;

    // 3、初始化客户端(创建连接池、创建启动引导类、配置引导类、连接服务器)
    public static void initClient() throws InterruptedException {
        // 1) 初始化UserClientHandler
        userClientHandler = new UserClientHandler();
        // 2) 创建连接池对象
        NioEventLoopGroup bossGroup = new NioEventLoopGroup();
        // 3) 创建启动引导类
        Bootstrap bootstrap = new Bootstrap();
        // 4) 配置启动引导类
        bootstrap.group(bossGroup)
                // 设置通道类型为NIO
                .channel(NioSocketChannel.class)
                // 设置请求协议为TCP
                .option(ChannelOption.TCP_NODELAY, true)
                // 监听channel,并初始化
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel socketChannel) throws Exception {
                        // 获取管道Pipeline
                        ChannelPipeline pipeline = socketChannel.pipeline();
                        // 设置编码
                        pipeline.addLast(new StringEncoder());
                        pipeline.addLast(new StringDecoder());
                        // 设置自定义事件处理器
                        pipeline.addLast(userClientHandler);
                    }
                });

        // 5) 连接服务器
        bootstrap.connect("127.0.0.1", 9999).sync();
    }

    /**
     * 4、编写一个方法,使用JDK动态代理创建对象
     *
     * @param serviceClass  接口类型,根据哪个接口生成子类代理对象
     * @param providerParam "UserService#sayHello#"
     * @return
     */
    public static Object createProxy(Class<?> serviceClass, final String providerParam) {
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),
                new Class[]{serviceClass}, new InvocationHandler() {
                    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                        // 1) 初始化客户端
                        if (userClientHandler == null) {
                            initClient();
                        }

                        // 2) 给UserClientHandler设置参数
                        userClientHandler.setParams(providerParam + args[0]);

                        // 3) 使用线程池执行UserClientHandler任务,并返回结果
                        Future future = executorService.submit(userClientHandler);
                        Object result = future.get();

                        // 4) 返回结果
                        return result;
                    }
                });
    }
}

该类有 2 个方法,创建代理和初始化客户端。

  创建代理逻辑:使用 JDK 的动态代理技术,代理对象中的 invoke 方法实现如下: 如果 client 没有初始化,则初始化 client,这个 client 既是 handler ,也是一个 Callable 。将参数设置进 client ,使用线程池调用 client 的 call 方法并阻塞等待数据返回。

  初始化客户端逻辑: 创建一个 Netty 的客户端,并连接提供者,并设置一个自定义 handler,和一些 String 类型的序列化方式。

UserClientHandler 的实现:

package com.cyd.handler;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.util.concurrent.Callable;

/**
 * 自定义事件处理器
 */
public class UserClientHandler extends ChannelInboundHandlerAdapter implements Callable {

    // 1、定义成员变量
    // 事件处理器上下文对象(存储handler信息,进行写操作)
    private ChannelHandlerContext channelHandlerContext;
    // 记录服务器返回的数据
    private String result;
    // 记录将要发送给服务器的数据
    private String params;

    // 2、实现channelActive方法,客户端和服务器建立连接时该方法自动执行
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        // 初始化ChannelHandlerContext
        channelHandlerContext = ctx;
    }

    // 3、实现channelRead方法,当我们读取到服务器数据,该方法自动执行
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        notify();
    }

    // 4、将客户端的数据写到服务器
    public synchronized Object call() throws Exception {
        // channelHandlerContext给服务器写数据
        channelHandlerContext.writeAndFlush(params);
        wait();
        return result;
    }

    // 5、设置客户端请求参数
    public void setParams(String params) {
        this.params = params;
    }
}

  该类缓存了 ChannelHandlerContext,用于下次使用,有两个属性:返回结果和请求参数。

  当成功连接后,缓存 ChannelHandlerContext,当调用 call 方法的时候,将请求参数发送到服务端,等待。当服务端收到并返回数据后,调用 channelRead 方法,将返回值赋值个 result,并唤醒等待在 call 方法上的线程。此时,代理对象返回数据。

  再看看消费者调用方式,一般的TCP的RPC只需要这样调用即可,无需关心具体的协议和通信方式:

package com.cyd.boot;

import com.cyd.client.RpcConsumer;
import com.cyd.service.IUserService;

public class ConsumerBoot {

    private static final String PROVIDER_PARAM = "UserService#sayHello#";

    public static void main(String[] args) throws InterruptedException {
        // 1、创建代理对象
        IUserService userService = (IUserService) RpcConsumer.createProxy(IUserService.class, PROVIDER_PARAM);

        // 2、循环给服务器写数据
        while (true) {
            String result = userService.sayHello("are you ok!");
            System.out.println("服务器应答------->>>" + result);
            Thread.sleep(2000);
        }
    }
}

  调用者首先创建了一个代理对象,然后每隔一秒钟调用代理的 sayHello 方法,并打印服务端返回的结果

  可以看到,消费者无需通过jar包的形式引入具体的实现项目,而是通过远程TCP通信的形式,以一定的协议和代理通过接口直接调用了方法,实现远程service间的调用,是分布式服务的基础。

本文地址:https://blog.csdn.net/cyd_0619/article/details/110082941

相关标签: rpc netty