欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

手写简单RPC实现

程序员文章站 2022-07-12 16:39:43
...

RPC(远程过程调用)是分布式服务中不可缺少的一部分,那么我们平常用的RPC框架都是别人封装好的,开箱即用,当然仅仅只是使用已经满足不了我的好奇心,现在我们自己来实现一个简单版的RPC框架,目的是了解RPC框架的运行机制和原理,明白了机制那么在看别人封装好的框架时就可以不那么吃力,而且还可以针对自己的业务进行扩展。
首先我们来看看什么是RPC?RPC有什么用?,顾名思义RPC远程过程调用是相对于本地调用来说,本地调用即我们平时在写代码时在同一个JVM中进行方法的调用,所有的方法都是在本地,方法的调用都可以在JVM中找到方法的的地址,但是远程过程调用就无法做到这个事了,调用的方法跨越了JVM,调用者根本不知道远程方法到底在哪,为了解决这个问题RPC就应运而生了,调用者通过把调用的对象,方法,参数发送到服务的提供者,然后服务提供者执行完成后将结果返回,那么调用者接收返回的结果从而完成了一次远程执行过程。那么明白了原理我们来看看怎么通过代码来实现
第一步:定义远程服务的接口

/**
 * Created by apple on 2019/3/31.
 */
public interface HelloService {


    void sayHello(String name);


    String drive(String name);

}

第二步:实现服务的接口

/**
 * Created by apple on 2019/3/31.
 */
public class HelloServiceImpl implements HelloService {
    @Override
    public void sayHello(String name) {
        System.out.println("name = " + name);
    }

    @Override
    public String drive(String name) {
        System.out.println("name = " + name);
        return "134";
    }
}

第三步:远程服务需要被别人知道,那么就需要将服务导出让别人知道服务的定义,服务导出后那么就需要在调用的时候进行服务的引用拿到具体的实现类,本来这两部是可以分开的,这里我们把它合并在了一个类中

/**
 * Created by apple on 2019/3/31.
 */
public interface RPCInvocation {


    void export(Object target, int port);


    <T> T reference(Class<T> clazz, String host, int port);

}
import javax.validation.constraints.NotNull;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.ServerSocket;
import java.net.Socket;

/**
 * Created by apple on 2019/3/31.
 */
public class RPCInvocationImpl implements RPCInvocation {

    private static RPCInvocationImpl instance;

    private RPCInvocationImpl() {
    }


    public static RPCInvocationImpl getInstance() {
        synchronized (RPCInvocationImpl.class){
            if(null == instance){
                synchronized (RPCInvocationImpl.class){
                    instance = new RPCInvocationImpl();
                }
            }
        }
        return instance;
    }

    @Override
    public void export(@NotNull final Object target, @NotNull int port) {
        try {
            ServerSocket server = new ServerSocket(port);
            while(true){
                final Socket socket = server.accept();
                new Thread(new Runnable() {
                    @Override
                    public void run() {
                        ObjectInputStream inputStream = null;
                        ObjectOutputStream outputStream = null;
                        try {
                            inputStream = new ObjectInputStream(socket.getInputStream());
                            String methodName = inputStream.readUTF();
                            Class<?>[] parmeterType = (Class<?>[]) inputStream.readObject();
                            Object[] args = (Object[]) inputStream.readObject();
                            Method method = target.getClass().getDeclaredMethod(methodName,parmeterType);
                            Object result = method.invoke(target,args);
                            outputStream = new ObjectOutputStream(socket.getOutputStream());
                            outputStream.writeObject(result);
                        } catch (IOException e) {
                            e.printStackTrace();
                        } catch (ClassNotFoundException e) {
                            e.printStackTrace();
                        } catch (IllegalAccessException e) {
                            e.printStackTrace();
                        } catch (NoSuchMethodException e) {
                            e.printStackTrace();
                        } catch (InvocationTargetException e) {
                            e.printStackTrace();
                        }finally {
                            try {
                                outputStream.close();
                                inputStream.close();
                                socket.close();
                            } catch (IOException e) {
                                e.printStackTrace();
                            }
                        }
                    }
                }).start();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    @Override
    public <T> T reference(final Class<T> clazz, final String host, final int port) {
        return (T) Proxy.newProxyInstance(clazz.getClassLoader(), new Class<?>[]{clazz}, new InvocationHandler() {
            @Override
            public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
                Socket socket = new Socket(host,port);
                ObjectOutputStream outputStream = new ObjectOutputStream(socket.getOutputStream());
                outputStream.writeUTF(method.getName());
                outputStream.writeObject(method.getParameterTypes());
                outputStream.writeObject(args);

                ObjectInputStream inputStream = new ObjectInputStream(socket.getInputStream());
                return inputStream.readObject();
//                return null;
            }
        });

    }
}

通过代码我们可以看到,服务的导出和调用的核心都是Java的动态代理,在这里服务端和客户端我们采用了ServerSocket,只是为了验证原理,一般RPC框架中都是通过netty或者其他的AIO框架来实现,这里如果有同学感兴趣可以换成netty试试,目前dubbo的底层通信就是封装的netty。
最后我们来验证一下


/**
 * Created by apple on 2019/3/31.
 */
public class RPCDemo {


    public static void main(String[] args){

        HelloService service = new HelloServiceImpl();
        RPCInvocation rpc =  RPCInvocationImpl.getInstance();
        rpc.export(service,1234);

    }
}
/**
 * Created by apple on 2019/3/31.
 */
public class DemoClient {


    public static void main(String[] args){
        RPCInvocation rpc =  RPCInvocationImpl.getInstance();
        HelloService service1 = rpc.reference(HelloService.class,"localhost",1234);
        String se = service1.drive("123");
        System.out.println("se = " + se);
        service1.sayHello("zs");

    }
}

通过运行结果发现可以实现RPC的效果,当然这个只是简单的模型验证,真正的RPC框架包含了: 注册中心,服务治理,序列化,数据通信,负载均衡等等多个组件。注册用心用于对服务远程服务就行注册管理方便客户端知道当前服务提供者有哪些。服务治理:用于对服务进行限流,升降级。序列化组件:主要是对服务调用过程中对传输的数据进行序列化以提高传输过程中的安全性和效率。数据通信组件:主要是打通服务端和客户端之间的桥梁,保持两边的连通。负载均衡组件:是为了保证服务调用的高可用以及稳定性。

转载于:https://www.jianshu.com/p/a184b5f6235f