欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis总结(五)redis发布订阅模式

程序员文章站 2022-05-21 13:42:24
...

本篇将向大家介绍怎么通过redis来实现订阅和发布功能

首先介绍一下实现功能的主要几个命令:

  1. SUBSCRIBE 命令,这个命令可以让我们订阅任意数量的频道
  2. PUBLISH 命令,此命令是用来发布消息
  3. PSUBSCRIBE命令,此命令用来支持模糊订阅的功能

在展示具体的demo之前,我们先简单了解下这其中的原理:

在redisServer结构中的其中一个属性pubsub_channels是用来记录channel和客户端之间的关系,是使用key–>List的数据格式。如图:
Redis总结(五)redis发布订阅模式

在我们使用SUBSCRIBE 命令在客户端client10086订阅了channel1 channel2,channel3

订阅:

SUBSCRIBE channel1 channel2,channel3

这时pubsub_channels的数据将会变为,如图:

Redis总结(五)redis发布订阅模式

这就可以看出来执行SUBSCRIBE 命令就是将客户端信息添加到对应的channel对应列表的尾部。

模式订阅:
模式订阅设计到redisServer的另一个属性pubsub_patterns,也是一个链表,里面存储着客户端订阅的所有模式。结构如下图:

Redis总结(五)redis发布订阅模式

当客户端订阅了一个模式,此时结构变为:

Redis总结(五)redis发布订阅模式

发布:

PUBLISH 命令发布消息将消息推送到对应的客户端

在执行PUBLISH 命令发布消息的时候,首先会在pubsub_channels上找到对应的channel,遍历其中所有的client信息,将消息发送到所有client;同时也会在pubsub_patterns上遍历找到匹配的模式,发给对应的客户端

取消订阅:

UNSUBSCRIBE命令取消对应客户端的订阅

当执行UNSUBSCRIBE命令时则将对应的client从channel列表中移除

具体demo如下:

首先我们创建两个客户端执行体:

package com.example.redisdemo.service;


import redis.clients.jedis.JedisPubSub;

/**
 * 订阅消息消费体.
 * @author yzlu
 */
public class OneJedisPubSub extends JedisPubSub {

   //接收到消息时执行
    @Override
    public void  onMessage(String channel, String message){
        System.out.println("oneJedisPubSub message is" + message);
    }

    //接收到模式消息时执行
    @Override
    public void onPMessage(String pattern, String channel, String message){
        System.out.println("oneJedisPubSub pattern是"+pattern+"channel是"+channel + "message是" + message);
    }

    //订阅时执行
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("oneJedisPubSub订阅成功");
    }

    //取消订阅时执行
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels){
        System.out.println("oneJedisPubSub取消订阅"+channel);
    }


    //取消模式订阅时执行
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("oneJedisPubSub取消多订阅"+pattern);
    }

}

package com.example.redisdemo.service;

import redis.clients.jedis.JedisPubSub;

public class SecondJedisPubSub extends JedisPubSub {

  //接收到消息时执行
    @Override
    public void  onMessage(String channel, String message){
        System.out.println(" SecondJedisPubSub message is" + message);
    }

    //接收到模式消息时执行
    @Override
    public void onPMessage(String pattern, String channel, String message){
        System.out.println("SecondJedisPubSub pattern是"+pattern+"channel是"+channel + "message是" + message);
    }

    //取消订阅时执行
    @Override
    public void onUnsubscribe(String channel, int subscribedChannels){
        System.out.println("SecondJedisPubSub 取消订阅"+channel);
    }

    //订阅时执行
    @Override
    public void onSubscribe(String channel, int subscribedChannels) {
        System.out.println("SecondJedisPubSub订阅成功");
    }

    //取消模式订阅时执行
    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) {
        System.out.println("SecondJedisPubSub 取消多订阅"+pattern);
    }

}

然后我们开始给这两个客户端订阅消息

@RestController
@RequestMapping("test")
@Slf4j
public class TestController {
    @Autowired
    private RedisClient redisClient;
    private final OneJedisPubSub oneJedisPubSub = new OneJedisPubSub();

    private final SecondJedisPubSub secondJedisPubSub = new SecondJedisPubSub();
    
    @PostMapping("subscribe")
    public void  subscribe(@RequestBody QueueTest queueTest){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if("1".equals(queueTest.getTopic())){
                        redisClient.subscribe(oneJedisPubSub,"topic1","topic2");
                    }
                    if("2".equals(queueTest.getTopic())){
                        redisClient.subscribe(secondJedisPubSub,"topic2");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
    
    
}

请求情况如下图:

Redis总结(五)redis发布订阅模式
Redis总结(五)redis发布订阅模式

请求结果:
Redis总结(五)redis发布订阅模式
Redis总结(五)redis发布订阅模式
如图,可以看出我们将两个客户端都订阅了一定channel

此时OneJedisPubSub订阅了topic1和topic2,SecondJedisPubSub订阅了topic2,我们尝试推送消息,demo如下:

  @PostMapping("push")
    public void push(@RequestBody QueueTest queueTest){
        log.info("发布一条消息");
        Long publish = redisClient.publish(queueTest.getTopic(), queueTest.getName());
        System.out.println("消费者数量"+publish);
    }

请求如下:
Redis总结(五)redis发布订阅模式

结果如下:

Redis总结(五)redis发布订阅模式

可以看到我们往topic1发布了消息只有OneJedisPubSub接收到了消息,接下来我们往topic2发布消息

请求如下:

Redis总结(五)redis发布订阅模式

结果如下:
Redis总结(五)redis发布订阅模式

可以看到此时两个客户端都接收到了消息

在测试完毕客户端接收消息的能力,我们这时取消SecondJedisPubSub订阅topic2,demo如下:

  @PostMapping("unno")
    public void  unno(@RequestBody QueueTest queueTest){
        log.info("取消订阅消息");
        try {
            secondJedisPubSub.unsubscribe(queueTest.getTopic());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

请求如下:

Redis总结(五)redis发布订阅模式

结果如下:

Redis总结(五)redis发布订阅模式

在取消后我们再往topic2推送消息,可以看到只有一个客户端接收消息

如图:
Redis总结(五)redis发布订阅模式

Redis总结(五)redis发布订阅模式

至此我们实验了大部分场景,至于模式订阅由于贴图太麻烦,我就将代码提供出来,大家可以自己实验:

   @PostMapping("subscribe")
    public void  subscribe(@RequestBody QueueTest queueTest){
        new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    if("1".equals(queueTest.getTopic())){
                        redisClient.pubsubPattern(oneJedisPubSub,"topic*");
                    }
                    if("2".equals(queueTest.getTopic())){
                        redisClient.subscribe(secondJedisPubSub,"topic2");
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
   @PostMapping("unno")
    public void  unno(@RequestBody QueueTest queueTest){
        log.info("取消模式订阅消息");
        try {
            secondJedisPubSub.punsubscribe(queueTest.getTopic());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

最后将redisclient的代码提供给大家

@Component("redisClient")
@Slf4j
public class RedisClient {
    @Resource
    private JedisPool jedisPool;
    
       /**
     * 发布消息
     * @param topic
     * @param message
     */
    public Long publish(String topic,String message){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            return jedis.publish(topic, message);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
    }

    /**
     * 订阅消息
     * @param jedisPubSub
     * @param topics
     */
    public void subscribe(JedisPubSub jedisPubSub, String... topics) throws Exception {
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.subscribe(jedisPubSub,topics);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }

        }
    }

    /**
     * 模式匹配订阅消息
     * @param topic
     */
    public void pubsubPattern(JedisPubSub jedisPubSub,String topic){
        Jedis jedis = null;
        try {
            jedis = jedisPool.getResource();
            jedis.psubscribe(jedisPubSub,topic);
        } catch (Exception e) {
            throw e;
        } finally {
            if(jedis != null){
                jedis.close();
            }
        }
    }
}

以上就是reids的发布订阅功能,当然此篇和上一篇一样只是为了熟悉redis的一些功能,真正需要这种场景时我们完全可以使用mq的交换机同样可以实现,而且更专业,但是如果大家懒得维护mq那么这种情况也可以使用。

等redis系列写完后续也会陆续写出mq的系列文章,但是由于最近上班的开发任务比较众,所以能够抽出时间写文章的时间不多,更新可能比较慢,大家谅解。

相关标签: redis 发布订阅