Redis的发布订阅与keyspaces-notification实现定时通知
程序员文章站
2022-04-15 17:47:21
Redis的发布订阅与MQ消息队列是一样的,客户端需要首先订阅消息,此事件是阻塞的,发布者发布消息后,订阅了此消息的所有订阅者都会收到进行消费。1.自定义发布订阅#同时订阅了三个频道,一旦其中一个接收到消息就会进行数据的展示subscribe java php c++#发布消息publish java springboot#订阅所有通配的频道psubscribe java* php*2.内部事件发布订阅2.1 业务场景分析业务场景举例:订单超时未支付,实现自动关闭,如何实现?使用...
Redis的发布订阅与MQ消息队列是一样的,客户端需要首先订阅消息,此事件是阻塞的,发布者发布消息后,订阅了此消息的所有订阅者都会收到进行消费。
1.自定义发布订阅
#同时订阅了三个频道,一旦其中一个接收到消息就会进行数据的展示
subscribe java php c++
#发布消息
publish java springboot
#订阅所有通配的频道
psubscribe java* php*
2.内部事件发布订阅
2.1 业务场景分析
业务场景举例:
订单超时未支付,实现自动关闭,如何实现?
- 使用Quartz任务调度框架,定时调度方式来检查,如果使用5分钟为单位轮询一次,这样的话假如订单60分钟超时,那么我们可能到(60+5)分钟-1秒才检测到。Quartz更适合周期性的任务,所以在这里显然不适用。
- Timer Java原生的定时器,以秒为单位进行检测,可以少量使用,如果数据量大,性能是瓶颈。
- Quartz+Timer形式来作:5分钟做一次大循环,在5分钟内查出超时的订单,然后创建多线程通过timer完成订单定时检查,实现比较复杂。
所以如果能够主动的通知的形式来做过期提醒就好了。
恰好redis给我们提供了这样的一个监听机制,当键过期了给我们一个通知。Redis 2.8.X版本后,推出了Keyspace Notifications特性,类似数据库的trigger触发器。
keyspace notifications是基于sub/pub发布订阅机制的,可以接收对数据库中影响key操作的所有事件:比如del、set、expire(过期事件)。。。
2.2 接收的事件类型
有两种:keyspace,keyevent
keyspace: 是key触发的事件的具体操作
keyevent: 是事件影响的键名
#pub这个动作是系统自动发布的
127.0.0.1:6379>del mykey
#数据库0会发布以下两个信息,__这是两个英文状态下的下划线
publish __keyspace@0__:mykey del
publish __keyevent@0__:del mykey
2.3 开启系统通知
redis.conf配置文件里
# 这都是配置项
# K Keyspace events, published with __keyspace@<db>__ prefix.
# E Keyevent events, published with __keyevent@<db>__ prefix.
# g Generic commands (non-type specific) like DEL, EXPIRE, RENAME, ...
# $ String commands
# l List commands
# s Set commands
# h Hash commands
# z Sorted set commands
# x Expired events (events generated every time a key expires)
# e Evicted events (events generated when a key is evicted for maxmemory)
# A Alias for g$lshzxe, so that the "AKE" string means all the events.
开启事件通知
#开关在redis.conf配置里
notify-keyspace-events "" #默认是空字符串,是关闭状态
notify-keyspace-events "KEx" #配置文件里只配置了space和event的expried事件,就只自动发布这个事件。
2.4 订阅key的编写
通知事件的发生是由Redis自动触发的,不需要手动pub,只需要进行sub订阅即可
订阅语法
#主动订阅key的过期通知
subscribe __keyspace@0__:keyname #具体的key对应的影响
subscribe __keyevent@0__:expired #具体的事件影响的key
#如果我们有很多数据库,可以通配订阅
psubscribe __key*@*__:*
3 springboot集成
POM依赖
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
</dependencies>
yaml配置
spring:
redis:
host: 127.0.0.1
port: 6379
第一种方式:
service
@Service
public class MessageReceiver {
//接收消息的方法
public void receiveMessage(String message){
//message接收到的过期key
System.out.println("Redis 监听:"+message);
}
}
配置类configuration
@Configuration
public class RedisMessageConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter){
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//订阅触发的通道
container.addMessageListener(listenerAdapter,
new PatternTopic("__keyevent@0__:expired"));
return container;
}
@Bean
MessageListenerAdapter listenerAdapter(MessageReceiver receiver){
return new MessageListenerAdapter(receiver,"receiveMessage");
}
}
第二种方式:继承MessageListener
service
继承KeyspaceEventMessageListener可以订阅到所有的事件,set,del,expire
@Service
public class RedisNormalCmd extends KeyspaceEventMessageListener {
public RedisNormalCmd(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
protected void doHandleMessage(Message message) {
System.out.println(message);
}
}
继承KeyExpirationEventMessageListener只能订阅到键过期的事件
@Service
public class RedisTask extends KeyExpirationEventMessageListener {
private Logger logger = LoggerFactory.getLogger(RedisTask.class);
public RedisTask(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
@Override
public void onMessage(Message message, byte[] pattern) {
String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
//过期的key
String key = new String(message.getBody(),StandardCharsets.UTF_8);
logger.info("redis key 过期:pattern={},channel={},key={}",new String(pattern),channel,key);
}
}
配置类configuration
@Configuration
public class RedisTimerConfiguration {
private Logger logger = LoggerFactory.getLogger(RedisTimerConfiguration.class);
@Autowired
private RedisConnectionFactory redisConnectionFactory;
@Bean
public RedisMessageListenerContainer redisMessageListenerContainer() {
RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
redisMessageListenerContainer.setConnectionFactory(redisConnectionFactory);
return redisMessageListenerContainer;
}
}
本文地址:https://blog.csdn.net/Hi_alan/article/details/109531036