欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

(六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型

程序员文章站 2022-04-20 23:10:07
...

Nepxion-Thunder(QQ 群 471164539)发布在https://github.com/Nepxion/

 

点对点模型实现,主要是用于Netty & Hessian。结构图如下:
(六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
点击查看大图
 

  •  1 Netty框架
     1.1 工作原理
     1)Spring扫描线程扫描到一个Service节点后,就开启一个服务端
    Netty NioSocketChannel通道(绑定服务端口),并开启Redis的对该Service的订阅
     2)Spring扫描线程扫描到一个Reference节点后,就开启一个调用端Netty NioSocketChannel通道(根据ApplicationEntity里的host和hort,与对应服务端进行长连接)
     3)当调用端通过Spring Aop进行同步/异步远程调用时,先从缓存获取ChannelFuture对象,把
    ProtocolRequest请求通过ChannelFuture.writeAndFlush发送到服务端,服务端处理完毕后,通过ChannelHandlerContext.writeAndFlush把ProtocolResponse返回 到调用端,调用端收到响应后,如果是异步调用Callback方式完成调用,如果是同步通过CyclicBarrier的线程等待返回值,最后完成调用
     4)当调用端通过Spring Aop进行广播远程调用时,把ProtocolRequest发布到Redis,服务端订阅监听该ProtocolRequest后,进行处理,不返回结果

     1.2 类结构
    (六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
     1)NettyServerExecutor.java - 继承AbstractServerExecutor.java,实现服务端对
    Netty NioSocketChannel的阻塞式初始化,通过通过ChannelFuture和本地端口进行绑定提供通道服务,以及数据通道关闭的监听,
    开启Redis的订阅
     
    2)NettyServerHandler.java - 继承SimpleChannelInboundHandler.java,通过同步队列的线程池实现服务端对Netty通道的数据读取和写入,以及数据通道关闭的异常捕捉

     3)NettyClientExecutor.java - 继承AbstractClientExecutor.java,实现调用端对Netty NioSocketChannel的阻塞式初始化,通过ChannelFuture和服务端的通道建立长连接,并实现对服务上下线的处理(上线时,把
    ChannelFuture加入缓存;下线时,把ChannelFuture移出缓存)
     4)NettyClientHandler.java - 继承SimpleChannelInboundHandler.java,
    通过同步队列的线程池实现服务端对Netty通道的数据读取和写入,以及数据通道关闭的异常捕捉,心跳机制的实现
     5)NettyClientInterceptor.java - 继承AbstractClientInterceptor.java,实现如下调用方式:
          异步调用:调用负载均衡器获得可连接的ApplicationEntity(服务),如果获得的Application所在的服务器,恰巧下线了(服务下线到注册中心下线,一般会有一段时间间隔),那么采用重复调用继续做负载均衡去获取可连服务(
    ChannelFuture)。采用Callback异步返回
          同步调用:同异步调用,采用返回值返回
          广播调用:支持两种方式,默认是Redis的发布。如果Redis服务没起,采用轮循发送,就是往注册中心所有注册的相关服务循环发送一遍广播消息

    6)NettyHeartbeat.java - 心跳实现
    7)NettyObjectDecoder.java - 对序列化对象的解码
    8)NettyObjectEncoder.java
    - 对序列化对象的编码
    9)NettyException.java - Netty异常类

  • 2 Hessian
    2.1 工作原理
    1)Hessian服务器在启动的时候,会把服务端配置的XML解析成hessian-servlet.xml,根据web.xml配置的上下文启动Hessian服务
    例如服务端配置的XML叫hessian-server-context.xml,内容如下:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
    	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    	xmlns:context="http://www.springframework.org/schema/context"
    	xmlns:aop="http://www.springframework.org/schema/aop" 
    	xmlns:tx="http://www.springframework.org/schema/tx"
    	xmlns:mvc="http://www.springframework.org/schema/mvc"
    	xmlns:thunder="http://www.nepxion.com/schema/thunder"
    	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd 
               http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.0.xsd 
               http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-3.0.xsd 
               http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.0.xsd
               http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc-3.0.xsd
               http://www.nepxion.com/schema/thunder http://www.nepxion.com/schema/thunder/thunder-1.0.xsd">
        
        <thunder:application id="application" application="APP-IOS" group="MY_GROUP" cluster="HessianServerCluster" port="8080"/>
        
        <thunder:protocol id="protocol" type="hessian"/>
        
        <thunder:registry id="registry" type="zookeeper" address="localhost:2181" config="remote"/>
        
        <thunder:strategy id="strategy" loadbalance="consistentHash"/>
        
        <thunder:service id="userServiceImpl" interface="com.nepxion.thunder.test.service.UserService" ref="_userServiceImpl"/>
        <bean name="_userServiceImpl" class="com.nepxion.thunder.test.service.UserServiceImpl"/>
        
        <thunder:service id="animalServiceImpl" interface="com.nepxion.thunder.test.service.AnimalService" ref="_animalServiceImpl"/>
        <bean name="_animalServiceImpl" class="com.nepxion.thunder.test.service.AnimalServiceImpl"/>
    </beans>
    动态解析成hessian-servlet.xml,内容如下:
    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-3.0.xsd">
        <bean name="/com.nepxion.thunder.test.service.AnimalService" class="com.nepxion.thunder.protocol.hessian.HessianAuthServiceExporter">
            <property name="serviceInterface" value="com.nepxion.thunder.test.service.AnimalService"/>
            <property name="service" ref="_animalServiceImpl"/>
        </bean>
        <bean name="_animalServiceImpl" class="com.nepxion.thunder.test.service.AnimalServiceImpl"/>
        <bean name="/com.nepxion.thunder.test.service.UserService" class="com.nepxion.thunder.protocol.hessian.HessianAuthServiceExporter">
            <property name="serviceInterface" value="com.nepxion.thunder.test.service.UserService"/>
            <property name="service" ref="_userServiceImpl"/>
        </bean>
        <bean name="_userServiceImpl" class="com.nepxion.thunder.test.service.UserServiceImpl"/>
    </beans>
        web.xml,内容如下:
    <?xml version="1.0" encoding="UTF-8"?>
    <web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    	xmlns:web="http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
    	xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd"
    	version="2.5">
    
    	<context-param>
    		<param-name>contextConfigLocation</param-name>
    		<param-value>classpath:hessian-server-context.xml</param-value>
    	</context-param>
    
    	<listener>
    		<listener-class>com.nepxion.thunder.framework.context.ThunderContextLoaderListener</listener-class>
    	</listener>
    
    	<!-- Only limited for Hessian -->
    	<servlet>
    		<servlet-name>hessian</servlet-name>
    		<servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
    		<init-param>
    			<param-name>contextConfigLocation</param-name>
    			<param-value>classpath:hessian-servlet.xml</param-value>
    		</init-param>
    		<load-on-startup>1</load-on-startup>
    	</servlet>
    
    	<servlet-mapping>
    		<servlet-name>hessian</servlet-name>
    		<url-pattern>/hessian/*</url-pattern>
    	</servlet-mapping>
    </web-app>
    2)Spring扫描线程扫描到一个Service节点后,就开启Redis的对该Service的订阅
    3)Spring扫描线程扫描到一个Reference节点后,由HessianAuthProxyFactory创建Proxy对象,并进入ProxyMap缓存
    4)当调用端通过Spring Aop进行同步/异步远程调用时,先从缓存获取
    Proxy对象,把ProtocolRequest请求分解,通过method.invoke(proxy, arguments)调用Hessian服务器,服务端同步处理完毕后,返回值
    5)当调用端通过Spring Aop进行广播远程调用时,把ProtocolRequest发布到Redis,服务端订阅监听该ProtocolRequest后,进行处理,不返回结果

    2.2 类结构
    (六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
     1)HessianServerExecutor.java -
    继承AbstractServerExecutor.java,开启Redis的订阅
     2)HessianClientExecutor.java - 继承AbstractClientExecutor.java,由HessianAuthProxyFactory创建Proxy对象,并进入ProxyMap缓存。并实现对服务上下线的处理(上线时,把Proxy加入ProxyMap缓存;下线时,把Proxy移出ProxyMap缓存)
     3)HessianClientInterceptor.java - 继承AbstractClientInterceptor.java,实现如下调用方式:
          异步调用:调用负载均衡器获得可连接的ApplicationEntity(服务),如果获得的Application所在的服务器,恰巧下线了(服务下线到注册中心下线,一般会有一段时间间隔),那么采用重复调用继续做负载均衡去获取可连服务(Proxy)。采用线程池Callback异步返回
          同步调用:同异步调用,采用返回值返回。Hessian默认调用方式
          广播调用:支持两种方式,默认是Redis的发布。如果Redis服务没起,采用轮循发送,就是往注册中心所有注册的相关服务循环发送一遍广播消息
          重复调用:从上一次的调用返回异常是否为ConnectException来判断是否要发起重复调用,同时一旦出现ConnectException,调用端把Proxy对象从ProxyMap中移除
    4)HessianAuthProxyFactory.java - 继承HessianProxyFactory.java,覆盖createHessianConnectionFactory方法,拦截创建HessianConnection的方法,把调用端的密钥和版本信息通过Header方式发到服务端
    5)HessianAuthServiceExporter.java - 继承HessianServiceExporter.java,实现ApplicationContextAware.java,覆盖handleRequest方法,在服务端进入调用之前,进行限流,密钥,版本控制
    6)HessianServletGenerator.java - 动态创建
    hessian-servlet.xml
    7)HessianUrlUtil.java - Hessian Url工具类
    8)HessianUtil.java - Hessian工具类

  • 3 Redis
    3.1 工作原理
    由于Netty和Hessian本身不带广播(发布/订阅)功能,所以采用Redis作为辅助插件来实现。如果Redis服务不可用,那Netty和Hessian会采用轮循式方式进行广播发送
    1)服务端和调用端读取配置文件,初始化RedisSentinelPoolFactory里的JedisSentinelPool
    2)服务端从
    JedisSentinelPool获取Jedis对象,启动订阅(Subscribe)阻塞式线程,一旦连接到Redis的连接断掉,阻塞式线程结束,并发起重连,再次进入阻塞时线程,直到连接成功
    3)调用端
    JedisSentinelPool获取Jedis对象,通过线程池启动发布(Publish)线程
    4)对于订阅/发布的通道(Channel)名称是 group + "/" + application + "/" + interfaze,防止不同类型的应用,重名接口接入到同一个Redis服务器

    3.2 类结构
    (六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
     哨兵实现,用于发布/订阅
     1)RedisHierachy.java - Redis结构
     2)RedisPublisher.java - 继承
    RedisHierachy.java,实现发布功能
     3)RedisSubscriber.java -
    继承RedisHierachy.java,实现订阅功能
     4)RedisSentinelPoolFactory.java - JedisSentinelPool连接池工厂类
     5)RedisDestinationUtil.java -
    RedisDestination工具类

     集群实现
     6)RedisClusterFactory - Redis集群工厂类
  • (六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
  • 大小: 10.9 KB
  • (六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
  • 大小: 9.4 KB
  • (六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
  • 大小: 8.9 KB
  • (六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
  • 大小: 157.1 KB
  • (六) Nepxion-Thunder分布式RPC集成框架 - 点对点模型
            
    
    博客分类: 开源框架平台选型 NettyHessianKafkaRedisDocker 
  • 大小: 261.2 KB