5.Redis消息订阅/发布 博客分类: Redis RedisJedisChannel消息订阅消息发布
程序员文章站
2024-03-21 23:40:10
...
Redis可以很容的实现消息订阅/发布功能
一.JedisPubSub
需要实现一个JedisPubSub,相当于Redis消息的Listener
package com.gqshao.redis.channels; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import redis.clients.jedis.JedisPubSub; public class MyJedisPubSub extends JedisPubSub { protected static Logger logger = LoggerFactory.getLogger(MyJedisPubSub.class); // 取得订阅的消息后的处理 public void onMessage(String channel, String message) { logger.info("取得订阅的消息后的处理 : " + channel + "=" + message); } // 初始化订阅时候的处理 public void onSubscribe(String channel, int subscribedChannels) { logger.info("初始化订阅时候的处理 : " + channel + "=" + subscribedChannels); } // 取消订阅时候的处理 public void onUnsubscribe(String channel, int subscribedChannels) { logger.info("取消订阅时候的处理 : " + channel + "=" + subscribedChannels); } // 初始化按表达式的方式订阅时候的处理 public void onPSubscribe(String pattern, int subscribedChannels) { logger.info("初始化按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels); } // 取消按表达式的方式订阅时候的处理 public void onPUnsubscribe(String pattern, int subscribedChannels) { logger.info(" 取消按表达式的方式订阅时候的处理 : " + pattern + "=" + subscribedChannels); } // 取得按表达式的方式订阅的消息后的处理 public void onPMessage(String pattern, String channel, String message) { logger.info("取得按表达式的方式订阅的消息后的处理 :" + pattern + "=" + channel + "=" + message); } }
二.消息订阅/发布
1.消息的订阅需要一个Redis连接始终保持连接,Jedis中停止订阅的unsubscribe是在JedisPubSub中
2.程序中因为需要Jedis始终保持连接,又有可能需要停止订阅,所以用到了ExecutorService
package com.gqshao.redis.channels; import com.gqshao.redis.JedisTest; import org.junit.Test; import redis.clients.jedis.Jedis; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; /** * 发布/订阅 */ public class MessageTest extends JedisTest { /** * SUBSCRIBE [channel...] 订阅一个匹配的通道 * PSUBSCRIBE [pattern...] 订阅匹配的通道 * PUBLISH [channel] [message] 将value推送到channelone通道中 * UNSUBSCRIBE [channel...] 取消订阅消息 * PUNSUBSCRIBE [pattern ...] 取消匹配的消息订阅 * web环境中可以编写一个JedisPubSub 继承 @see redis.clients.jedis.JedisPubSub来实现监听 * Jedis中通过使用 JedisPubSub.UNSUBSCRIBE/PUNSUBSCRIBE 来取消订阅 */ @Test public void testSubscribe() { final MyJedisPubSub listener = new MyJedisPubSub(); Thread thread = new Thread(new Runnable() { @Override public void run() { logger.info("subscribe channelA.test channelB.send_message"); jedis.subscribe(listener, "channelA.test", "channelB.send_message"); } }); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(thread); // 测试发送 Jedis pubJedis = pool.getResource(); logger.info("publish channelA.test OK : " + pubJedis.publish("channelA.test", "OK")); logger.info("publish channelB.send_message \"Hello World!\" : " + pubJedis.publish("channelB.send_message", "Hello World!")); listener.unsubscribe("channelA.test", "channelB.send_message"); try { executor.shutdownNow(); logger.info("executor.shutdownNow"); if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { logger.warn("Pool did not terminated"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } logger.info("完成subscribe测试"); } /** * SUBSCRIBE channelone 订阅一个通道 * PSUBSCRIBE channel* 订阅一批通道 * PUBLISH channelone value 将value推送到channelone通道中 * web环境中可以编写一个Listener 继承 @see redis.clients.jedis.JedisPubSub来实现监听 */ @Test public void testPsubscribe() { final MyJedisPubSub listener = new MyJedisPubSub(); Thread thread = new Thread(new Runnable() { @Override public void run() { logger.info("psubscribe channel*"); jedis.psubscribe(listener, "channel*"); } }); ExecutorService executor = Executors.newSingleThreadExecutor(); executor.execute(thread); // 测试发送 Jedis pubJedis = pool.getResource(); logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK")); logger.info("publish channelB.send_message \"Hello World!\"" + pubJedis.publish("channelB.send_message", "Hello World!")); pool.returnResource(pubJedis); listener.punsubscribe(); try { executor.shutdownNow(); logger.info("executor.shutdownNow"); if (!executor.awaitTermination(10, TimeUnit.SECONDS)) { logger.warn("Pool did not terminated"); } } catch (InterruptedException ie) { Thread.currentThread().interrupt(); } logger.info("完成psubscribe测试"); logger.info("publish channelA.test OK: " + pubJedis.publish("channelA.test", "OK")); } }
上一篇: iOS热更新、热修复方案
下一篇: Python在工作中的应用4:数据合并
推荐阅读
-
5.Redis消息订阅/发布 博客分类: Redis RedisJedisChannel消息订阅消息发布
-
JedisDataException: ERR only (P)SUBSCRIBE / (P)UNSUBSCRIBE / QUIT allowed in thi 博客分类: redis jedis发布订阅JedisDataException
-
Kafka入门 博客分类: java kafka消息系统消息发布订阅分布式集群
-
redis之发布订阅 博客分类: redis redisredis 发布订阅
-
redis之发布订阅 博客分类: redis redisredis 发布订阅
-
spring-data-redis消息订阅RedisMessageListenerContainer源码解读 博客分类: 架构相关 redis源码
-
在PHP中如何使用RabbitMQ来实现消息的订阅和发布?
-
利用Spring Data Redis 来实现消息的发布订阅机制
-
Springboot2 之 Spring Data Redis 实现消息队列——发布/订阅模式
-
Redis高级实用特性(持久化机制、发布订阅消息、虚拟内存的使用)