手写RPC框架
源码地址:https://github.com/ItGeneral/code-framework
微信公众号原文:https://mp.weixin.qq.com/s/z1kIqvlX92oun1Cpm71Tsw
RPC(Remote Procedure Call Protocol)远程过程调用协议,采用客户机/服务器模式。请求程序就是一个客户机,而服务提供程序就是一个服务器。首先,客户机调用进程发送一个有进程参数的调用信息到服务进程,然后等待应答信息。在服务器端,进程保持睡眠状态直到调用信息的到达为止。当一个调用信息到达,服务器获得进程参数,计算结果,发送答复信息,然后等待下一个调用信息,最后,客户端调用进程接收答复信息,获得进程结果,然后调用执行继续进行。使用RPC框架,可以让远程接口调用像调用本地方法一样方便。 下面介绍一个简单的RPC框架,让读者领略RPC框架的原理
RPC服务端
(1)新增注解@RpcExporter,接口上添加该注解,即表示为RPC服务端接口
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcExporter {
/**
* RPC服务端处理器名称(接口名称)
* @return
*/
String value();
}
(2)新增接口RpcTestService及其实现RpcTestServiceImpl
public interface RpcTestService {
Boolean testRpcService(String params);
}
@Service
public class RpcTestServiceImpl implements RpcTestService{
private final static Logger LOGGER = getLogger(RpcTestServiceImpl.class);
@Override
@RpcExporter(value = "testRpcService")
public Boolean testRpcService(String params){
LOGGER.info("======testRpcService====success====" + params);
return true;
}
}
RPC客户端
(1)新增注解@RpcClient,接口方法上添加该注解,即表示为RPC客户端接口
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcClient {
/**
* 对应的服务端处理器名称(bean名称+接口名称)
* @return
*/
String value();
/**
* rpc 服务端地址
* @return
*/
String rpcExportUrl();
}
(2)添加@RpcInterface注解,接口类上添加此注解,表明为RPC客户端接口类
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RpcInterface {
}
(3)新增RPC客户端接口
@Service
@RpcInterface
public interface RpcTestClient {
@RpcClient(value = "RpcTestServiceImpl.testRpcService", rpcExportUrl = "http://localhost:8080/rpc")
Boolean testClient(String params);
}
初始化RPC框架
(1)添加@EnableRPC注解,开启RPC配置
@Documented
@Configuration
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Import({RpcConfiguration.RpcPackageRegister.class, RpcConfiguration.class})
public @interface EnableRPC {
/**
* RPC 扫描包路径
* @return
*/
String[] basePackages() default {};
}
public class RpcConfiguration {
private static Set<String> basePackages = new LinkedHashSet<>();
/**
* basePackages路径下的rpc客户端接口,设置为动态代理
* @return
*/
@Bean
public RpcRegister rpcRegister(){
return new RpcRegister(basePackages);
}
public static class RpcPackageRegister implements ImportBeanDefinitionRegistrar{
/**
* 解析EnableRPC basePackages参数
* @param metadata
* @param registry
*/
@Override
public void registerBeanDefinitions(AnnotationMetadata metadata, BeanDefinitionRegistry registry) {
Map<String, Object> enableMap = metadata.getAnnotationAttributes(EnableRPC.class.getName());
AnnotationAttributes annotationAttributes = AnnotationAttributes.fromMap(enableMap);
String[] packages = annotationAttributes.getStringArray("basePackages");
for (String basePackage : packages){
String[] tokenize = StringUtils.tokenizeToStringArray(basePackage,
ConfigurableApplicationContext.CONFIG_LOCATION_DELIMITERS);
basePackages.addAll(Arrays.asList(tokenize));
}
if (basePackages.isEmpty()) {
basePackages.add(ClassUtils.getPackageName(metadata.getClassName()));
}
}
}
}
(2)spring bean 初始化前为所有RPC客户端接口设置动态代理,自定义invoke方法实现RPC客户端向RPC服务端请求数据的逻辑
public class RpcRegister implements BeanFactoryPostProcessor {
/**
* rpc扫描包
*/
private Set<String> baseScanPackages;
public RpcRegister(Set<String> baseScanPackages) {
this.baseScanPackages = baseScanPackages;
}
/**
* spring容器在初始化bean之前,允许自定义某些bean
* RPC客户端所有bean设置动态代理,调用时走动态代理的invoke()
* @param beanFactory
* @throws BeansException
*/
@Override
public void postProcessBeanFactory(ConfigurableListableBeanFactory beanFactory) throws BeansException {
DefaultListableBeanFactory factory = (DefaultListableBeanFactory) beanFactory;
for (String basePackage : baseScanPackages){
try {
Set<Class<?>> classSet = scanBackPackage(basePackage);
for (Class<?> clazz : classSet){
if (clazz.isInterface() && clazz.isAnnotationPresent(RpcInterface.class)){
//生成动态代理
Object proxy = RpcClientProxy.getProxy(clazz);
//将bean注入到spring容器中
factory.registerSingleton(clazz.getName(), proxy);
}
}
} catch (Exception e) {
e.printStackTrace();
}
}
}
/**
* 扫描包下面的所有类
* @param basePackage
* @return
*/
private Set<Class<?>> scanBackPackage(String basePackage){
Set<Class<?>> classSet = new HashSet<>();
try{
String pattern = "classpath*:" + ClassUtils.convertClassNameToResourcePath(basePackage) + "/**/*.class";
ResourcePatternResolver resourcePatternResolver = new PathMatchingResourcePatternResolver();
Resource[] resources = resourcePatternResolver.getResources(pattern);
MetadataReaderFactory readerFactory = new CachingMetadataReaderFactory(resourcePatternResolver);
for (Resource resource : resources){
if (resource.isReadable()){
MetadataReader reader = readerFactory.getMetadataReader(resource);
String className = reader.getClassMetadata().getClassName();
classSet.add(Class.forName(className, false, Thread.currentThread().getContextClassLoader()));
}
}
}catch (Exception e){
e.printStackTrace();
}
return classSet;
}
}
(3)在spring bean初始化后,注册RPC服务端和客户端接口,将RPC服务端相关的配置、bean、接口名称等保存到内存中,以便后续解析调用接口使用
/**
* 将rpc客户端和服务端的注解配置信息 保存到RpcFactory中
* @param bean
* @param beanName
*/
public void resolveRpc(Object bean, String beanName){
Method[] methods = ReflectionUtils.getAllDeclaredMethods(bean.getClass());
if (methods == null){
return;
}
for (Method method : methods){
//解析RPC服务端接口注解的配置
RpcExporter rpcExporter = AnnotationUtils.findAnnotation(method, RpcExporter.class);
if (rpcExporter != null){
RpcExporterContext exporterContext = new RpcExporterContext();
exporterContext.setBean(bean);
exporterContext.setMethod(method);
RpcFactory.registerExporter(beanName, method.getName(), exporterContext);
}
//解析RPC客户端接口注解的配置
RpcClient rpcClient = AnnotationUtils.findAnnotation(method, RpcClient.class);
if (rpcClient != null){
RpcClientContext clientContext = new RpcClientContext();
clientContext.setUrl(rpcClient.rpcExportUrl());
clientContext.setMethodName(rpcClient.value());
RpcFactory.registerClient(beanName, method.getName(), clientContext);
}
}
}
(4)添加Rpc拦截器,所有rpc的请求,都通过该拦截器来执行
@Configuration
public class WebConfiguration extends WebMvcConfigurationSupport {
/**
* 配置rpc过滤器
* @return
*/
@Bean
public FilterRegistrationBean rpcFilter() {
FilterRegistrationBean registration = new FilterRegistrationBean();
registration.setFilter(new RpcFilter());
registration.addUrlPatterns("/rpc");
return registration;
}
}
public class RpcFilter implements Filter {
/**
* 1、RPC服务端监听到特定rpc地址时,doFilter解析客户端的bean名称、接口、以及接口参数
* 2、调用相应的bean接口,并得到返回结果写回response
* @param req
* @param resp
* @param chain
* @throws IOException
* @throws ServletException
*/
@Override
public void doFilter(ServletRequest req, ServletResponse resp, FilterChain chain) throws IOException {
HttpServletRequest request = (HttpServletRequest) req;
HttpServletResponse response = (HttpServletResponse) resp;
InputStreamReader inputStreamReader = null;
BufferedReader reader = null;
try{
inputStreamReader = new InputStreamReader(request.getInputStream(), "UTF-8");
reader = new BufferedReader(inputStreamReader);
String requestBody = "";
String temp ;
while ((temp = reader.readLine()) != null) {
requestBody += temp;
}
//获取请求参数
RpcRequest rpcRequest = JSONObject.parseObject(requestBody, RpcRequest.class);
if (StringUtils.isEmpty(rpcRequest.getMethod())){
throw new RuntimeException("rpc method parameter can not be null");
}
//获取当前rpc请求的服务端配置信息:bean和method
RpcExporterContext exporterContext = RpcFactory.getExporterMap(rpcRequest.getMethod());
//通过反射调用rpc服务端具体的接口方法
Object result = exporterContext.getMethod().invoke(((Class) exporterContext.getBean()).newInstance(), rpcRequest.getParams().toArray());
RpcResponse rpcResponse = new RpcResponse();
rpcResponse.setResult(result);
//写回执行结果
response.getWriter().write(JSON.toJSONString(rpcResponse));
}catch (Exception e){
e.printStackTrace();
}finally {
if (reader != null){
reader.close();
}
if (inputStreamReader != null){
inputStreamReader.close();
}
}
}
}
RPC调用链
(1)业务方调用RPC客户端接口,通过动态代理后,进入invoke方法,通过HttpClient或其他方式向RPC服务端发起请求
(2)服务端添加RPC过滤器,所有通过某个特定地址(http://xxx/rpc)来的请求,都通过这个RPC拦截器
(3)RPC拦截器中解析客户端的请求参数,包括bean名称、接口名称、接口参数等
(4)通过解析出来的请求参数,调用RPC服务端相应的接口方法,并获取到返回结果
(5)RPC服务端将请求结果返回给RPC客户端,RPC客户端接受到返回结果后,继续处理业务逻辑
添加Controller类,请求http://localhost:8080/test测试,结果返回true
@RestController
public class RpcController {
@Autowired
private RpcTestClient client;
@RequestMapping(value = "/test")
public Boolean test() throws NoSuchMethodException {
RpcExporterContext exporterContext = new RpcExporterContext();
try{
Class clazz = Class.forName("com.code.framework.rpc.exporter.RpcTestServiceImpl");
Method method = clazz.getDeclaredMethod("testRpcService", String.class);
exporterContext.setBean(clazz);
exporterContext.setMethod(method);
}catch (Exception e){
e.printStackTrace();
}
RpcFactory.registerExporter("RpcTestServiceImpl", "testRpcService", exporterContext);
RpcClientContext clientContext = new RpcClientContext();
//设置rpc服务端请求地址
clientContext.setUrl("http://localhost:8080/rpc");
//设置rpc服务端接口
clientContext.setMethodName("RpcTestServiceImpl.testRpcService");
RpcFactory.registerClient("RpcTestClient", "testClient", clientContext);
//上述代码为手动配置rpc客户端和服务端相应的配置信息,也可以通过扫描包中rpc客户端和服务端接口方法上的注解来获取配置信息,然后将其保存到RpcFactory中,供解析调用时使用
Boolean result = client.testClient("test rpc");
return result;
}
}