欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

手写RPC框架

程序员文章站 2022-06-14 09:27:10
...

 

源码地址:https://github.com/ItGeneral/code-framework

微信公众号原文:https://mp.weixin.qq.com/s/z1kIqvlX92oun1Cpm71Tsw

 

RPC(Remote Procedure Call Protocol)远程过程调用协议,采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。使用RPC框架,可以让远程接口调用像调用本地方法一样方便。 下面介绍一个简单的RPC框架,让读者领略RPC框架的原理

RPC服务端

(1)新增注解@RpcExporter,接口上添加该注解,即表示为RPC服务端接口

  1. @Target(ElementType.METHOD)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RpcExporter {
  5. /**
  6. * RPC服务端处理器名称(接口名称)
  7. * @return
  8. */
  9. String value();
  10. }

(2)新增接口RpcTestService及其实现RpcTestServiceImpl

  1. public interface RpcTestService {
  2. Boolean testRpcService(String params);
  3. }
  4. @Service
  5. public class RpcTestServiceImpl implements RpcTestService{
  6. private final static Logger LOGGER = getLogger(RpcTestServiceImpl.class);
  7. @Override
  8. @RpcExporter(value = "testRpcService")
  9. public Boolean testRpcService(String params){
  10. LOGGER.info("======testRpcService====success====" + params);
  11. return true;
  12. }
  13. }

RPC客户端

(1)新增注解@RpcClient,接口方法上添加该注解,即表示为RPC客户端接口

  1. @Target(ElementType.METHOD)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RpcClient {
  5. /**
  6. * 对应的服务端处理器名称(bean名称+接口名称)
  7. * @return
  8. */
  9. String value();
  10. /**
  11. * rpc 服务端地址
  12. * @return
  13. */
  14. String rpcExportUrl();
  15. }

(2)添加@RpcInterface注解,接口类上添加此注解,表明为RPC客户端接口类

  1. @Target(ElementType.TYPE)
  2. @Retention(RetentionPolicy.RUNTIME)
  3. @Documented
  4. public @interface RpcInterface {
  5. }

(3)新增RPC客户端接口

  1. @Service
  2. @RpcInterface
  3. public interface RpcTestClient {
  4. @RpcClient(value = "RpcTestServiceImpl.testRpcService", rpcExportUrl = "http://localhost:8080/rpc")
  5. Boolean testClient(String params);
  6. }

初始化RPC框架

(1)添加@EnableRPC注解,开启RPC配置

  1. @Documented
  2. @Configuration
  3. @Target(ElementType.TYPE)
  4. @Retention(RetentionPolicy.RUNTIME)
  5. @Import({RpcConfiguration.RpcPackageRegister.class, RpcConfiguration.class})
  6. public @interface EnableRPC {
  7. /**
  8. * RPC 扫描包路径
  9. * @return
  10. */
  11. String[] basePackages() default {};
  12. }
  13. public class RpcConfiguration {
  14. private static Set<String> basePackages = new LinkedHashSet<>();
  15. /**
  16. * basePackages路径下的rpc客户端接口,设置为动态代理
  17. * @return
  18. */
  19. @Bean
  20. public RpcRegister rpcRegister(){
  21. return new RpcRegister(basePackages);
  22. }
  23. public static class RpcPackageRegister implements ImportBeanDefinitionRegistrar{
  24. /**
  25. * 解析EnableRPC basePackages参数
  26. * @param metadata
  27. * @param registry
  28. */
  29. @Override
  30. public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
  31. Map<String, Object> enableMap = metadata.getAnnotationAttributes(EnableRPC.class.getName());
  32. AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(enableMap);
  33. String[] packages = annotationAttributes.getStringArray("basePackages");
  34. for (String basePackage : packages){
  35. String[] tokenize = StringUtils.tokenizeToStringArray(basePackage,
  36. ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
  37. basePackages.addAll(Arrays.asList(tokenize));
  38. }
  39. if (basePackages.isEmpty()) {
  40. basePackages.add(ClassUtils.getPackageName(metadata.getClassName()));
  41. }
  42. }
  43. }
  44. }

(2)spring bean 初始化前为所有RPC客户端接口设置动态代理,自定义invoke方法实现RPC客户端向RPC服务端请求数据的逻辑

  1. public class RpcRegister implements BeanFactoryPostProcessor {
  2. /**
  3. * rpc扫描包
  4. */
  5. private Set<String> baseScanPackages;
  6. public RpcRegister(Set<String> baseScanPackages) {
  7. this.baseScanPackages = baseScanPackages;
  8. }
  9. /**
  10. * spring容器在初始化bean之前,允许自定义某些bean
  11. * RPC客户端所有bean设置动态代理,调用时走动态代理的invoke()
  12. * @param beanFactory
  13. * @throws BeansException
  14. */
  15. @Override
  16. public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
  17. DefaultListableBeanFactory factory = (DefaultListableBeanFactory) beanFactory;
  18. for (String basePackage : baseScanPackages){
  19. try {
  20. Set<Class<?>> classSet = scanBackPackage(basePackage);
  21. for (Class<?> clazz : classSet){
  22. if (clazz.isInterface() && clazz.isAnnotationPresent(RpcInterface.class)){
  23. //生成动态代理
  24. Object proxy = RpcClientProxy.getProxy(clazz);
  25. //将bean注入到spring容器中
  26. factory.registerSingleton(clazz.getName(), proxy);
  27. }
  28. }
  29. } catch (Exception e) {
  30. e.printStackTrace();
  31. }
  32. }
  33. }
  34. /**
  35. * 扫描包下面的所有类
  36. * @param basePackage
  37. * @return
  38. */
  39. private Set<Class<?>> scanBackPackage(String basePackage){
  40. Set<Class<?>> classSet = new HashSet<>();
  41. try{
  42. String pattern = "classpath*:" + ClassUtils.convertClassNameToResourcePath(basePackage) + "/**/*.class";
  43. ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
  44. Resource[] resources = resourcePatternResolver.getResources(pattern);
  45. MetadataReaderFactory readerFactory = new CachingMetadataReaderFactory(resourcePatternResolver);
  46. for (Resource resource : resources){
  47. if (resource.isReadable()){
  48. MetadataReader reader = readerFactory.getMetadataReader(resource);
  49. String className = reader.getClassMetadata().getClassName();
  50. classSet.add(Class.forName(className, false, Thread.currentThread().getContextClassLoader()));
  51. }
  52. }
  53. }catch (Exception e){
  54. e.printStackTrace();
  55. }
  56. return classSet;
  57. }
  58. }

(3)在spring bean初始化后,注册RPC服务端和客户端接口,将RPC服务端相关的配置、bean、接口名称等保存到内存中,以便后续解析调用接口使用

  1. /**
  2. * 将rpc客户端和服务端的注解配置信息 保存到RpcFactory中
  3. * @param bean
  4. * @param beanName
  5. */
  6. public void resolveRpc(Object bean, String beanName){
  7. Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
  8. if (methods == null){
  9. return;
  10. }
  11. for (Method method : methods){
  12. //解析RPC服务端接口注解的配置
  13. RpcExporter rpcExporter = AnnotationUtils.findAnnotation(method, RpcExporter.class);
  14. if (rpcExporter != null){
  15. RpcExporterContext exporterContext = new RpcExporterContext();
  16. exporterContext.setBean(bean);
  17. exporterContext.setMethod(method);
  18. RpcFactory.registerExporter(beanName, method.getName(), exporterContext);
  19. }
  20. //解析RPC客户端接口注解的配置
  21. RpcClient rpcClient = AnnotationUtils.findAnnotation(method, RpcClient.class);
  22. if (rpcClient != null){
  23. RpcClientContext clientContext = new RpcClientContext();
  24. clientContext.setUrl(rpcClient.rpcExportUrl());
  25. clientContext.setMethodName(rpcClient.value());
  26. RpcFactory.registerClient(beanName, method.getName(), clientContext);
  27. }
  28. }
  29. }

(4)添加Rpc拦截器,所有rpc的请求,都通过该拦截器来执行

  1. @Configuration
  2. public class WebConfiguration extends WebMvcConfigurationSupport {
  3. /**
  4. * 配置rpc过滤器
  5. * @return
  6. */
  7. @Bean
  8. public FilterRegistrationBean rpcFilter() {
  9. FilterRegistrationBean registration = new FilterRegistrationBean();
  10. registration.setFilter(new RpcFilter());
  11. registration.addUrlPatterns("/rpc");
  12. return registration;
  13. }
  14. }
  15. public class RpcFilter implements Filter {
  16. /**
  17. * 1、RPC服务端监听到特定rpc地址时,doFilter解析客户端的bean名称、接口、以及接口参数
  18. * 2、调用相应的bean接口,并得到返回结果写回response
  19. * @param req
  20. * @param resp
  21. * @param chain
  22. * @throws IOException
  23. * @throws ServletException
  24. */
  25. @Override
  26. public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException {
  27. HttpServletRequest request = (HttpServletRequest) req;
  28. HttpServletResponse response = (HttpServletResponse) resp;
  29. InputStreamReader inputStreamReader = null;
  30. BufferedReader reader = null;
  31. try{
  32. inputStreamReader = new InputStreamReader(request.getInputStream(), "UTF-8");
  33. reader = new BufferedReader(inputStreamReader);
  34. String requestBody = "";
  35. String temp ;
  36. while ((temp = reader.readLine()) != null) {
  37. requestBody += temp;
  38. }
  39. //获取请求参数
  40. RpcRequest rpcRequest = JSONObject.parseObject(requestBody, RpcRequest.class);
  41. if (StringUtils.isEmpty(rpcRequest.getMethod())){
  42. throw new RuntimeException("rpc method parameter can not be null");
  43. }
  44. //获取当前rpc请求的服务端配置信息:bean和method
  45. RpcExporterContext exporterContext = RpcFactory.getExporterMap(rpcRequest.getMethod());
  46. //通过反射调用rpc服务端具体的接口方法
  47. Object result = exporterContext.getMethod().invoke(((Class) exporterContext.getBean()).newInstance(), rpcRequest.getParams().toArray());
  48. RpcResponse rpcResponse = new RpcResponse();
  49. rpcResponse.setResult(result);
  50. //写回执行结果
  51. response.getWriter().write(JSON.toJSONString(rpcResponse));
  52. }catch (Exception e){
  53. e.printStackTrace();
  54. }finally {
  55. if (reader != null){
  56. reader.close();
  57. }
  58. if (inputStreamReader != null){
  59. inputStreamReader.close();
  60. }
  61. }
  62. }
  63. }

RPC调用链

(1)业务方调用RPC客户端接口,通过动态代理后,进入invoke方法,通过HttpClient或其他方式向RPC服务端发起请求

(2)服务端添加RPC过滤器,所有通过某个特定地址(http://xxx/rpc)来的请求,都通过这个RPC拦截器

(3)RPC拦截器中解析客户端的请求参数,包括bean名称、接口名称、接口参数等

(4)通过解析出来的请求参数,调用RPC服务端相应的接口方法,并获取到返回结果

(5)RPC服务端将请求结果返回给RPC客户端,RPC客户端接受到返回结果后,继续处理业务逻辑

添加Controller类,请求http://localhost:8080/test测试,结果返回true

  1. @RestController
  2. public class RpcController {
  3. @Autowired
  4. private RpcTestClient client;
  5. @RequestMapping(value = "/test")
  6. public Boolean test() throws NoSuchMethodException {
  7. RpcExporterContext exporterContext = new RpcExporterContext();
  8. try{
  9. Class clazz = Class.forName("com.code.framework.rpc.exporter.RpcTestServiceImpl");
  10. Method method = clazz.getDeclaredMethod("testRpcService", String.class);
  11. exporterContext.setBean(clazz);
  12. exporterContext.setMethod(method);
  13. }catch (Exception e){
  14. e.printStackTrace();
  15. }
  16. RpcFactory.registerExporter("RpcTestServiceImpl", "testRpcService", exporterContext);
  17. RpcClientContext clientContext = new RpcClientContext();
  18. //设置rpc服务端请求地址
  19. clientContext.setUrl("http://localhost:8080/rpc");
  20. //设置rpc服务端接口
  21. clientContext.setMethodName("RpcTestServiceImpl.testRpcService");
  22. RpcFactory.registerClient("RpcTestClient", "testClient", clientContext);
  23. //上述代码为手动配置rpc客户端和服务端相应的配置信息,也可以通过扫描包中rpc客户端和服务端接口方法上的注解来获取配置信息,然后将其保存到RpcFactory中,供解析调用时使用
  24. Boolean result = client.testClient("test rpc");
  25. return result;
  26. }
  27. }

 

相关标签: rpc