欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Redis - 消息发布订阅机制

程序员文章站 2022-03-30 16:33:27
...

Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息。
Redis客户端可以订阅任意数量的频道

订阅/发布消息图

Redis - 消息发布订阅机制
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
Redis - 消息发布订阅机制
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
Redis - 消息发布订阅机制

相关命令

Redis - 消息发布订阅机制

发布订阅命令 - 测试

1、开启一个客户端进行监听频道信息 - 接收端

127.0.0.1:6379> SUBSCRIBE lcy          # 监听名为lcy的频道
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "lcy"
3) (integer) 1

2、开启另外一个客户端发布消息 - 发送端

127.0.0.1:6379> PUBLISH lcy "publish in lcy"   # 在lcy频道发布"publish in lyc"消息
(integer) 1

3、查看第一个客户端接收到的消息 - 接收端

1) "message"
2) "lcy"
3) "publish in lcy"

原理

Redis是使用C实现的,通过分析 Redis 源码里的pubsub.c文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。
Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。

通过SUBSCRIBE命令订阅某频道后,redis-server 里维护了一个字典,字典的键就是一个个 频道。, 而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE 命令的关键, 就是将客户端添加到给定 channel 的订阅链表中。

通过PUBLISH命令向订阅者发送消息,redis-server会使用给定的频道作为键,在它所维护的 channel 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。

Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个 key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应 的消息。这一功能明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。

使用场景

1、实时聊天系统
2、订阅、关注系统
其实更多时候都是使用MQ(消息中间件)来做的。

SpringBoot + SpringDataRedis实现发布/订阅

1、引入Redis的依赖

 <dependency>
     <groupId>org.springframework.boot</groupId>
     <artifactId>spring-boot-starter-data-redis</artifactId>
 </dependency>

2、配置文件

@Configuration
public class RedisSubListenerConfig {
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
                                            MessageListenerAdapter listenerAdapter) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        // 这个container 可以添加多个 messageListener
        // 订阅了一个叫lcy的频道(通道)
        container.addMessageListener(listenerAdapter, new PatternTopic("lcy"));
//        container.addMessageListener(listenerAdapter, new PatternTopic("jyqc"));
        return container;
    }

    /**
     * 利用反射来创建监听到消息之后的执行方法
     */
    @Bean
    public MessageListenerAdapter listenerAdapter(RedisConsumer redisReceiver) {
        // 这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
        // 也有好几个重载方法,默认调用处理器的方法 叫handleMessage
        return new MessageListenerAdapter(redisReceiver, "receiveMessage");
    }

    /**
     * RedisTemplate模板
     * @param factory Redis连接工厂
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(factory);
        // 使用Jackson进行序列化配置
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper om = new ObjectMapper();
        om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
//        om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        // 如果enableDefaultTyping过期(SpringBoot后续版本过期了),则使用下面这个代替
        om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
        jackson2JsonRedisSerializer.setObjectMapper(om);
        // String序列化
        StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
        // key采用String的序列方式
        template.setKeySerializer(stringRedisSerializer);
        // hash的key也采用String的序列方式
        template.setHashKeySerializer(stringRedisSerializer);
        // value序列化采用jackson
        template.setValueSerializer(jackson2JsonRedisSerializer);
        // hash的value序列化也采用jackson
        template.setHashValueSerializer(jackson2JsonRedisSerializer);
        template.afterPropertiesSet();
        return template;
    }
}

3、创建订阅者 - 消息处理类

@Service
public class RedisConsumer {

    public void receiveMessage(Object message) {
        System.out.println("收到消息:");
        System.out.println(message);
    }
}

4、创建发布者 - 消息发送类

@Service
public class RedisSender {
    @Autowired
    private RedisTemplate redisTemplate;

    /**
     * 向频道发送消息的方法
     * @param channel 频道名
     * @param message 消息内容
     */
    public void sendChannelMess(String channel, Object message) {
        redisTemplate.convertAndSend(channel, message);
    }
}

5、测试类

@Autowired
private RedisSender redisSender;

@Test
public void test(){
    User user = new User("柳成荫","男");
    redisSender.sendChannelMess("lcy",user);
}

结果:
Redis - 消息发布订阅机制
我没有使用过Redis的这个功能,一般都用MQ来完成这个功能。