Redis - 消息发布订阅机制
Redis 发布订阅(pub/sub)是一种消息通信模式:发送者(pub)发送消息,订阅者(sub)接收消息。发布者生产消息放到队列里,多个监听队列的消费者都会收到同一份消息。
Redis客户端可以订阅任意数量的频道
。
订阅/发布消息图
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
相关命令
发布订阅命令 - 测试
1、开启一个客户端进行监听频道信息 - 接收端
127.0.0.1:6379> SUBSCRIBE lcy # 监听名为lcy的频道
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "lcy"
3) (integer) 1
2、开启另外一个客户端发布消息 - 发送端
127.0.0.1:6379> PUBLISH lcy "publish in lcy" # 在lcy频道发布"publish in lyc"消息
(integer) 1
3、查看第一个客户端接收到的消息 - 接收端
1) "message"
2) "lcy"
3) "publish in lcy"
原理
Redis是使用C实现的,通过分析 Redis 源码里的pubsub.c
文件,了解发布和订阅机制的底层实现,籍此加深对 Redis 的理解。
Redis 通过 PUBLISH 、SUBSCRIBE 和 PSUBSCRIBE 等命令实现发布和订阅功能。
通过SUBSCRIBE
命令订阅某频道后,redis-server
里维护了一个字典,字典的键就是一个个 频道
。, 而字典的值则是一个链表,链表中保存了所有订阅这个 channel 的客户端。SUBSCRIBE
命令的关键, 就是将客户端添加到给定 channel 的订阅链表中。
通过PUBLISH
命令向订阅者发送消息,redis-server
会使用给定的频道作为键,在它所维护的 channel 字典中查找记录了订阅这个频道的所有客户端的链表,遍历这个链表,将消息发布给所有订阅者。
Pub/Sub 从字面上理解就是发布(Publish)与订阅(Subscribe),在Redis中,你可以设定对某一个 key值进行消息发布及消息订阅,当一个key值上进行了消息发布后,所有订阅它的客户端都会收到相应 的消息。这一功能明显的用法就是用作实时消息系统,比如普通的即时聊天,群聊等功能。
使用场景
1、实时聊天系统
2、订阅、关注系统
其实更多时候都是使用MQ(消息中间件)来做的。
SpringBoot + SpringDataRedis实现发布/订阅
1、引入Redis的依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
2、配置文件
@Configuration
public class RedisSubListenerConfig {
@Bean
public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
// 这个container 可以添加多个 messageListener
// 订阅了一个叫lcy的频道(通道)
container.addMessageListener(listenerAdapter, new PatternTopic("lcy"));
// container.addMessageListener(listenerAdapter, new PatternTopic("jyqc"));
return container;
}
/**
* 利用反射来创建监听到消息之后的执行方法
*/
@Bean
public MessageListenerAdapter listenerAdapter(RedisConsumer redisReceiver) {
// 这个地方 是给messageListenerAdapter 传入一个消息接受的处理器,利用反射的方法调用“receiveMessage”
// 也有好几个重载方法,默认调用处理器的方法 叫handleMessage
return new MessageListenerAdapter(redisReceiver, "receiveMessage");
}
/**
* RedisTemplate模板
* @param factory Redis连接工厂
*/
@Bean
public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory factory){
RedisTemplate<String, Object> template = new RedisTemplate<>();
template.setConnectionFactory(factory);
// 使用Jackson进行序列化配置
Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
ObjectMapper om = new ObjectMapper();
om.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
// om.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
// 如果enableDefaultTyping过期(SpringBoot后续版本过期了),则使用下面这个代替
om.activateDefaultTyping(LaissezFaireSubTypeValidator.instance,ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
jackson2JsonRedisSerializer.setObjectMapper(om);
// String序列化
StringRedisSerializer stringRedisSerializer = new StringRedisSerializer();
// key采用String的序列方式
template.setKeySerializer(stringRedisSerializer);
// hash的key也采用String的序列方式
template.setHashKeySerializer(stringRedisSerializer);
// value序列化采用jackson
template.setValueSerializer(jackson2JsonRedisSerializer);
// hash的value序列化也采用jackson
template.setHashValueSerializer(jackson2JsonRedisSerializer);
template.afterPropertiesSet();
return template;
}
}
3、创建订阅者 - 消息处理类
@Service
public class RedisConsumer {
public void receiveMessage(Object message) {
System.out.println("收到消息:");
System.out.println(message);
}
}
4、创建发布者 - 消息发送类
@Service
public class RedisSender {
@Autowired
private RedisTemplate redisTemplate;
/**
* 向频道发送消息的方法
* @param channel 频道名
* @param message 消息内容
*/
public void sendChannelMess(String channel, Object message) {
redisTemplate.convertAndSend(channel, message);
}
}
5、测试类
@Autowired
private RedisSender redisSender;
@Test
public void test(){
User user = new User("柳成荫","男");
redisSender.sendChannelMess("lcy",user);
}
结果:
我没有使用过Redis的这个功能,一般都用MQ来完成这个功能。
下一篇: 机器学习实战之决策树基础笔记