欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

5.Redis消息订阅/发布 博客分类: Redis RedisJedisChannel消息订阅消息发布 

程序员文章站 2024-03-21 23:40:10
...

Redis可以很容的实现消息订阅/发布功能

 

一.JedisPubSub

需要实现一个JedisPubSub,相当于Redis消息的Listener

package com.gqshao.redis.channels;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import redis.clients.jedis.JedisPubSub;

public class MyJedisPubSub extends JedisPubSub {

    protected static Logger logger = LoggerFactory.getLogger(MyJedisPubSub.class);

    // 取得订阅的消息后的处理  
    public void onMessage(String channel, String message) {
        logger.info("取得订阅的消息后的处理 : " + channel + "=" + message);
    }

    // 初始化订阅时候的处理  
    public void onSubscribe(String channel, int subscribedChannels) {
        logger.info("初始化订阅时候的处理 : " + channel + "=" + subscribedChannels);
    }

    // 取消订阅时候的处理  
    public void onUnsubscribe(String channel, int subscribedChannels) {
        logger.info("取消订阅时候的处理 : " + channel + "=" + subscribedChannels);
    }

    // 初始化按表达式的方式订阅时候的处理  
    public void onPSubscribe(String pattern, int subscribedChannels) {
        logger.info("初始化按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels);
    }

    // 取消按表达式的方式订阅时候的处理  
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        logger.info(" 取消按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels);
    }

    // 取得按表达式的方式订阅的消息后的处理  
    public void onPMessage(String pattern, String channel, String message) {
        logger.info("取得按表达式的方式订阅的消息后的处理 :" + pattern + "=" + channel + "=" + message);
    }
}

 

 

二.消息订阅/发布

1.消息的订阅需要一个Redis连接始终保持连接,Jedis中停止订阅的unsubscribe是在JedisPubSub中

2.程序中因为需要Jedis始终保持连接,又有可能需要停止订阅,所以用到了ExecutorService

package com.gqshao.redis.channels;


import com.gqshao.redis.JedisTest;
import org.junit.Test;
import redis.clients.jedis.Jedis;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
 * 发布/订阅
 */
public class MessageTest extends JedisTest {

    /**
     * SUBSCRIBE [channel...] 订阅一个匹配的通道
     * PSUBSCRIBE [pattern...] 订阅匹配的通道
     * PUBLISH [channel] [message] 将value推送到channelone通道中
     * UNSUBSCRIBE [channel...] 取消订阅消息
     * PUNSUBSCRIBE [pattern ...] 取消匹配的消息订阅
     * web环境中可以编写一个JedisPubSub 继承 @see redis.clients.jedis.JedisPubSub来实现监听
     * Jedis中通过使用 JedisPubSub.UNSUBSCRIBE/PUNSUBSCRIBE 来取消订阅
     */
    @Test
    public void testSubscribe() {
        final MyJedisPubSub listener = new MyJedisPubSub();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("subscribe channelA.test channelB.send_message");
                jedis.subscribe(listener, "channelA.test", "channelB.send_message");
            }
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(thread);

        // 测试发送
        Jedis pubJedis = pool.getResource();
        logger.info("publish channelA.test OK : " + pubJedis.publish("channelA.test", "OK"));
        logger.info("publish channelB.send_message \"Hello World!\" : " + pubJedis.publish("channelB.send_message", "Hello World!"));
        listener.unsubscribe("channelA.test", "channelB.send_message");
        try {
            executor.shutdownNow();
            logger.info("executor.shutdownNow");
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                logger.warn("Pool did not terminated");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        logger.info("完成subscribe测试");
    }


    /**
     * SUBSCRIBE channelone 订阅一个通道
     * PSUBSCRIBE channel* 订阅一批通道
     * PUBLISH channelone value 将value推送到channelone通道中
     * web环境中可以编写一个Listener 继承 @see redis.clients.jedis.JedisPubSub来实现监听
     */
    @Test
    public void testPsubscribe() {
        final MyJedisPubSub listener = new MyJedisPubSub();
        Thread thread = new Thread(new Runnable() {
            @Override
            public void run() {
                logger.info("psubscribe channel*");
                jedis.psubscribe(listener, "channel*");
            }
        });
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(thread);

        // 测试发送
        Jedis pubJedis = pool.getResource();

        logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK"));
        logger.info("publish channelB.send_message \"Hello World!\"" + pubJedis.publish("channelB.send_message", "Hello World!"));

        pool.returnResource(pubJedis);
        listener.punsubscribe();
        try {
            executor.shutdownNow();
            logger.info("executor.shutdownNow");
            if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                logger.warn("Pool did not terminated");
            }
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        logger.info("完成psubscribe测试");
        logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK"));
    }

}