redis发布订阅Java代码实现
程序员文章站
2022-04-28 13:56:46
Redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。 为了实现redis的发布订阅机制,首先要打开redis服务;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码:
redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。
为了实现redis的发布订阅机制,首先要打开redis服务;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码:
<dependency>
<groupid>redis.clients</groupid>
<artifactid>jedis</artifactid>
<version>2.1.0</version>
</dependency>
由于订阅消息通道需要再tomcat启动时触发,因此,需要创建一个listener监听器,在监听器里实现redis订阅,在web.xml里配置监听器如下:
<listener>
<listener-class>com.test.listener.initlistener</listener-class>
</listener>
一、订阅消息(initlistener实现)
redis支持多通道订阅,一个客户端可以同时订阅多个消息通道,如下代码所示,订阅了13个通道。由于订阅机制是线程阻塞的,需要额外开启一个线程专门用于处理订阅消息及接收消息处理。
public class initlistener implements servletcontextlistener{ private logger logger = logger.getlogger(initlistener.class); @override public void contextinitialized(servletcontextevent sce) { logger.info("启动tomcat");// 连接redis map<string, string> promap = propertyreader.getproperties(); final string url = promap.get("redis.host"); final integer port = integer.parseint(promap.get("redis.port")); final classpathxmlapplicationcontext classpathxmlapplicationcontext = new classpathxmlapplicationcontext("classpath*:applicationcontext.xml"); final redissublistener redissublistener = (redissublistener) classpathxmlapplicationcontext.getbean("redissublistener"); // 为防止阻塞tomcat启动,开启线程执行 new thread(new runnable(){ public void run(){ // 连接redis,建立监听 jedis jedis = null; while(true){ //解码资源更新通知,画面选看回复,画面选看停止回复,预案启动,预案停止,轮切启动,轮切停止,预案启动回复,预案停止回复,轮切启动回复,轮切停止回复,监视屏分屏状态通知,画面状态通知 string[] channels = new string[] { "decoderesourceupdatentf", "tvsplitplayrsp","tvsplitplaystoprsp", "planstartstatusntf", "planstopstatusntf", "pollstartstatusntf", "pollstopstatusntf", "planstartrsp","planstoprsp","pollstartrsp","pollstoprsp","tvsplittypentf","tvsplitstatusntf"}; try{ jedis = new jedis(url,port); logger.info("redis请求订阅通道"); jedis.subscribe(redissublistener,channels); logger.info("redis订阅结束"); }catch(jedisconnectionexception e){ logger.error("jedis连接异常,异常信息 :" + e); }catch(illegalstateexception e){ logger.error("jedis异常,异常信息 :" + e); } try { thread.sleep(1000); } catch (interruptedexception e) { e.printstacktrace(); } if(jedis != null){ jedis = null; } } }}) .start(); }
最后在spring配置文件里接入以下配置:
<!-- redis --> <bean id="redismessageservice" class="com.test.service.impl.redismessageserviceimpl" scope="singleton"> <property name="websocketservice"><ref local="websocketservice" /></property> <property name="tvsplitstatusdao" ref="tvsplitstatusdao"></property> </bean> <bean id="redissublistener" class="com.test.common.redissublistener" scope="singleton"> <property name="redismessageservice"><ref local="redismessageservice" /></property> </bean>
redismessageserviceimpl用于处理接收的redis消息。
二、发布消息
public class redispublishutil { private logger logger = logger.getlogger(redispublishutil.class); public static jedis pubjedis; private static map<string, string> promap = propertyreader.getproperties(); private static final string redisport = promap.get("redis.port"); private static string url = promap.get("redis.host"); private static final int port = integer.parseint(redisport); public void setpubjedis(jedis jedis) { redispublishutil.pubjedis = jedis; } public jedis getpubjedis() { if (pubjedis == null) { createjedisconnect(); } // 返回对象 return pubjedis; } public jedis createjedisconnect(){ // 连接redis logger.info("===创建连接jedis====="); try { pubjedis = new jedis(url, port); } catch (jedisconnectionexception e) { logger.error("jedis连接异常,异常信息 :" + e.getmessage()); try { thread.sleep(1000); logger.info("发起重新连接jedis"); createjedisconnect(); } catch (interruptedexception except) { except.printstacktrace(); } } // 返回对象 return pubjedis; } //公共发布接口 public void pubredismsg(string msgtype,string msg){ logger.info("redis准备发布消息内容:" + msg); try { this.getpubjedis().publish(msgtype, msg); } catch (jedisconnectionexception e) { logger.error("redis发布消息失败!", e); this.setpubjedis(null); logger.info("重新发布消息,channel="+msgtype); pubredismsg(msgtype, msg); } } }
public class propertyreader { private static logger logger = logger.getlogger(propertyreader.class); /* * 获得数据库链接的配置文件 */ public static map<string,string> getproperties(){ logger.info("读取redis配置文件开始。。。"); properties prop = new properties(); map<string,string> promap = new hashmap<string,string>(); try { //读取属性文件redis.properties inputstream in= propertyreader.class.getclassloader().getresourceasstream("redis.properties"); prop.load(in); ///加载属性列表 iterator<string> it=prop.stringpropertynames().iterator(); while(it.hasnext()){ string key=it.next(); promap.put(key, prop.getproperty(key)); } in.close(); logger.info("读取redis配置文件成功。。。"); } catch (exception e) { logger.error("读取redis配置文件异常!", e); e.printstacktrace(); } return promap; } }