欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

RPC调用的简单实现

程序员文章站 2022-07-03 19:57:10
RPC调用流程流程描述:1.服务调用者发送请求(interface#method#args)2.客户端进行StringEncode编码3.数据写到服务提供者4.服务提供者接受请求5.将接收的包进行StringDecode解码6.服务提供方调用对应api7.服务提供方响应方法调用结果8.服务提供方将结果集进行StringEncode编码9.服务提供方发送结果集包到服务调用者10.服务调用者接受数据包11.服务调用者将数据包进行StringDecode解码...

RPC调用流程

RPC调用的简单实现

流程描述:

1.服务调用者发送请求(interface#method#args)

2.客户端进行StringEncode编码

3.数据写到服务提供者

4.服务提供者接受请求

5.将接收的包进行StringDecode解码

6.服务提供方调用对应api

7.服务提供方响应方法调用结果

8.服务提供方将结果集进行StringEncode编码

9.服务提供方发送结果集包到服务调用者

10.服务调用者接受数据包

11.服务调用者将数据包进行StringDecode解码

12.服务调用者输出方法调用结果集

代码流程:

服务端接口:

package com.hx.zbhuang.netty.dubboCall;

/**
 * 远程服务接口
 */
public interface HelloService {
    String hello(String mes);
    String say(String msg);
}

服务端接口实现类:

package com.hx.zbhuang.netty.dubboCall;

/**
 * 远程服务实现类
 */
public class HelloServiceImpl implements HelloService {
    @Override
    public String hello(String msg) {
        if(msg!=null) {
            return "hello 豆腐蛋,i accept you msg:["+msg+"]";
        } else {
            return "hello 豆腐蛋,i accept you msg";
        }
    }

    @Override
    public String say(String msg) {
        return "hello 豆腐蛋" + "==msg:"+msg;
    }
}

服务端初始化:

package com.hx.zbhuang.netty.dubboCall;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;

/**
 * 服务端初始化
 */
public class NettyServer {
    public static void main(String[] args) {
        EventLoopGroup bossGroup = null;
        EventLoopGroup workerGroup = null;
        try {
            workerGroup = new NioEventLoopGroup();
            bossGroup = new NioEventLoopGroup(1);
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(bossGroup,workerGroup).channel(NioServerSocketChannel.class)
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline = socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // 服务端拦水坝处理
                            pipeline.addLast(new NettyServerHandler());
                        }
                    });
            ChannelFuture channelFuture = serverBootstrap.bind(7766).sync();
            channelFuture.channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

服务端数据处理handler

package com.hx.zbhuang.netty.dubboCall;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import java.lang.reflect.Method;

public class NettyServerHandler extends SimpleChannelInboundHandler {
    /**
     * 读取客户端需要调用的方法进行处理返回结果集
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("msg="+msg);
        String methodName = msg.toString().split("#")[1];
        Method[] methods = HelloService.class.getMethods();
        for (Method method:methods) {
            if(method.getName().equals(methodName)) {
                Object obj = method.invoke(new HelloServiceImpl(),msg.toString().substring(msg.toString().lastIndexOf("#")+1));
                ctx.writeAndFlush(obj.toString());
            }
        }
    }

    /**
     * 异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        ctx.close();
    }
}

客户端接口调用初始化

package com.hx.zbhuang.netty.dubboCall;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import java.lang.reflect.Proxy;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class NettyClient {
    //线程池管理客户端请求
    private static ExecutorService executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    private  static NettyClientHandler client;

    /**
     * 返回服务端接口方法调用结果集
     * @param serviceClass
     * @param interfaceName
     * @return
     */
    Object getBean(final Class<?> serviceClass, final String interfaceName){
        return Proxy.newProxyInstance(Thread.currentThread().getContextClassLoader(),new Class<?>[]{serviceClass},(proxy,method,args) -> {
            if (client ==null) {
                initClient();
            }
            client.setPara(interfaceName+"#"+method.getName()+"#"+args[0]);
            Object obj = executor.submit(client).get();
            return obj;
        });
    }

    /**
     * 客户端初始化
     */
    private static void initClient() {
        client = new NettyClientHandler();
        NioEventLoopGroup group =null;
        try {
            group = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group).channel(NioSocketChannel.class)
                    .option(ChannelOption.TCP_NODELAY,true)
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel socketChannel) throws Exception {
                            ChannelPipeline pipeline=socketChannel.pipeline();
                            pipeline.addLast(new StringDecoder());
                            pipeline.addLast(new StringEncoder());
                            // 客户端拦水坝处理
                            pipeline.addLast(client);
                        }
                    });
            bootstrap.connect("127.0.0.1",7766).sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

客户端数据处理handler

package com.hx.zbhuang.netty.dubboCall;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.util.concurrent.Callable;

public class NettyClientHandler extends ChannelInboundHandlerAdapter implements Callable {
    // channelHandler上下文
    private ChannelHandlerContext context;
    // 远程服务获取方法调用结果
    private String result;
    // 接口名#方法名#参数
    private String para;

    /**
     * 将请求写到服务端
     * @return
     * @throws Exception
     */
    @Override
    public synchronized Object call() throws Exception {
        context.writeAndFlush(para);
        wait();
        return result;
    }

    /**
     * 获取上下文信息
     * @param ctx
     * @throws Exception
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        context = ctx;
    }

    /**
     * 获取服务端调用结果
     * @param ctx
     * @param msg
     * @throws Exception
     */
    @Override
    public synchronized void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        result = msg.toString();
        notify();
    }

    /**
     * 异常处理
     * @param ctx
     * @param cause
     * @throws Exception
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
       ctx.close();
    }

    void setPara(String para) {
        this.para = para;
    }
}

模拟接口调用:

package com.hx.zbhuang.netty.dubboCall;

public class ClientBootstrap {
    // 暴露的接口名
    public static final String interfaceName = "HelloService";

    public static void main(String[] args) throws InterruptedException {
        NettyClient customer = new NettyClient();
        // 调用远程服务接口
        HelloService service = (HelloService)customer.getBean(HelloService.class, interfaceName);
        String msg = service.say("来打王者");
        System.out.println("调用结果"+ msg);
    }
}

客户端远程调用服务端接口响应:

RPC调用的简单实现

本文地址:https://blog.csdn.net/qq_33554285/article/details/110457528

相关标签: java