欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

redis发布订阅Java代码实现

程序员文章站 2022-12-16 16:54:53
Redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。 为了实现redis的发布订阅机制,首先要打开redis服务;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码:

redis除了可以用作缓存数据外,另一个重要用途是它实现了发布订阅(pub/sub)消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。

为了实现redis的发布订阅机制,首先要打开redis服务;其次,引入redis需要的jar包,在pom.xml配置文件加入以下代码:

<dependency>
<groupid>redis.clients</groupid>
<artifactid>jedis</artifactid>
<version>2.1.0</version>
</dependency>

由于订阅消息通道需要再tomcat启动时触发,因此,需要创建一个listener监听器,在监听器里实现redis订阅,在web.xml里配置监听器如下:

<listener>
<listener-class>com.test.listener.initlistener</listener-class>
</listener>

一、订阅消息(initlistener实现)

redis支持多通道订阅,一个客户端可以同时订阅多个消息通道,如下代码所示,订阅了13个通道。由于订阅机制是线程阻塞的,需要额外开启一个线程专门用于处理订阅消息及接收消息处理。

public class initlistener implements servletcontextlistener{
    private logger logger = logger.getlogger(initlistener.class);
    
    @override
    public void contextinitialized(servletcontextevent sce) {
        logger.info("启动tomcat");// 连接redis
        map<string, string> promap = propertyreader.getproperties();
        final string url = promap.get("redis.host");
        final integer port = integer.parseint(promap.get("redis.port"));
        final classpathxmlapplicationcontext classpathxmlapplicationcontext = new classpathxmlapplicationcontext("classpath*:applicationcontext.xml");
        final redissublistener redissublistener = (redissublistener) classpathxmlapplicationcontext.getbean("redissublistener");
        // 为防止阻塞tomcat启动,开启线程执行
        new thread(new runnable(){  
            public void run(){  
                // 连接redis,建立监听
                jedis jedis = null;
                while(true){
                    //解码资源更新通知,画面选看回复,画面选看停止回复,预案启动,预案停止,轮切启动,轮切停止,预案启动回复,预案停止回复,轮切启动回复,轮切停止回复,监视屏分屏状态通知,画面状态通知
                    string[] channels = new string[] { "decoderesourceupdatentf", "tvsplitplayrsp","tvsplitplaystoprsp",
                            "planstartstatusntf", "planstopstatusntf", "pollstartstatusntf", "pollstopstatusntf",
                            "planstartrsp","planstoprsp","pollstartrsp","pollstoprsp","tvsplittypentf","tvsplitstatusntf"};
                    try{
                        jedis = new jedis(url,port);
                        logger.info("redis请求订阅通道");
                        jedis.subscribe(redissublistener,channels);
                        logger.info("redis订阅结束");
                    }catch(jedisconnectionexception e){
                        logger.error("jedis连接异常,异常信息 :" + e);
                    }catch(illegalstateexception e){
                         logger.error("jedis异常,异常信息 :" + e);
                    }
                    
                    try {
                        thread.sleep(1000);
                    } catch (interruptedexception e) {
                        e.printstacktrace();
                    }
                    if(jedis != null){
                        jedis = null;
                    }
                }
            }})
        .start();
    }

最后在spring配置文件里接入以下配置:

<!-- redis -->
     <bean id="redismessageservice" class="com.test.service.impl.redismessageserviceimpl" scope="singleton">
         <property name="websocketservice"><ref local="websocketservice" /></property>
         <property name="tvsplitstatusdao" ref="tvsplitstatusdao"></property>
     </bean>
     <bean id="redissublistener" class="com.test.common.redissublistener" scope="singleton">
         <property name="redismessageservice"><ref local="redismessageservice" /></property>
     </bean>
redismessageserviceimpl用于处理接收的redis消息。

二、发布消息

public class redispublishutil {
    private logger logger = logger.getlogger(redispublishutil.class);
    public static jedis pubjedis;
    private static map<string, string> promap = propertyreader.getproperties();
    private static final string redisport = promap.get("redis.port");
    private static string url = promap.get("redis.host");
    private static final int port = integer.parseint(redisport);
    
    public void setpubjedis(jedis jedis) {
        redispublishutil.pubjedis = jedis;
    }
    
    public jedis getpubjedis() {
        if (pubjedis == null) {
            createjedisconnect();
        }
        // 返回对象
        return pubjedis;
    }
    
    public jedis createjedisconnect(){
        // 连接redis
        logger.info("===创建连接jedis=====");
        try {
            pubjedis = new jedis(url, port);
        } catch (jedisconnectionexception e) {
            logger.error("jedis连接异常,异常信息 :" + e.getmessage());
            try {
                thread.sleep(1000);
                logger.info("发起重新连接jedis");
                createjedisconnect();
            } catch (interruptedexception except) {
                except.printstacktrace();
            }
        }
        // 返回对象
        return pubjedis;
    }
    //公共发布接口
    public void pubredismsg(string msgtype,string msg){
        logger.info("redis准备发布消息内容:" + msg);
        try {
            this.getpubjedis().publish(msgtype, msg);

        } catch (jedisconnectionexception e) {
            logger.error("redis发布消息失败!", e);
            this.setpubjedis(null);
            logger.info("重新发布消息,channel="+msgtype);
            pubredismsg(msgtype, msg);
        }
    }

}
public class propertyreader {

     private static logger logger = logger.getlogger(propertyreader.class);
    
    /*
     * 获得数据库链接的配置文件
     */
    public static map<string,string> getproperties(){
        logger.info("读取redis配置文件开始。。。");
        
         properties prop = new properties();     
        
         map<string,string> promap  = new hashmap<string,string>();
         
        try {
             //读取属性文件redis.properties
            inputstream in= propertyreader.class.getclassloader().getresourceasstream("redis.properties");  
            
            prop.load(in);     ///加载属性列表
            iterator<string> it=prop.stringpropertynames().iterator();
            while(it.hasnext()){
                string key=it.next();
                promap.put(key, prop.getproperty(key));
            }
            in.close();
            logger.info("读取redis配置文件成功。。。");
        } catch (exception e) {
            logger.error("读取redis配置文件异常!", e);
            e.printstacktrace();
        }
        return promap;
    }
}