手写RPC框架
程序员文章站
2022-07-12 16:39:01
...
概念
什么是RPC,remote produce call 远程过程调用,也就是在分布式项目中服务调用方调用远程服务就像调用本地服务一样,透明化调用。实现这个功能的技术目前很多,有Java的rmi、http调用、protobuf、dubbo等。
实现方式
- 服务提供方首先会往注册中心注册本地提供的服务;
- 服务调用方会从服务注册中心获取服务提供方的服务信息;
- 调用方发起一个服务,并获取一个动态代理类来发起请求;
- 调用方序列化请求,并通过获取的服务信息来连接上服务端;
- 服务提供方通过网络连接来获取请求信息;
- 反序列化请求信息,并处理请求信息;
- 服务方把处理完的请求返回给调用方;
技术选型
- 服务注册中心使用的是zookeeper
- 调用方使用jdk动态代理类处理
- 网络请求连接是用socket
- 服务端处理请求会通过线程池启用线程
实现代码
服务方处理请求代码
public class ExecuteService{
public static ThreadPoolExecutor executor = new ThreadPoolExecutor(10, 10, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
public static void exec(Socket socket) {
FutureTask<Object> task = new FutureTask<Object>(new Callable<Object>() {
@Override
public Object call() throws Exception {
OutputStream outputStream= null;
InputStream inputStream=null;
ObjectOutputStream oos =null;
ObjectInputStream ois=null;
try {
inputStream=socket.getInputStream();
ois = new ObjectInputStream(inputStream);
String serviceImpl=ois.readUTF();
String method = (String)ois.readUTF();
Person person=(Person)ois.readObject();
ApplicationContext context=LZLApplicationContextAware.getApplicationContext();
String result = (String)context.getBean(serviceImpl).getClass().getMethod(method, Person.class).invoke(context.getBean(serviceImpl).getClass().newInstance(), person);
outputStream=socket.getOutputStream();
oos =new ObjectOutputStream(outputStream);
oos.writeObject(result);
oos.flush();
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
ois.close();
oos.close();
outputStream.close();
inputStream.close();
socket.close();
} catch (Exception e2) {
e2.printStackTrace();
}
}
return "ok";
}
});
executor.execute(task);
}
}
调用方发起调用代码
public class LZLSocket{
public static String connect(String ip, String port, String serviceImpl, Method method, Person person) {
String msg = "调用失败";
Socket socket = null;
OutputStream outputStream = null;
InputStream inputStream = null;
ObjectOutputStream oos = null;
ObjectInputStream ois = null;
try {
int portId = Integer.valueOf(port);
socket = new Socket(ip, portId);
outputStream = socket.getOutputStream();
inputStream = socket.getInputStream();
oos = new ObjectOutputStream(outputStream);
oos.writeUTF(serviceImpl);
oos.writeUTF(method.getName());
oos.writeObject(person);
oos.flush();
System.out.println("正在调用远程服务。。。。。。。。");
ois = new ObjectInputStream(inputStream);
msg = (String)ois.readObject();
System.out.println(msg);
return msg;
} catch (Exception e) {
e.printStackTrace();
} finally {
try {
oos.close();
ois.close();
outputStream.close();
inputStream.close();
socket.close();
} catch (IOException e)
{ e.printStackTrace(); }
}
return msg;
}
}
调用方代理实现
public class RPCProxy implements InvocationHandler
{
public RPCProxy()
{
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable
{
Person person = (Person)args[0];
byte[] data = SubscribeService.subscribe();
String msg = new String(data);
String[] message = msg.split(":");
String ip = message[0];
String port = message[1];
String service = message[2];//是用来判断服务的,当前就一个节点一个服务,就不用判断
String serviceImpl = message[3];
/*
* 这里还缺一个负载策略,待实现
*/
String result = LZLSocket.connect(ip, port, serviceImpl, method, person);
System.out.println("result="+result);
return result;
}
public QueryPersonnelInfo getProxyInstance() {
return (QueryPersonnelInfo)Proxy.newProxyInstance(QueryPersonnelInfo.class.getClassLoader(), new Class<?>[]{QueryPersonnelInfo.class}, this);
}
}
另外说明
服务方的注册是通过spring容器在启动时的后置处理器beanfactorypostprocess或者springboot启动时的CommandLineRunner的实现类来第一时间注册服务信息,而服务端的controller和filter是利用springmvc调用服务端功能测试使用的,服务端断开了与zk的连接过滤器就会阻止外部去调用服务,实现下线功能。
具体的实现代码可以去我的GitHub查看:https://github.com/liuzongliang0202/RPCProject
上一篇: priority_queue用法总结
下一篇: 手写一个简单的RPC