欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

延迟队列

程序员文章站 2022-06-16 11:48:04
延迟队列延迟,也就是等待一定的时间在执行的。目前支持延迟的消息队列有 RabbitMQ,RocketMQ。但是RocketMQ支持的延迟时间并不灵活,延迟时间并不能自定义。在项目中,延迟使用的比较多的。例如订单成功后,在30分钟内没有支付,自动取消订单外卖平台发送订餐通知,下单成功后60s给用户推送短信。如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存源码:一、DelayQueue 延时队列定义延迟队列package com.classics.delay;im...

延迟队列

延迟,也就是等待一定的时间在执行的。目前支持延迟的消息队列有 RabbitMQRocketMQ。但是RocketMQ支持的延迟时间并不灵活,延迟时间并不能自定义。在项目中,延迟使用的比较多的。

  • 例如
    • 订单成功后,在30分钟内没有支付,自动取消订单
    • 外卖平台发送订餐通知,下单成功后60s给用户推送短信。
    • 如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存

源码:https://github.com/gl-stars/small-study-case/tree/master/nm-demo/classicsCase-demo/src/main/java/com/classics/delay

一、DelayQueue 延时队列

  • 定义延迟队列
package com.classics.delay;

import com.fasterxml.jackson.annotation.JsonFormat;

import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

public class Order implements Delayed {
    /**
     * 延迟时间,单位由 {@link Order#Order(java.lang.String, long, java.util.concurrent.TimeUnit)}定义
     */
    @JsonFormat(locale = "zh", timezone = "GMT+8", pattern = "yyyy-MM-dd HH:mm:ss")
    private long time;
    String name;

    /***
     * 添加延迟任务
     * @param name 延迟内容
     * @param time 延迟时间
     * @param unit 单位
     */
    public Order(String name, long time, TimeUnit unit) {
        this.name = name;
        this.time = System.currentTimeMillis() + (time > 0 ? unit.toMillis(time) : 0);
    }
    
    @Override
    public long getDelay(TimeUnit unit) {
        return time - System.currentTimeMillis();
    }
    @Override
    public int compareTo(Delayed o) {
        Order Order = (Order) o;
        long diff = this.time - Order.time;
        if (diff <= 0) {
            return -1;
        } else {
            return 1;
        }
    }
}
  • 测试
package com.classics.delay;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.TimeUnit;

public class DelayQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        // 添加三个定时任务,TimeUnit.SECONDS 表示单位为秒
        Order Order1 = new Order("Order1", 5, TimeUnit.SECONDS);
        Order Order2 = new Order("Order2", 10, TimeUnit.SECONDS);
        Order Order3 = new Order("Order3", 15, TimeUnit.SECONDS);
        DelayQueue<Order> delayQueue = new DelayQueue<>();
        delayQueue.put(Order1);
        delayQueue.put(Order2);
        delayQueue.put(Order3);

        /**
         * 这里很重要,这里不断的循环,不断的监测是否有到期的队列
         */
        while (delayQueue.size() != 0) {
            /**
             * 取队列头部元素是否过期
             */
            Order task = delayQueue.poll();
            if (task != null) {
                System.out.format("订单:{%s}被取消, 取消时间:{%s}\n", task.name, LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
            }
            Thread.sleep(1000);
        }
    }
}

延迟队列

二、Redis实现延迟

主要是过期时间回退的方式,也就是我们添加一个消息,时间到期了调用一下程序,正好可以实现延迟的效果。

docker安装Redis参考:https://blog.csdn.net/qq_41853447/article/details/112003274

docker安装Redis参考:https://blog.csdn.net/qq_41853447/article/details/103201684

  • 配置 redis.conf

redis.conf配置文件中添加 notify-keyspace-events Ex配置。

notify-keyspace-events Ex

延迟队列

  • 引入依赖
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

我使用的是springboot工程,这个依赖springboot版本控制器里面已经定义好了,所以这里不需要添加版本号。

  • 定义配置类
package com.classics.delay;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;

@Configuration
public class RedisListenerConfig {

    @Bean
    RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory) {

        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }
}
package com.classics.delay;

import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.listener.KeyExpirationEventMessageListener;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component
public class RedisKeyExpirationListener extends KeyExpirationEventMessageListener {
 
    public RedisKeyExpirationListener(RedisMessageListenerContainer listenerContainer) {
        super(listenerContainer);
    }


    /**
     * 通过Redis处理接收到的对象的回调
     * @param message key值
     * @param pattern
     */
    @Override
    public void onMessage(Message message, byte[] pattern) {
        String expiredKey = message.toString();
        System.out.println("监听到key:" + expiredKey + "已过期");
    }
}
  • 测试

使用 Another Redis Desktop Manage工具连接Redis

Another Redis Desktop Manage使用参考:https://blog.csdn.net/qq_41853447/article/details/112003227

延迟队列

延迟队列

  • 语法
set key值 数据 ex 过期时间
set key_admin 这是数据 ex 5

延迟队列

注意:如果Redis过期回调时出现了网络问题或者其他问题没有调用成功的,Redis是不会再次调用的。

这样实现延迟队列以后,就没有必要在写一个定时任务,10分钟或者5分钟去检索一下数据库,有没有该处理的数据。那样我觉得不是访问量压垮服务器,就凭这些定时任务对服务器都是一个不可忽略的负担。有了延迟任务之后,预防延迟队列数据的丢失或者其他情况,定时任务可以有,但是这个定时的时间可以选择大一些。

本文地址:https://blog.csdn.net/qq_41853447/article/details/112003817

相关标签: 其他