Writing an RPC Framework by Hand

程序员文章站 2022-07-12 16:38:25

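In the article analyzing how RMI works, we saw that RMI is implemented by wrapping TCP network communication underneath.
Following the same idea, this article implements a simple RPC framework step by step. Doing so in turn deepens the understanding of RMI, so the two complement each other.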

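Server

The server publishes a service on a port, listens for requests on that port, invokes the local method via reflection, and returns the result to the caller.
1. Define the interface to publish and its implementation class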

public interface IHello {
    String say();
}

public class IHelloImpl implements IHello {
    @Override
    public String say() {
        return "Hello";
    }
}

public class ServerDemo {
    public static void main(String[] args) {
        IHello iHello = new IHelloImpl();
        RpcServer rpcServer = new RpcServer();
        // Blocks here and serves requests on port 8080
        rpcServer.publisher(iHello, 8080);
    }
}

2. Define the publishing center, which accepts connections in a loop and hands each request to a thread pool

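import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class RpcServer {
    // Thread pool that processes each accepted connection
    private static final ExecutorService executorService = Executors.newCachedThreadPool();

    public void publisher(Object service, Integer port) {
        // try-with-resources closes the server socket if the loop exits with an error
        try (ServerSocket serverSocket = new ServerSocket(port)) {
            // Accept connections in a loop
            while (true) {
                Socket socket = serverSocket.accept();
                executorService.submit(new ProcessorHandler(socket, service));
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}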

3. Handle the request: obtain the result via reflection and send it back to the client

import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.Socket;

public class ProcessorHandler implements Runnable {
    private final Socket socket;
    private final Object service;

    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {
        // try-with-resources closes the socket and its streams even if the call fails
        try (Socket s = socket;
             ObjectInputStream objectInputStream = new ObjectInputStream(s.getInputStream())) {
            RpcRequest rpcRequest = (RpcRequest) objectInputStream.readObject();
            Object result = invoke(rpcRequest);

            ObjectOutputStream objectOutputStream = new ObjectOutputStream(s.getOutputStream());
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private Object invoke(RpcRequest rpcRequest) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
        Object[] args = rpcRequest.getParams();
        Class<?>[] types = null;
        if (args != null) {
            types = new Class<?>[args.length];
            for (int i = 0; i < args.length; i++) {
                // Uses each argument's runtime class, so null arguments and
                // primitive or interface-typed parameters are not matched
                types[i] = args[i].getClass();
            }
        }

        // Look up the public method by name and argument types, then call it on the service
        Method method = service.getClass().getMethod(rpcRequest.getMethod(), types);
        return method.invoke(service, args);
    }
}
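
One caveat with the `invoke` method above: it derives the parameter types from the runtime classes of the arguments, so `getMethod` throws `NoSuchMethodException` whenever a method declares a primitive, interface, or superclass parameter (for example `int` versus `Integer`). The sketch below reproduces that and shows one possible workaround, which is to look the method up with its declared types from `Method.getParameterTypes()`; in an RPC setting the client could send these along with the request. The names `ParamTypeDemo`, `Calc`, and `add` are hypothetical and not part of the original article.

```java
import java.lang.reflect.Method;

public class ParamTypeDemo {
    // Hypothetical service whose method takes primitive parameters.
    public static class Calc {
        public int add(int a, int b) {
            return a + b;
        }
    }

    // Mirrors the article's lookup: types come from the arguments' runtime classes.
    static boolean lookupByRuntimeTypes(Object service, String name, Object[] args) {
        Class<?>[] types = new Class<?>[args.length];
        for (int i = 0; i < args.length; i++) {
            types[i] = args[i].getClass(); // boxed Integer.class, not int.class
        }
        try {
            service.getClass().getMethod(name, types);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    // Alternative: look the method up with explicitly supplied declared types.
    static Object invokeByDeclaredTypes(Object service, String name,
                                        Class<?>[] types, Object[] args) {
        try {
            Method method = service.getClass().getMethod(name, types);
            return method.invoke(service, args);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("reflective call failed", e);
        }
    }

    public static void main(String[] args) throws Exception {
        Calc calc = new Calc();
        Object[] callArgs = {1, 2};
        System.out.println(lookupByRuntimeTypes(calc, "add", callArgs)); // false
        Class<?>[] declared =
                Calc.class.getMethod("add", int.class, int.class).getParameterTypes();
        System.out.println(invokeByDeclaredTypes(calc, "add", declared, callArgs)); // 3
    }
}
```

The `IHello.say()` example in this article is unaffected because it takes no arguments.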

Client

Locate the service, send the request, and receive the result.
1. Define a proxy and call the server-side interface through it

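import java.lang.reflect.Proxy;

public class RpcClientProxy {

    // Creates a dynamic proxy for the interface; every call is forwarded to the remote host
    @SuppressWarnings("unchecked")
    public <T> T clientProxy(Class<T> target, String host, Integer port) {
        return (T) Proxy.newProxyInstance(target.getClassLoader(), new Class<?>[]{target}, new RemoteInvocationHandler(host, port));
    }
}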
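
To see what `Proxy.newProxyInstance` does before any networking is involved, here is a small local sketch: every call on the proxy is routed to the `InvocationHandler`, which is the hook that `RemoteInvocationHandler` uses to send the request over TCP. The `ProxyDemo` class and `Greeter` interface are hypothetical names used only for illustration.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

public class ProxyDemo {
    // Hypothetical interface standing in for IHello.
    public interface Greeter {
        String greet(String name);
    }

    @SuppressWarnings("unchecked")
    static <T> T createProxy(Class<T> target, InvocationHandler handler) {
        return (T) Proxy.newProxyInstance(
                target.getClassLoader(), new Class<?>[]{target}, handler);
    }

    static Greeter newGreeter() {
        // This handler only reports which method was called and with what argument.
        InvocationHandler handler = (proxy, method, args) ->
                method.getName() + "(" + args[0] + ")";
        return createProxy(Greeter.class, handler);
    }

    public static void main(String[] args) {
        System.out.println(newGreeter().greet("world")); // greet(world)
    }
}
```

Note that `args` is `null` when the proxied method takes no parameters, which is the case for `IHello.say()`.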

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;

public class RemoteInvocationHandler implements InvocationHandler {
    private final String host;
    private final Integer port;

    public RemoteInvocationHandler(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        // Describe the call (interface name, method name, arguments) and send it to the server
        RpcRequest rpcRequest = new RpcRequest();
        rpcRequest.setName(method.getDeclaringClass().getName());
        rpcRequest.setMethod(method.getName());
        rpcRequest.setParams(args);
        TCPTransport tcpTransport = new TCPTransport(host, port);
        return tcpTransport.send(rpcRequest);
    }
}
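
`RemoteInvocationHandler` fills in an `RpcRequest`, but the article does not list that class. Below is a minimal sketch inferred from the getters and setters used in the code above; the field names and the `serialVersionUID` value are assumptions. The class has to implement `Serializable` because it is written with `ObjectOutputStream`, and the argument objects it carries must be serializable as well.

```java
import java.io.Serializable;

public class RpcRequest implements Serializable {
    private static final long serialVersionUID = 1L; // assumed value

    private String name;     // fully qualified name of the interface being called
    private String method;   // name of the method to invoke
    private Object[] params; // call arguments; null for a no-argument method

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getMethod() {
        return method;
    }

    public void setMethod(String method) {
        this.method = method;
    }

    public Object[] getParams() {
        return params;
    }

    public void setParams(Object[] params) {
        this.params = params;
    }
}
```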

2. Encapsulate the network communication

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.Socket;

public class TCPTransport {
    private final String host;
    private final Integer port;

    public TCPTransport(String host, Integer port) {
        this.host = host;
        this.port = port;
    }

    private Socket newSocket() {
        try {
            return new Socket(host, port);
        } catch (IOException e) {
            // Keep the cause so the underlying network error is not lost
            throw new RuntimeException("Failed to establish connection", e);
        }
    }

    public Object send(RpcRequest rpcRequest) {
        // try-with-resources closes the socket, and with it both streams
        try (Socket socket = newSocket()) {
            // Send the request to the server
            ObjectOutputStream objectOutputStream = new ObjectOutputStream(socket.getOutputStream());
            objectOutputStream.writeObject(rpcRequest);
            objectOutputStream.flush();
            // Read the result returned by the server
            ObjectInputStream objectInputStream = new ObjectInputStream(socket.getInputStream());
            return objectInputStream.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException("Remote call failed", e);
        }
    }
}
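
The exchange between `TCPTransport.send` and `ProcessorHandler.run` can be tried out in a single process. The sketch below is not the article's code: it swaps `RpcRequest` for a plain `String` and uses a hypothetical `LoopbackDemo` class bound to an OS-assigned port, purely to show the ordering that matters with object streams. The sender writes and flushes before the receiver reads, because the `ObjectInputStream` constructor blocks until the peer's stream header arrives.

```java
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class LoopbackDemo {
    // Sends one request over a loopback socket and returns the server's reply.
    static Object roundTrip(String request) {
        try (ServerSocket serverSocket = new ServerSocket(0)) { // port 0: OS picks a free port
            Thread server = new Thread(() -> serveOnce(serverSocket));
            server.start();
            try (Socket socket = new Socket(InetAddress.getLoopbackAddress(),
                                            serverSocket.getLocalPort())) {
                ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
                out.writeObject(request);
                out.flush(); // pushes the stream header and payload to the server
                ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
                Object reply = in.readObject();
                server.join();
                return reply;
            }
        } catch (IOException | ClassNotFoundException | InterruptedException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    // Accepts a single connection, reads the request, and sends back a reply.
    private static void serveOnce(ServerSocket serverSocket) {
        try (Socket socket = serverSocket.accept()) {
            ObjectInputStream in = new ObjectInputStream(socket.getInputStream());
            String request = (String) in.readObject();
            ObjectOutputStream out = new ObjectOutputStream(socket.getOutputStream());
            out.writeObject("Hello, " + request);
            out.flush();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("RPC")); // Hello, RPC
    }
}
```

If both sides created their `ObjectInputStream` first, each would wait for the other's header and the call would hang, which is why the client in this article writes before it reads.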


public class ClientDemo {
    public static void main(String[] args) {
        RpcClientProxy rpcClientProxy = new RpcClientProxy();
        // Requires ServerDemo to be running and listening on port 8080
        IHello iHello = rpcClientProxy.clientProxy(IHello.class, "localhost", 8080);
        System.out.println(iHello.say());
    }
}