Redis总结(五)redis发布订阅模式
本篇将向大家介绍怎么通过redis来实现订阅和发布功能
首先介绍一下实现功能的主要几个命令:
- SUBSCRIBE 命令,这个命令可以让我们订阅任意数量的频道
- PUBLISH 命令,此命令是用来发布消息
- PSUBSCRIBE命令,此命令用来支持模糊订阅的功能
在展示具体的demo之前,我们先简单了解下这其中的原理:
在redisServer结构中的其中一个属性pubsub_channels是用来记录channel和客户端之间的关系,是使用key–>List的数据格式。如图:
在我们使用SUBSCRIBE 命令在客户端client10086订阅了channel1 channel2,channel3
订阅:
SUBSCRIBE channel1 channel2,channel3
这时pubsub_channels的数据将会变为,如图:
这就可以看出来执行SUBSCRIBE 命令就是将客户端信息添加到对应的channel对应列表的尾部。
模式订阅:
模式订阅设计到redisServer的另一个属性pubsub_patterns,也是一个链表,里面存储着客户端订阅的所有模式。结构如下图:
当客户端订阅了一个模式,此时结构变为:
发布:
PUBLISH 命令发布消息将消息推送到对应的客户端
在执行PUBLISH 命令发布消息的时候,首先会在pubsub_channels上找到对应的channel,遍历其中所有的client信息,将消息发送到所有client;同时也会在pubsub_patterns上遍历找到匹配的模式,发给对应的客户端
取消订阅:
UNSUBSCRIBE命令取消对应客户端的订阅
当执行UNSUBSCRIBE命令时则将对应的client从channel列表中移除
具体demo如下:
首先我们创建两个客户端执行体:
package com.example.redisdemo.service;
import redis.clients.jedis.JedisPubSub;
/**
* 订阅消息消费体.
* @author yzlu
*/
public class OneJedisPubSub extends JedisPubSub {
//接收到消息时执行
@Override
public void onMessage(String channel, String message){
System.out.println("oneJedisPubSub message is" + message);
}
//接收到模式消息时执行
@Override
public void onPMessage(String pattern, String channel, String message){
System.out.println("oneJedisPubSub pattern是"+pattern+"channel是"+channel + "message是" + message);
}
//订阅时执行
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("oneJedisPubSub订阅成功");
}
//取消订阅时执行
@Override
public void onUnsubscribe(String channel, int subscribedChannels){
System.out.println("oneJedisPubSub取消订阅"+channel);
}
//取消模式订阅时执行
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println("oneJedisPubSub取消多订阅"+pattern);
}
}
package com.example.redisdemo.service;
import redis.clients.jedis.JedisPubSub;
public class SecondJedisPubSub extends JedisPubSub {
//接收到消息时执行
@Override
public void onMessage(String channel, String message){
System.out.println(" SecondJedisPubSub message is" + message);
}
//接收到模式消息时执行
@Override
public void onPMessage(String pattern, String channel, String message){
System.out.println("SecondJedisPubSub pattern是"+pattern+"channel是"+channel + "message是" + message);
}
//取消订阅时执行
@Override
public void onUnsubscribe(String channel, int subscribedChannels){
System.out.println("SecondJedisPubSub 取消订阅"+channel);
}
//订阅时执行
@Override
public void onSubscribe(String channel, int subscribedChannels) {
System.out.println("SecondJedisPubSub订阅成功");
}
//取消模式订阅时执行
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels) {
System.out.println("SecondJedisPubSub 取消多订阅"+pattern);
}
}
然后我们开始给这两个客户端订阅消息
@RestController
@RequestMapping("test")
@Slf4j
public class TestController {
@Autowired
private RedisClient redisClient;
private final OneJedisPubSub oneJedisPubSub = new OneJedisPubSub();
private final SecondJedisPubSub secondJedisPubSub = new SecondJedisPubSub();
@PostMapping("subscribe")
public void subscribe(@RequestBody QueueTest queueTest){
new Thread(new Runnable() {
@Override
public void run() {
try {
if("1".equals(queueTest.getTopic())){
redisClient.subscribe(oneJedisPubSub,"topic1","topic2");
}
if("2".equals(queueTest.getTopic())){
redisClient.subscribe(secondJedisPubSub,"topic2");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
}
请求情况如下图:
请求结果:
如图,可以看出我们将两个客户端都订阅了一定channel
此时OneJedisPubSub订阅了topic1和topic2,SecondJedisPubSub订阅了topic2,我们尝试推送消息,demo如下:
@PostMapping("push")
public void push(@RequestBody QueueTest queueTest){
log.info("发布一条消息");
Long publish = redisClient.publish(queueTest.getTopic(), queueTest.getName());
System.out.println("消费者数量"+publish);
}
请求如下:
结果如下:
可以看到我们往topic1发布了消息只有OneJedisPubSub接收到了消息,接下来我们往topic2发布消息
请求如下:
结果如下:
可以看到此时两个客户端都接收到了消息
在测试完毕客户端接收消息的能力,我们这时取消SecondJedisPubSub订阅topic2,demo如下:
@PostMapping("unno")
public void unno(@RequestBody QueueTest queueTest){
log.info("取消订阅消息");
try {
secondJedisPubSub.unsubscribe(queueTest.getTopic());
} catch (Exception e) {
e.printStackTrace();
}
}
请求如下:
结果如下:
在取消后我们再往topic2推送消息,可以看到只有一个客户端接收消息
如图:
至此我们实验了大部分场景,至于模式订阅由于贴图太麻烦,我就将代码提供出来,大家可以自己实验:
@PostMapping("subscribe")
public void subscribe(@RequestBody QueueTest queueTest){
new Thread(new Runnable() {
@Override
public void run() {
try {
if("1".equals(queueTest.getTopic())){
redisClient.pubsubPattern(oneJedisPubSub,"topic*");
}
if("2".equals(queueTest.getTopic())){
redisClient.subscribe(secondJedisPubSub,"topic2");
}
} catch (Exception e) {
e.printStackTrace();
}
}
}).start();
}
@PostMapping("unno")
public void unno(@RequestBody QueueTest queueTest){
log.info("取消模式订阅消息");
try {
secondJedisPubSub.punsubscribe(queueTest.getTopic());
} catch (Exception e) {
e.printStackTrace();
}
}
最后将redisclient的代码提供给大家
@Component("redisClient")
@Slf4j
public class RedisClient {
@Resource
private JedisPool jedisPool;
/**
* 发布消息
* @param topic
* @param message
*/
public Long publish(String topic,String message){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
return jedis.publish(topic, message);
} catch (Exception e) {
throw e;
} finally {
if(jedis != null){
jedis.close();
}
}
}
/**
* 订阅消息
* @param jedisPubSub
* @param topics
*/
public void subscribe(JedisPubSub jedisPubSub, String... topics) throws Exception {
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.subscribe(jedisPubSub,topics);
} catch (Exception e) {
throw e;
} finally {
if(jedis != null){
jedis.close();
}
}
}
/**
* 模式匹配订阅消息
* @param topic
*/
public void pubsubPattern(JedisPubSub jedisPubSub,String topic){
Jedis jedis = null;
try {
jedis = jedisPool.getResource();
jedis.psubscribe(jedisPubSub,topic);
} catch (Exception e) {
throw e;
} finally {
if(jedis != null){
jedis.close();
}
}
}
}
以上就是reids的发布订阅功能,当然此篇和上一篇一样只是为了熟悉redis的一些功能,真正需要这种场景时我们完全可以使用mq的交换机同样可以实现,而且更专业,但是如果大家懒得维护mq那么这种情况也可以使用。
等redis系列写完后续也会陆续写出mq的系列文章,但是由于最近上班的开发任务比较众,所以能够抽出时间写文章的时间不多,更新可能比较慢,大家谅解。