欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

spring中实现远程监听 javaspringeventlistener远程监听 

程序员文章站 2022-04-22 08:32:36
...
   近期项目需要集群,缓存集群是自己实现的,需要在缓存发生变动后,需要发生消息给各个节点更新缓存。所以就做了个远程监听功能。远程监听用rmi协议,事件发布前都动态查询出活动的节点,事件发布后会被活动节点上的listener监听到。上代码
1.定义event和listener
public class  BaseEvent  extends EventObject {


	private static final long serialVersionUID = 1L;
	
	
	/** System time when the event happened */
	private final long timestamp;
	

	public BaseEvent(Object source) {
		super(source);
		this.timestamp = System.currentTimeMillis();
		
	}
	
	/**
	 * Return the system time in milliseconds when the event happened.
	 */
	public final long getTimestamp() {
		return this.timestamp;
	}
}

public interface EventLisenter<T extends BaseEvent>{

	/**
	 * 事件处理
	 * @param baseEvent
	 */
	void onEvent(T t);
}

2、定义远程监听配置
public  class RemoteLisenter{
	
	private  Class  eventClass;
	
	private Class serviceInterface;
	
	private String serviceName;
	
	private String  registryPort;

	public Class getServiceInterface() {
		return serviceInterface;
	}

	public void setServiceInterface(Class serviceInterface) {
		this.serviceInterface = serviceInterface;
	}

	public String getServiceName() {
		return serviceName;
	}

	public void setServiceName(String serviceName) {
		this.serviceName = serviceName;
	}


监听管理类,用于事件注册,更新远程监听,发布事件
public class RemoteLisenterConfig {

	private List<RemoteLisenter> remoteLisenters =new ArrayList<RemoteLisenter>();
	
	
	public List<RemoteLisenter> getRemoteLisenters() {
		return remoteLisenters;
	}


	public void setRemoteLisenters(List<RemoteLisenter> remoteLisenters) {
		this.remoteLisenters = remoteLisenters;
	}

}


@Service
public class ListennerManagement {

	
	protected Logger logger = Logger.getLogger(getClass());

	
	@Autowired
	private HeartbeatService heartbeatService;
	
	
	@Autowired
	RemoteLisenterConfig remoteLisenterConfig;
	
	
	
	

	/**
	 * 本地监听
	 */
	private  Map<String, List<EventLisenter>> localListeners = new LinkedHashMap<String, List<EventLisenter>>();
	
	/**
	 * 远程监听
	 */
	private  Map<String, List<EventLisenter>> remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();

	

	/**
	 * 扫瞄所有bean,进行队本地事件进行事件监听
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings("rawtypes")
	public  void registryListener(ApplicationContext  ctx) throws Exception {
		// 取得容器中所有监听
		Map<String, EventLisenter> beans = ctx
				.getBeansOfType(EventLisenter.class);
		if (beans == null || beans.size() == 0) {
			return;
		}
		Collection<EventLisenter> list = beans.values();
		for (EventLisenter listener : list) {
			Class listenercls = AopTargetUtils.getTarget(listener).getClass();
			Class eventCls = GenericTypeResolver.resolveTypeArgument(
					listenercls, EventLisenter.class);

			try {
				if (localListeners.containsKey(eventCls.getName())) {
					localListeners.get(eventCls.getName()).add(listener);
				} else {
					List<EventLisenter> l = new ArrayList<EventLisenter>();
					l.add(listener);
					localListeners.put(eventCls.getName(), l);
				}
			} catch (Exception e) {
				throw new Exception("初始化事件监听器时出错:", e);
			}
		}

	}

	
	 private void refreshRemoteListeners(){
		//查询出集群服务器的IP(此处从数据库配置中查询)
		List<String> ipList=heartbeatService.getAliveHostsExcludeSelf();
		List<RemoteLisenter>  RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
		remoteListeners = new LinkedHashMap<String, List<EventLisenter>>();
		if(RemoteLisenterList!=null){
			for (RemoteLisenter remoteLisenter : RemoteLisenterList) {
				
				String eventClsName=remoteLisenter.getEventClass().getName();
				Class listenerCls=remoteLisenter.getServiceInterface();
				String port=remoteLisenter.getRegistryPort();
				String serviceName=remoteLisenter.getServiceName();
				if(ipList!=null){
					for (String ip : ipList) {
						EhCacheService ehCacheService=null;
						EventLisenter listener = buildRemotListener(listenerCls, port,serviceName, ip);
						
						if(listener!=null){
							if (remoteListeners.containsKey(eventClsName)) {
								remoteListeners.get(eventClsName).add(listener);
							} else {
								List<EventLisenter> l = new ArrayList<EventLisenter>();
								l.add(listener);
								remoteListeners.put(eventClsName, l);
							}
						}
					}
				}
			
		}
			
			
			
		}
	}


	private EventLisenter buildRemotListener(Class listenerCls, String port,
			String serviceName, String ip) {
		
		try {
			RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean();
			rmiProxyFactoryBean.setServiceInterface(listenerCls); 
			rmiProxyFactoryBean.setServiceUrl("rmi://"+ip+":"+port+"/"+serviceName);
			rmiProxyFactoryBean.afterPropertiesSet();
			
			if (rmiProxyFactoryBean.getObject() instanceof EventLisenter) { 
				EventLisenter  listener=(EventLisenter)rmiProxyFactoryBean.getObject();
				return listener;
			}else{
				return null;
			}
		} catch (Exception e) {
			logger.error("获取远程监听bean错误[listenerClass="+listenerCls+";port="+port+";ip="+ip+";serviceName="+serviceName+"]", e);
			return null;
		}
		
		
		

		
	}
	
	
	/**
	 * 发布事件
	 * 
	 * @throws Exception
	 */
	@SuppressWarnings("rawtypes")
	public  void publishEvent(BaseEvent event)  {
		//本地监控
		List<EventLisenter> localList = localListeners.get(event.getClass().getName());
		if (localList != null) {
			for (EventLisenter listener : localList) {
				try {
					listener.onEvent(event);
				} catch (Exception e) {
					logger.error(e.getMessage());
				}
			}
		}
		
		//远程监控
		Class eventClass=event.getClass();
		if(needRemoteListenre(eventClass)){
			//刷新远程监听者
			refreshRemoteListeners();
			
			List<EventLisenter> remoteList = remoteListeners.get(event.getClass().getName());
			if (remoteList != null) {
				for (EventLisenter listener : remoteList) {
					try {
						listener.onEvent(event);
					} catch (Exception e) {
						logger.error(e.getMessage());
					}
				}
			}
		}
	}


    /**
     * 判断本事件是否需要远程监听
     * @param eventClass
     * @return
     */
	private boolean needRemoteListenre(Class eventClass) {
		
		List<RemoteLisenter>  RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters();
		if(RemoteLisenterList!=null){
			for (RemoteLisenter remoteLisenter : RemoteLisenterList) {			
			     Class eventCls=remoteLisenter.getEventClass();
			     if(eventCls.equals(eventCls))
			    	 return true;
			}
		}
		return false;	
			
	}


	public Map<String, List<EventLisenter>> getLocalListeners() {
		return localListeners;
	}


	public Map<String, List<EventLisenter>> getRemoteListeners() {
		return remoteListeners;
	}
	


配置文件
<!-- 配置远程监听配置对象 -->
	<bean id="remoteLisenterConfig"  class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenterConfig">
	  <property name="remoteLisenters">
	     <list>
	     <bean id="remoteLisenter1"  class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenter">
	          <property name="eventClass"      value="com.ejintai.cbs_policy_registry.base.event.biz.EhCacheUpdateEvent" />  
	          <property name="serviceName"         value="ehCacheUpdateEventListener" />  
              <property name="serviceInterface"    value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>  
      		  <property name="registryPort"        value="${rmi.port}"/>  
	      </bean>
	     </list>
	  </property>
	</bean>

 <bean id="remoteEhCacheUpdateEventListener" class="org.springframework.remoting.rmi.RmiServiceExporter" >  
        <property name="serviceName"         value="ehCacheUpdateEventListener" />  
        <property name="service"             ref="ehCacheUpdateEventListener"/>  
        <property name="serviceInterface"    value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>  
        <property name="registryPort"        value="${rmi.port}"/>  
    </bean>