欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

手把手教你用Netty实现一个RPC框架

程序员文章站 2022-03-20 23:00:34
写在开头本文使用Netty简单实现一个RPC框架,包括服务端,客户端,注册中心等,暂时不考虑监控,并且因为使用的Netty,所以使用到了Netty的封装API,所以不熟悉NettyAPI的小伙伴可以先熟悉一下API,我在代码中也注释了相关步骤和逻辑,因为Neety其实就是对网络通信的封装框架,所以底层还是IO那一套。建议大家熟悉一下NIO的三件套,buffer,selector,channel。项目结构api 服务接口consumer 消费者protocol 策略对象provi....

写在开头

本文使用Netty简单实现一个RPC框架,包括服务端,客户端,注册中心等,暂时不考虑监控,并且因为使用的Netty,所以使用到了Netty的封装API,所以不熟悉NettyAPI的小伙伴可以先熟悉一下API,我在代码中也注释了相关步骤和逻辑,因为Netty其实就是对网络通信的封装框架,所以底层还是IO那一套。建议大家熟悉一下NIO的三件套,buffer,selector,channel。

项目结构

手把手教你用Netty实现一个RPC框架

  • api  服务接口
  • consumer 消费者
  • protocol 策略对象
  • provider 生产者-api接口服务实现
  • registry 注册中心 

api

/**
 * @author : Ls
 * @ClassName : IRpcService
 * @date : 2021-01-04 15:46
 * @Version : 1.0
 * @Description : RPC基础服务接口
 **/
public interface IRpcDoSomethingService {

    int add(int a,int b);
    int sub(int a,int b);
    int mul(int a,int b);
    int div(int a,int b);

}
/**
 * @author : Ls
 * @ClassName : IRpcHelloService
 * @date : 2021-01-04 15:45
 * @Version : 1.0
 * @Description : Rpc基础服务接口
 **/
public interface IRpcSayHelloService {

    String sayHello(String name);

}

provider

/**
 * @author : Ls
 * @ClassName : IRpcDoSomethingServiceImpl
 * @date : 2021-01-04 15:50
 * @Version : 1.0
 * @Description :
 **/
public class IRpcDoSomethingServiceImpl implements IRpcDoSomethingService {
    public int add(int a, int b) {
        return a + b;
    }

    public int sub(int a, int b) {
        return a - b;
    }

    public int mul(int a, int b) {
        return a * b;
    }

    public int div(int a, int b) {
        return a / b;
    }
}
/**
 * @author : Ls
 * @ClassName : IRpcHelloServiceImpl
 * @date : 2021-01-04 15:47
 * @Version : 1.0
 * @Description :
 **/
public class IRpcSayHelloServiceImpl implements IRpcSayHelloService {

    public String sayHello(String name) {
        return  "Hello "+name+" !!" ;
    }

}

procotol

/**
 * 自定义传输协议
 */
@Data
public class MyInvokerProtocol implements Serializable {

    private String className;//类名
    private String methodName;//函数名称 
    private Class<?>[] parames;//形参列表
    private Object[] values;//实参列表

}

registry

/**
 * @author : Ls
 * @ClassName : IRpcRegistry
 * @date : 2021-01-04 15:52
 * @Version : 1.0
 * @Description : 注册中心
 **/
public class IRpcRegistry {

    // 暴露端口
    private  int port;

    public IRpcRegistry(int port){
        this.port = port;
    }

    /**
     * 开启服务接口监听
     * 调用 netty 相关API实现接口监听
     */
    public void listen(){
        // Boss线程 (Selector核心)
        NioEventLoopGroup boss = new NioEventLoopGroup();
        // Work线程 (工作线程)
        NioEventLoopGroup work = new NioEventLoopGroup();

        // 1. 建立服务
        ServerBootstrap server = new ServerBootstrap();
        // 2. 注入 Boos/Worker
        server.group(boss,work)
                .channel(NioServerSocketChannel.class) // 3. 管道执行 keys 轮询的核心
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel channel) throws Exception {
                        // 5. 对流数据进行解析
                        ChannelPipeline pipeline = channel.pipeline();
                        // 6. 自定义协议解码器 (取决于自己定义的规则对象)
                        /** 入参有5个,分别解释如下
                         *  maxFrameLength:框架的最大长度。如果帧的长度大于此值,则将抛出TooLongFrameException。
                         *  lengthFieldOffset:长度字段的偏移量:即对应的长度字段在整个消息数据中得位置
                         *  lengthFieldLength:长度字段的长度。如:长度字段是int型表示,那么这个值就是4(long型就是8)
                         *  lengthAdjustment:要添加到长度字段值的补偿值
                         *  initialBytesToStrip:从解码帧中去除的第一个字节数
                         */
                        pipeline.addLast(new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                        //自定义协议编码器
                        pipeline.addLast(new LengthFieldPrepender(4));
                        // 7. 参数解析
                        //对象参数类型编码器
                        pipeline.addLast("encoder",new ObjectEncoder());
                        //对象参数类型解码器
                        pipeline.addLast("decoder",new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                        // 8. 执行业务逻辑
                        pipeline.addLast(new IRegistryHandler());
                    }
                }) // 4. 子线程 执行对应的业务逻辑
                .option(ChannelOption.SO_BACKLOG,128) // 主线程最大连接数
                .childOption(ChannelOption.SO_KEEPALIVE,true); // 子线程持续

        try {
            // 服务绑定端口
            ChannelFuture future = server.bind(port).sync();
            System.out.println("RPC start success, listen port is :" +  port + " !!");
            future.channel().closeFuture().sync(); // 回调
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            boss.shutdownGracefully();
            work.shutdownGracefully();
        }
    }

    
    public static void main(String[] args) {
        // 监听启动
        new IRpcRegistry(8080).listen();
    }


}
/**
 * @author : Ls
 * @ClassName : IRegistryHandler
 * @date : 2021-01-04 16:19
 * @Version : 1.0
 * @Description :  业务执行
 **/
public class IRegistryHandler extends ChannelInboundHandlerAdapter {

    // 注册中心 (容器)
    public static ConcurrentHashMap<String,Object> context = new ConcurrentHashMap<String,Object>();
    // 类信息集合
    private List<String> classNames = new ArrayList<String>();

    // 构造器初始化
    public IRegistryHandler(){
        // 包扫描信息
        scannerClass("com.lishuo.netty.rpc.provider");
        // 注册容器
        doRegistry();
    }


    // 递归包扫描
    private void scannerClass(String packageName) {
        // 获取类加载器
        URL url = this.getClass().getClassLoader().getResource(packageName.replaceAll("\\.", "/"));
        File dir = new File(url.getFile());
        for (File file : dir.listFiles()) {
            //如果是一个文件夹,继续递归
            if(file.isDirectory()){
                scannerClass(packageName + "." + file.getName());
            }else{
                classNames.add(packageName + "." + file.getName().replace(".class", "").trim());
            }
        }
    }

    // 注册
    private void doRegistry() {
        if(classNames.isEmpty()){return;}
        for (String className : classNames) {
            try {
                Class<?> clazz = Class.forName(className);
                Class<?> anInterface = clazz.getInterfaces()[0]; // 因为此demo默认实现一个接口  所以此处写死获取当前第一个接口信息
                context.put(anInterface.getName(),clazz.newInstance());
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }

    // 请求到达  执行执行的业务逻辑
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        Object result = new Object();
        MyInvokerProtocol request = (MyInvokerProtocol) msg; // netty 会按照我们自定义的策略进行转换
        // 判断当前调用服务在容器中是否真正存在
        if(context.containsKey(request.getClassName())){
            // 确实存在执行对应的业务逻辑
            Object clazz = context.get(request.getClassName());
            // 获取真正执行的
            Method method = clazz.getClass().getMethod(request.getMethodName(), request.getParames());
            result = method.invoke(clazz, request.getValues());
        }
        if(result != null){
            ctx.write(result);
        }
        ctx.flush();
        ctx.close();
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        cause.printStackTrace();
        ctx.close();
    }


}

consumer

/**
 * @author : Ls
 * @ClassName : IRpcConsumer
 * @date : 2021-01-04 16:58
 * @Version : 1.0
 * @Description :
 **/
public class IRpcConsumer {

    public static void main(String[] args) {
        IRpcSayHelloService invoke = IRpcProxy.invoke(IRpcSayHelloService.class);
        System.out.println(invoke.sayHello("netty"));

        IRpcDoSomethingService invoke1 = IRpcProxy.invoke(IRpcDoSomethingService.class);
        System.out.println(invoke1.add(2,4));
        System.out.println(invoke1.mul(2,4));
        System.out.println(invoke1.sub(2,4));
        System.out.println(invoke1.div(2,4));

    }

}
/**
 * @author : Ls
 * @ClassName : IRpcProxy
 * @date : 2021-01-04 16:59
 * @Version : 1.0
 * @Description :
 **/
public class IRpcProxy {

    public static <T> T invoke(Class<T> clazz){
        Class<?> [] interfaces = clazz.isInterface() ?
                new Class[]{clazz} :
                clazz.getInterfaces();
       T result = (T) Proxy.newProxyInstance(clazz.getClassLoader(),interfaces,new ConsumerProxyHandler(clazz));
       return result;
    }

}
/**
 * @author : Ls
 * @ClassName : ConsumerProxyHandler
 * @date : 2021-01-04 17:09
 * @Version : 1.0
 * @Description :
 **/
public class ConsumerProxyHandler implements InvocationHandler {

    private Class<?> clazz;
    public ConsumerProxyHandler(Class<?> clazz){
        this.clazz = clazz;
    }

    // 动态代理执行相应业务逻辑
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        if(Object.class.equals(method.getDeclaringClass())){
            // 如果当前就是一个实现类
            return method.invoke(proxy,args);
        }else{
            return rpcInvoke(method,args);
        }
    }

    private Object rpcInvoke(Method method, Object[] args) {
        MyInvokerProtocol request = new MyInvokerProtocol();
        request.setClassName(this.clazz.getName()); // 类名称
        request.setMethodName(method.getName()); // 方法名称
        request.setParames(method.getParameterTypes()); // 入参列表
        request.setValues(args); // 实参列表

        // TCP 远程调用
        final IRpcProxyHandler consumerHandler = new IRpcProxyHandler();

        NioEventLoopGroup work = new NioEventLoopGroup();

        Bootstrap server = new Bootstrap();
        server.group(work)
                .channel(NioSocketChannel.class)// 客户端管道
                .option(ChannelOption.TCP_NODELAY, true) // 开启
                .handler(new ChannelInitializer<SocketChannel>() {
                    protected void initChannel(SocketChannel ch) throws Exception {
                        ChannelPipeline pipeline = ch.pipeline();
                        pipeline.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, 4, 0, 4));
                        //自定义协议编码器
                        pipeline.addLast("frameEncoder", new LengthFieldPrepender(4));
                        //对象参数类型编码器
                        pipeline.addLast("encoder", new ObjectEncoder());
                        //对象参数类型解码器
                        pipeline.addLast("decoder", new ObjectDecoder(Integer.MAX_VALUE, ClassResolvers.cacheDisabled(null)));
                        pipeline.addLast("handler",consumerHandler);
                    }
                });
        ChannelFuture future = null;
        try {
            future = server.connect("localhost", 8080).sync();
            future.channel().writeAndFlush(request).sync();
            future.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            work.shutdownGracefully();
        }
        return consumerHandler.getResponse();
    }

}
/**
 * @author : Ls
 * @ClassName : IRpcProxyHandler
 * @date : 2021-01-04 17:18
 * @Version : 1.0
 * @Description :
 **/
public class IRpcProxyHandler extends ChannelInboundHandlerAdapter {

    private Object response;

    public Object getResponse() {
        return response;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        response = msg;
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        super.exceptionCaught(ctx, cause);
    }
}

 

本文地址:https://blog.csdn.net/ls490447406/article/details/112230244