延迟队列
延迟队列
延迟,也就是等待一定的时间在执行的。目前支持延迟的消息队列有 RabbitMQ
,RocketMQ
。但是RocketMQ
支持的延迟时间并不灵活,延迟时间并不能自定义。在项目中,延迟使用的比较多的。
- 例如
- 订单成功后,在30分钟内没有支付,自动取消订单
- 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
- 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
一、DelayQueue 延时队列
- 定义延迟队列
package com.classics.delay;
import com.fasterxml.jackson.annotation.JsonFormat;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;
public class Order implements Delayed {
/**
* 延迟时间,单位由 {@link Order#Order(java.lang.String, long, java.util.concurrent.TimeUnit)}定义
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
/***
* 添加延迟任务
* @param name 延迟内容
* @param time 延迟时间
* @param unit 单位
*/
public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order Order = (Order) o;
long diff = this.time - Order.time;
if (diff <= 0) {
return -1;
} else {
return 1;
}
}
}
- 测试
package com.classics.delay;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;
public class DelayQueueDemo {
public static void main(String[] args) throws InterruptedException {
// 添加三个定时任务,TimeUnit.SECONDS 表示单位为秒
Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
DelayQueue<Order> delayQueue = new DelayQueue<>();
delayQueue.put(Order1);
delayQueue.put(Order2);
delayQueue.put(Order3);
/**
* 这里很重要,这里不断的循环,不断的监测是否有到期的队列
*/
while (delayQueue.size() != 0) {
/**
* 取队列头部元素是否过期
*/
Order task = delayQueue.poll();
if (task != null) {
System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
}
Thread.sleep(1000);
}
}
}
二、Redis实现延迟
主要是过期时间回退的方式,也就是我们添加一个消息,时间到期了调用一下程序,正好可以实现延迟的效果。
docker
安装Redis
参考:https://blog.csdn.net/qq_41853447/article/details/112003274
非docker
安装Redis
参考:https://blog.csdn.net/qq_41853447/article/details/103201684
- 配置
redis.conf
在redis.conf
配置文件中添加 notify-keyspace-events Ex
配置。
notify-keyspace-events Ex
- 引入依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
我使用的是springboot
工程,这个依赖springboot
版本控制器里面已经定义好了,所以这里不需要添加版本号。
- 定义配置类
package com.classics.delay;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
@Configuration
public class RedisListenerConfig {
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
}
package com.classics.delay;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;
@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
super(listenerContainer);
}
/**
* 通过Redis处理接收到的对象的回调
* @param message key值
* @param pattern
*/
@Override
public void onMessage(Message message, byte[] pattern) {
String expiredKey = message.toString();
System.out.println("监听到key:" + expiredKey + "已过期");
}
}
- 测试
使用 Another Redis Desktop Manage
工具连接Redis
。
Another Redis Desktop Manage
使用参考:https://blog.csdn.net/qq_41853447/article/details/112003227
- 语法
set key值 数据 ex 过期时间
set key_admin 这是数据 ex 5
注意:如果Redis过期回调时出现了网络问题或者其他问题没有调用成功的,Redis是不会再次调用的。
这样实现延迟队列以后,就没有必要在写一个定时任务,10分钟或者5分钟去检索一下数据库,有没有该处理的数据。那样我觉得不是访问量压垮服务器,就凭这些定时任务对服务器都是一个不可忽略的负担。有了延迟任务之后,预防延迟队列数据的丢失或者其他情况,定时任务可以有,但是这个定时的时间可以选择大一些。
本文地址:https://blog.csdn.net/qq_41853447/article/details/112003817