spring中实现远程监听 javaspringeventlistener远程监听
程序员文章站
2022-04-22 08:32:36
...
近期项目需要集群,缓存集群是自己实现的,在缓存发生变动后,需要发送消息给各个节点更新缓存,所以就做了个远程监听功能。远程监听使用 RMI 协议,事件发布前都会动态查询出活动的节点,事件发布后会被活动节点上的 listener 监听到。上代码:
1.定义event和listener
2、定义远程监听配置
监听管理类,用于事件注册,更新远程监听,发布事件
配置文件
1.定义event和listener
/**
 * Common superclass for application events. In addition to the event source kept by
 * {@link EventObject}, each instance remembers the system time at which it was constructed.
 */
public class BaseEvent extends EventObject {

    private static final long serialVersionUID = 1L;

    /** Value of {@link System#currentTimeMillis()} captured in the constructor. */
    private final long timestamp;

    /**
     * Creates an event and stamps it with the current system time.
     *
     * @param source the object on which the event occurred; {@link EventObject} rejects
     *               {@code null} with an {@link IllegalArgumentException}
     */
    public BaseEvent(Object source) {
        super(source);
        timestamp = System.currentTimeMillis();
    }

    /**
     * Returns the creation time of this event.
     *
     * @return the system time, in milliseconds, recorded when the event was constructed
     */
    public final long getTimestamp() {
        return timestamp;
    }
}
/**
 * Callback interface for handling events of a particular {@link BaseEvent} subtype.
 *
 * @param <T> the event type this listener handles
 */
public interface EventLisenter<T extends BaseEvent>{
    /**
     * Handles an event.
     *
     * @param t the event to handle
     */
    void onEvent(T t);
}
2、定义远程监听配置
/**
 * Configuration entry describing one remotely reachable listener: which event class it
 * handles and how to locate the listener over RMI (service interface, service name and
 * registry port).
 *
 * <p>All four properties are plain JavaBean properties so they can be populated through
 * setter injection.
 */
public class RemoteLisenter {

    /** Event class whose instances should be forwarded to the remote listener. */
    private Class<?> eventClass;

    /** Interface used to build the remote proxy. */
    private Class<?> serviceInterface;

    /** Name under which the remote service is bound; last segment of the RMI URL. */
    private String serviceName;

    /** RMI registry port, kept as a string because it is concatenated into the RMI URL. */
    private String registryPort;

    /** @return the event class this entry applies to, or {@code null} if not configured */
    public Class<?> getEventClass() {
        return eventClass;
    }

    /** @param eventClass the event class this entry applies to */
    public void setEventClass(Class<?> eventClass) {
        this.eventClass = eventClass;
    }

    /** @return the interface the remote proxy implements, or {@code null} if not configured */
    public Class<?> getServiceInterface() {
        return serviceInterface;
    }

    /** @param serviceInterface the interface the remote proxy implements */
    public void setServiceInterface(Class<?> serviceInterface) {
        this.serviceInterface = serviceInterface;
    }

    /** @return the remote service name, or {@code null} if not configured */
    public String getServiceName() {
        return serviceName;
    }

    /** @param serviceName the remote service name */
    public void setServiceName(String serviceName) {
        this.serviceName = serviceName;
    }

    /** @return the RMI registry port, or {@code null} if not configured */
    public String getRegistryPort() {
        return registryPort;
    }

    /** @param registryPort the RMI registry port */
    public void setRegistryPort(String registryPort) {
        this.registryPort = registryPort;
    }
}
监听管理类,用于事件注册,更新远程监听,发布事件
/**
 * Simple holder for the list of {@link RemoteLisenter} entries. The list starts out empty
 * and can be replaced wholesale through the setter.
 */
public class RemoteLisenterConfig {

    /** Current remote listener entries. */
    private List<RemoteLisenter> remoteLisenters;

    /** Creates a configuration with an empty, mutable list of entries. */
    public RemoteLisenterConfig() {
        remoteLisenters = new ArrayList<RemoteLisenter>();
    }

    /**
     * @return the list of entries exactly as stored (no copy is made)
     */
    public List<RemoteLisenter> getRemoteLisenters() {
        return remoteLisenters;
    }

    /**
     * Replaces the stored list with the given one.
     *
     * @param remoteLisenters the new list of entries; stored as-is
     */
    public void setRemoteLisenters(List<RemoteLisenter> remoteLisenters) {
        this.remoteLisenters = remoteLisenters;
    }
}
@Service public class ListennerManagement { protected Logger logger = Logger.getLogger(getClass()); @Autowired private HeartbeatService heartbeatService; @Autowired RemoteLisenterConfig remoteLisenterConfig; /** * 本地监听 */ private Map<String, List<EventLisenter>> localListeners = new LinkedHashMap<String, List<EventLisenter>>(); /** * 远程监听 */ private Map<String, List<EventLisenter>> remoteListeners = new LinkedHashMap<String, List<EventLisenter>>(); /** * 扫瞄所有bean,进行队本地事件进行事件监听 * * @throws Exception */ @SuppressWarnings("rawtypes") public void registryListener(ApplicationContext ctx) throws Exception { // 取得容器中所有监听 Map<String, EventLisenter> beans = ctx .getBeansOfType(EventLisenter.class); if (beans == null || beans.size() == 0) { return; } Collection<EventLisenter> list = beans.values(); for (EventLisenter listener : list) { Class listenercls = AopTargetUtils.getTarget(listener).getClass(); Class eventCls = GenericTypeResolver.resolveTypeArgument( listenercls, EventLisenter.class); try { if (localListeners.containsKey(eventCls.getName())) { localListeners.get(eventCls.getName()).add(listener); } else { List<EventLisenter> l = new ArrayList<EventLisenter>(); l.add(listener); localListeners.put(eventCls.getName(), l); } } catch (Exception e) { throw new Exception("初始化事件监听器时出错:", e); } } } private void refreshRemoteListeners(){ //查询出集群服务器的IP(此处从数据库配置中查询) List<String> ipList=heartbeatService.getAliveHostsExcludeSelf(); List<RemoteLisenter> RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters(); remoteListeners = new LinkedHashMap<String, List<EventLisenter>>(); if(RemoteLisenterList!=null){ for (RemoteLisenter remoteLisenter : RemoteLisenterList) { String eventClsName=remoteLisenter.getEventClass().getName(); Class listenerCls=remoteLisenter.getServiceInterface(); String port=remoteLisenter.getRegistryPort(); String serviceName=remoteLisenter.getServiceName(); if(ipList!=null){ for (String ip : ipList) { EhCacheService ehCacheService=null; EventLisenter listener 
= buildRemotListener(listenerCls, port,serviceName, ip); if(listener!=null){ if (remoteListeners.containsKey(eventClsName)) { remoteListeners.get(eventClsName).add(listener); } else { List<EventLisenter> l = new ArrayList<EventLisenter>(); l.add(listener); remoteListeners.put(eventClsName, l); } } } } } } } private EventLisenter buildRemotListener(Class listenerCls, String port, String serviceName, String ip) { try { RmiProxyFactoryBean rmiProxyFactoryBean = new RmiProxyFactoryBean(); rmiProxyFactoryBean.setServiceInterface(listenerCls); rmiProxyFactoryBean.setServiceUrl("rmi://"+ip+":"+port+"/"+serviceName); rmiProxyFactoryBean.afterPropertiesSet(); if (rmiProxyFactoryBean.getObject() instanceof EventLisenter) { EventLisenter listener=(EventLisenter)rmiProxyFactoryBean.getObject(); return listener; }else{ return null; } } catch (Exception e) { logger.error("获取远程监听bean错误[listenerClass="+listenerCls+";port="+port+";ip="+ip+";serviceName="+serviceName+"]", e); return null; } } /** * 发布事件 * * @throws Exception */ @SuppressWarnings("rawtypes") public void publishEvent(BaseEvent event) { //本地监控 List<EventLisenter> localList = localListeners.get(event.getClass().getName()); if (localList != null) { for (EventLisenter listener : localList) { try { listener.onEvent(event); } catch (Exception e) { logger.error(e.getMessage()); } } } //远程监控 Class eventClass=event.getClass(); if(needRemoteListenre(eventClass)){ //刷新远程监听者 refreshRemoteListeners(); List<EventLisenter> remoteList = remoteListeners.get(event.getClass().getName()); if (remoteList != null) { for (EventLisenter listener : remoteList) { try { listener.onEvent(event); } catch (Exception e) { logger.error(e.getMessage()); } } } } } /** * 判断本事件是否需要远程监听 * @param eventClass * @return */ private boolean needRemoteListenre(Class eventClass) { List<RemoteLisenter> RemoteLisenterList=remoteLisenterConfig.getRemoteLisenters(); if(RemoteLisenterList!=null){ for (RemoteLisenter remoteLisenter : RemoteLisenterList) { Class 
eventCls=remoteLisenter.getEventClass(); if(eventCls.equals(eventCls)) return true; } } return false; } public Map<String, List<EventLisenter>> getLocalListeners() { return localListeners; } public Map<String, List<EventLisenter>> getRemoteListeners() { return remoteListeners; }
配置文件
<!-- Remote-listener configuration object: the list of remote listeners, each tying an
     event class to the RMI service (name, interface, registry port) that handles it. -->
<bean id="remoteLisenterConfig" class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenterConfig">
    <property name="remoteLisenters">
        <list>
            <!-- EhCacheUpdateEvent is forwarded to the service named
                 "ehCacheUpdateEventListener"; this name and port must match the exporter
                 below, because the client URL is built as rmi://<ip>:<port>/<serviceName>. -->
            <bean id="remoteLisenter1" class="com.ejintai.cbs_policy_registry.base.event.RemoteLisenter">
                <property name="eventClass" value="com.ejintai.cbs_policy_registry.base.event.biz.EhCacheUpdateEvent" />
                <property name="serviceName" value="ehCacheUpdateEventListener" />
                <property name="serviceInterface" value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>
                <property name="registryPort" value="${rmi.port}"/>
            </bean>
        </list>
    </property>
</bean>
<!-- Exports the local "ehCacheUpdateEventListener" bean over RMI under the same service
     name. NOTE(review): that bean and the ${rmi.port} placeholder are not defined in
     this snippet; presumably they are declared elsewhere in the configuration. -->
<bean id="remoteEhCacheUpdateEventListener" class="org.springframework.remoting.rmi.RmiServiceExporter" >
    <property name="serviceName" value="ehCacheUpdateEventListener" />
    <property name="service" ref="ehCacheUpdateEventListener"/>
    <property name="serviceInterface" value="com.ejintai.cbs_policy_registry.base.event.EventLisenter"/>
    <property name="registryPort" value="${rmi.port}"/>
</bean>