延迟队列如何做
项目中,我们经常会有这样的需求: 比如订单生成之后30分钟不付款自动关闭订单,用户注册成功5分钟后,推送感兴趣话题等,都要用到延迟队列。延迟队列和定时任务有点像,但又有些不同。定时任务是周期性地执行任务,或者在确定的时间执行任务。而延时任务、延时队列在于并不知道任务执行的起点是多少,而只知道延迟相应的间隔就要执行。实现延迟队列的方法有好多种,这里主要介绍JDK延迟队列以及利用Redis zset方式实现两种方法:
1、jdk延迟队列
首先实体类需要实现delayed,自己实现过期时间方法,以当前时间戳为基准
@Data
public class Order implements Delayed {
/**
* 延迟时间
*/
@JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
private long time;
String name;
public Order(String name, long time, TimeUnit unit) {
this.name = name;
this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
}
@Override
public long getDelay(TimeUnit unit) {
return time - System.currentTimeMillis();
}
@Override
public int compareTo(Delayed o) {
Order order = (Order) o;
long diff = this.time - order.time;
if (diff <= 0){
return -1;
}else {
return 1;
}
}
}
同时我们向延迟队列内插入3条数据,然后监听这个队列的执行,直到满足开始定义的时间要求后就可以执行相应的操作
@Test
public void testJdkDelayQueue() throws Exception{
Order order1 = new Order("order1",5, TimeUnit.SECONDS);
Order order2 = new Order("order2",10, TimeUnit.SECONDS);
Order order3 = new Order("order3",15, TimeUnit.SECONDS);
DelayQueue<Order> queue = new DelayQueue();
queue.put(order1);
queue.put(order2);
queue.put(order3);
System.out.println("订单延迟队列开始的时间:"+ DateUtil.formatDate(new Date()));
// 轮询
while (queue.size() != 0){
/**
* 获取队列头部元素是否过期,如果过期移除的数据就是null了。
*/
Order task = queue.poll();
if (task != null){
System.out.println("延迟队列执行"+task.name+"===="+DateUtil.formatDate(new Date(task.getTime())));
}
Thread.sleep(1000);
}
}
results:
订单延迟队列开始的时间:2020-11-30 16:31:28
延迟队列执行order1====2020-11-30 16:31:33
延迟队列执行order2====2020-11-30 16:31:38
延迟队列执行order3====2020-11-30 16:31:43
发现,执行结果符合我们事先设定的延迟时间。
jdk自带的delayQueue处理能力以及效率都有限,实际上延迟队列也可以借助Redis的zset实施
2、Redis zset 做延迟队列
将需要延迟的任务插入到zset中,并根据分数进行排序,实际我们可以将当前时间戳+延迟间隔当作该值的score,比如某个订单需要10分钟后关闭,score就可以是当前时间戳+60*10,单位统一换算成秒。
最后可以每一秒轮询一次,发现分数小于当前时间戳时,就认为该订单已经过期就可以执行删除订单操作并把该value从zset中移除。
首先向关闭订单key中模拟插入一系列需要关闭的数据:
/**
* 模拟插入一些需要延迟关闭订单的任务,可以用(当前时间戳+需要延迟关闭的时间)充当该值的score
*/
public void addZsetData(){
long nowSecond = System.currentTimeMillis() / 1000;
redisTemplate.opsForZSet().add("order-close","id001",nowSecond + 30);
redisTemplate.opsForZSet().add("order-close","id002",nowSecond + 50);
redisTemplate.opsForZSet().add("order-close","id003",nowSecond + 60);
redisTemplate.opsForZSet().add("order-close","id004",nowSecond + 80);
redisTemplate.opsForZSet().add("order-close","id005",nowSecond + 90);
redisTemplate.opsForZSet().add("order-close","id006",nowSecond + 100);
redisTemplate.opsForZSet().add("order-close","id007",nowSecond + 120);
}
然后开始每1s轮询一次,如果对应value的score小于当前时间戳的值,说明其值(对应的订单任务)已过期,可执行对应的任务并删除!
/**
* 轮询redis 的zset 发现score过期,就可以关闭订单,并将该value从zset中移除
*/
public void pollingListener(){
int i = 1;
while (true) {
// 当前时间戳,单位 s
long nowStamp = System.currentTimeMillis() / 1000;
// 以当前时间为基准, 查询前10分钟到后1分钟之后,共计11分钟
Set<DefaultTypedTuple<String>> orderList = redisTemplate.opsForZSet()
.rangeByScoreWithScores("order-close", nowStamp - 600, nowStamp + 60);
if (orderList.size() > 0) {
// 使用轮序查询过期任务并处理
for (DefaultTypedTuple<String> tuple : orderList) {
String value = tuple.getValue();
double score = tuple.getScore();
long nowSecond = System.currentTimeMillis() / 1000;
/**
* 如果当前时间毫秒数大于其得分,说明该订单value已经过期,就可以关闭订单并移除相应的任务了
*/
if (nowSecond > score) {
System.out.println("移除过期的订单value===" + value + "==相应的分数是:" + score);
redisTemplate.opsForZSet().remove("order-close", value);
System.out.println("do,do,do,关闭订单任务模拟,实际可以考虑使用异步");
}
}
}
// 1s 轮询一次
try {
Thread.sleep(1000);
System.out.println("执行 "+(i++)+" 次==");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
results:
移除过期的订单value===id001==相应的分数是:1.606727961E9
do,do,do,关闭订单任务模拟,实际可以考虑使用异步
移除过期的订单value===id002==相应的分数是:1.606727981E9
do,do,do,关闭订单任务模拟,实际可以考虑使用异步
移除过期的订单value===id003==相应的分数是:1.606727991E9
do,do,do,关闭订单任务模拟,实际可以考虑使用异步
移除过期的订单value===id004==相应的分数是:1.606728011E9
do,do,do,关闭订单任务模拟,实际可以考虑使用异步
可以发现相应的任务都在一定的延迟时间后被删除了,达到了延迟队列的目的。这里使用zset 还是有一定的局限性,实际项目中也有结合时间轮设计延迟队列的,后续可进一步进行研究。。。
参考文档:
http://www.dockone.io/article/10139