欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

延迟队列如何做

程序员文章站 2024-03-18 11:00:22
...

项目中,我们经常会有这样的需求: 比如订单生成之后30分钟不付款自动关闭订单,用户注册成功5分钟后,推送感兴趣话题等,都要用到延迟队列。延迟队列和定时任务有点像,但又有些不同。定时任务是周期性地执行任务,或者在确定的时间执行任务。而延时任务、延时队列在于并不知道任务执行的起点是多少,而只知道延迟相应的间隔就要执行。实现延迟队列的方法有好多种,这里主要介绍JDK延迟队列以及利用Redis zset方式实现两种方法:

1、jdk延迟队列

    首先实体类需要实现delayed,自己实现过期时间方法,以当前时间戳为基准

@Data
public class Order implements Delayed {
    /**
     * 延迟时间
     */
    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private long time;

    String name;

    public Order(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }

    @Override
    public int compareTo(Delayed o) {
        Order order = (Order) o;
        long diff = this.time - order.time;
        if (diff <= 0){
            return -1;
        }else {
            return 1;
        }
    }
}

同时我们向延迟队列内插入3条数据,然后监听这个队列的执行,直到满足开始定义的时间要求后就可以执行相应的操作

 @Test
    public void testJdkDelayQueue() throws Exception{
        Order order1 = new Order("order1",5, TimeUnit.SECONDS);
        Order order2 = new Order("order2",10, TimeUnit.SECONDS);
        Order order3 = new Order("order3",15, TimeUnit.SECONDS);

        DelayQueue<Order> queue = new DelayQueue();
        queue.put(order1);
        queue.put(order2);
        queue.put(order3);

        System.out.println("订单延迟队列开始的时间:"+ DateUtil.formatDate(new Date()));
 //     轮询
        while (queue.size() != 0){
            /**
             * 获取队列头部元素是否过期,如果过期移除的数据就是null了。
             */
            Order task = queue.poll();
            if (task != null){
                System.out.println("延迟队列执行"+task.name+"===="+DateUtil.formatDate(new Date(task.getTime())));
            }
            Thread.sleep(1000);
        }

    }
results:
订单延迟队列开始的时间:2020-11-30 16:31:28
延迟队列执行order1====2020-11-30 16:31:33
延迟队列执行order2====2020-11-30 16:31:38
延迟队列执行order3====2020-11-30 16:31:43

发现,执行结果符合我们事先设定的延迟时间。

jdk自带的delayQueue处理能力以及效率都有限,实际上延迟队列也可以借助Redis的zset实施

2、Redis zset 做延迟队列

将需要延迟的任务插入到zset中,并根据分数进行排序,实际我们可以将当前时间戳+延迟间隔当作该值的score,比如某个订单需要10分钟后关闭,score就可以是当前时间戳+60*10,单位统一换算成秒。

最后可以每一秒轮询一次,发现分数小于当前时间戳时,就认为该订单已经过期就可以执行删除订单操作并把该value从zset中移除。

首先向关闭订单key中模拟插入一系列需要关闭的数据:

  /**
     * 模拟插入一些需要延迟关闭订单的任务,可以用(当前时间戳+需要延迟关闭的时间)充当该值的score
     */
    public void addZsetData(){
        long  nowSecond = System.currentTimeMillis() / 1000;
        redisTemplate.opsForZSet().add("order-close","id001",nowSecond + 30);
        redisTemplate.opsForZSet().add("order-close","id002",nowSecond + 50);
        redisTemplate.opsForZSet().add("order-close","id003",nowSecond + 60);
        redisTemplate.opsForZSet().add("order-close","id004",nowSecond + 80);
        redisTemplate.opsForZSet().add("order-close","id005",nowSecond + 90);
        redisTemplate.opsForZSet().add("order-close","id006",nowSecond + 100);
        redisTemplate.opsForZSet().add("order-close","id007",nowSecond + 120);
    }

然后开始每1s轮询一次,如果对应value的score小于当前时间戳的值,说明其值(对应的订单任务)已过期,可执行对应的任务并删除!

/**
     * 轮询redis 的zset 发现score过期,就可以关闭订单,并将该value从zset中移除
     */
    public void pollingListener(){
        int i = 1;
        while (true) {
//            当前时间戳,单位 s
            long nowStamp = System.currentTimeMillis() / 1000;
//          以当前时间为基准, 查询前10分钟到后1分钟之后,共计11分钟
            Set<DefaultTypedTuple<String>> orderList = redisTemplate.opsForZSet()
                    .rangeByScoreWithScores("order-close", nowStamp - 600, nowStamp + 60);

            if (orderList.size() > 0) {
//           使用轮序查询过期任务并处理
            for (DefaultTypedTuple<String> tuple : orderList) {
                String value = tuple.getValue();
                double score = tuple.getScore();
                long nowSecond = System.currentTimeMillis() / 1000;
                /**
                 * 如果当前时间毫秒数大于其得分,说明该订单value已经过期,就可以关闭订单并移除相应的任务了
                 */
                if (nowSecond > score) {
                    System.out.println("移除过期的订单value===" + value + "==相应的分数是:" + score);
                    redisTemplate.opsForZSet().remove("order-close", value);
                    System.out.println("do,do,do,关闭订单任务模拟,实际可以考虑使用异步");
                }
            }
        }
//            1s 轮询一次
            try {
                Thread.sleep(1000);
                System.out.println("执行 "+(i++)+" 次==");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
results:
移除过期的订单value===id001==相应的分数是:1.606727961E9
do,do,do,关闭订单任务模拟,实际可以考虑使用异步
移除过期的订单value===id002==相应的分数是:1.606727981E9
do,do,do,关闭订单任务模拟,实际可以考虑使用异步
移除过期的订单value===id003==相应的分数是:1.606727991E9
do,do,do,关闭订单任务模拟,实际可以考虑使用异步
移除过期的订单value===id004==相应的分数是:1.606728011E9
do,do,do,关闭订单任务模拟,实际可以考虑使用异步

可以发现相应的任务都在一定的延迟时间后被删除了,达到了延迟队列的目的。这里使用zset 还是有一定的局限性,实际项目中也有结合时间轮设计延迟队列的,后续可进一步进行研究。。。

参考文档:

http://www.dockone.io/article/10139