欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

RPC 远程过程调用(一)

程序员文章站 2022-06-15 17:12:21
...

 

目的: 多个项目部署在不同服务器上,一个项目通过rpc调用另一个项目的接口服务

一. order-api项目

对外暴露接口,放在仓库中,给服务实现方和服务调用方依赖,最终服务调用方只能看见接口而看不见具体的实现

1. 服务接口OrderService

public interface OrderService {
    String findOrderList();

    String findOrderById();
}

2. 远程连接传递参数RpcRequestDto

@Data
public class RpcRequestDto implements Serializable {

    /**
     * 调用类名
     */
    private String   className;
    /**
     * 方法名称
     */
    private String   methodName;

    /**
     * 请求参数
     */
    private Object[] args;
    /**
     * 参数类型
     */
    private Class[]  type;
}

3. pom文件与order-api项目文件结构

RPC 远程过程调用(一)

二. order-provdier项目

1. 服务实现类OrderServiceImpl

public class OrderServiceImpl implements OrderService {
    @Override
    public String findOrderList() {
        return "OrderServiceImpl findOrderList 这就是 orderList !";
    }

    @Override
    public String findOrderById() {
        return "OrderServiceImpl findOrderById 这就是 order !";
    }
}

2.  rpc代理发布服务端RpcProxyServer

public class RpcProxyServer {

    private ExecutorService executorService = Executors.newFixedThreadPool(5);

    public void publisher(Object service, int port) {
        System.out.println("发布 " + service.getClass().getName() + " 到 " + port + " 端口");

        ServerSocket serverSocket = null;
        try {
            // 创建port端口的服务端
            serverSocket = new ServerSocket(port);

            while (true) {
                // 监听客户端请求
                Socket socket = serverSocket.accept();

                System.out.println("另外一个线程处理,此监听客户端请求的线程就不用阻塞在这里了");
                executorService.execute(new ProcessorHandler(socket, service));
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (null != serverSocket) {
                try {
                    serverSocket.close();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }
    }

}

3. 发布操作类ProcessorHandler

public class ProcessorHandler implements Runnable {

    /**
     * socket连接, 要发布的实例服务
     */
    private Socket socket;
    private Object service;

    public ProcessorHandler(Socket socket, Object service) {
        this.socket = socket;
        this.service = service;
    }

    @Override
    public void run() {
        ObjectInputStream objectInputStream = null;
        ObjectOutputStream objectOutputStream = null;
        try {
            objectInputStream = new ObjectInputStream(socket.getInputStream());

            //  反序列化读取请求数据对象dto
            RpcRequestDto requestDto = (RpcRequestDto) objectInputStream.readObject();
            System.out.println("服务端 ProcessorHandler run requestDto: " + requestDto.toString());

            // 通过反射调用方法:
            // 1. 手动加载class类
            Class clazz = Class.forName(requestDto.getClassName());
            // 2. 根据方法和参数类型 获取方法
            Method method = clazz.getMethod(requestDto.getMethodName(), requestDto.getType());
            // 3. 传入参数, 反射调用发布服务类
            Object result = method.invoke(service, requestDto.getArgs());
            System.out.println("服务端 ProcessorHandler run result: " + result);

            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

            System.out.println("序列化写入 响应 数据对象dto");
            objectOutputStream.writeObject(result);
            objectOutputStream.flush();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != objectInputStream) {
                    objectInputStream.close();
                }
                if (null != objectOutputStream) {
                    objectOutputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

}

4. 测试发布服务实现类

public class App {
    public static void main(String[] args) {
        System.out.println("Hello World!");

        RpcProxyServer rpcProxyServer = new RpcProxyServer();
        System.out.println("创建服务端代理类");

        OrderService orderService = new OrderServiceImpl();
        System.out.println("创建orderServiceImp实现类");

        rpcProxyServer.publisher(orderService, 1234);
    }
}

5. pom文件,添加依赖order-api


		<!--测试rpc-->
		<dependency>
			<groupId>com.zsw.example</groupId>
			<artifactId>order-api</artifactId>
			<version>1.0-SNAPSHOT</version>
		</dependency>

三. user-service项目(服务调用方)

1. rpc代理服务端

public class RpcProxyClient {

    public <T> T clientProxy(final Class<T> interfaceClass, final String host, final int port) {
        System.out.println("创建 rpc 代理服务端");
        return (T) Proxy.newProxyInstance(interfaceClass.getClassLoader(), new Class<?>[] { interfaceClass },
                new RemoteInvocationHandler(host, port));
    }
}

2. 远程代理实现类RemoteInvocationHandler

public class RemoteInvocationHandler implements InvocationHandler {

    private String host;
    private int    port;

    public RemoteInvocationHandler(String host, int port) {
        this.host = host;
        this.port = port;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) {
        System.out.println("RemoteInvocationHandler 建立远程连接");
        RpcNetTransPort rpcNetTransPort = new RpcNetTransPort(host, port);

        RpcRequestDto requestDto = new RpcRequestDto();
        requestDto.setArgs(args);
        requestDto.setClassName(method.getDeclaringClass().getName());
        requestDto.setMethodName(method.getName());
        requestDto.setType(method.getParameterTypes());

        return rpcNetTransPort.send(requestDto);
    }
}

3. 远程连接发送接收参数RpcNetTransPort

public class RpcNetTransPort {

    private String host;

    private int    port;

    public RpcNetTransPort(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public Object send(RpcRequestDto requestDto) {
        System.out.println("RpcNetTransPort 发送请求");
        ObjectOutputStream objectOutputStream = null;

        ObjectInputStream objectInputStream = null;
        try {
            Socket socket = new Socket(host, port);

            objectOutputStream = new ObjectOutputStream(socket.getOutputStream());

            System.out.println("序列化写入 请求 数据对象dto");
            objectOutputStream.writeObject(requestDto);
            objectOutputStream.flush();

            System.out.println("反序列化读取 服务端响应结果");
            objectInputStream = new ObjectInputStream(socket.getInputStream());
            return objectInputStream.readObject();
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != objectOutputStream) {
                    objectOutputStream.close();
                }
                if (null != objectInputStream) {
                    objectInputStream.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        return "服务端响应失败!";
    }
}

4. 调用测试RpcClientTest

public class RpcClientTest {

    public static void main(String[] args) {
        RpcProxyClient rpcProxyClient = new RpcProxyClient();

        OrderService orderService = rpcProxyClient.clientProxy(OrderService.class, "127.0.0.1", 1234);

        System.out.println(orderService.findOrderById());
        System.out.println(orderService.findOrderList());
    }

}

5. pom文件添加依赖


		<!--测试rpc-->
		<dependency>
			<groupId>com.zsw.example</groupId>
			<artifactId>order-api</artifactId>
			<version>1.0-SNAPSHOT</version>
		</dependency>

四. 测试

1. 先开启项目二order-provider的App类的main方法发布orderService的实现类到1234端口监听请求,线程阻塞:

RPC 远程过程调用(一)

2. 再开启项目三user-service的RpcClientTest类的main方法,穿建orderService的代理类实现远程连接进行访问:

    2.1 user-service日志:

RPC 远程过程调用(一)

    2.2 order-provider日志:

RPC 远程过程调用(一)