欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

订单超时取消之延迟队列

程序员文章站 2022-04-11 10:21:52
...

什么是延迟队列?

我们先来看一个场景:以淘宝购物为例,当你提交订单之后有30分钟的支付时间,假如你30分钟之后还没有进行支付,订单就会被取消。现在让你来实现这个功能,你准备如何实现?

相信很多小伙伴第一反应就是定时轮询,设定一个定时任务去扫订单数据,一旦发现超过30分钟未支付的订单,就将订单状态update成已取消,这是一种最简单的方法,也是最容易实现的。这种方案的弊端在于:当数据量小时,不会存在问题,当数据量越来越大时,定时扫表会变得越来越慢,而且频繁的扫表会影响下单的效率

延迟队列就是用来解决这一类问题的,那么什么是延迟队列呢?

延迟队列是为了解决任务推迟执行的问题,消息进入延迟队列之后暂时不能被消费,等超过了设定的时间才能被消费者进行消费

可以想像一下这样一种场景,每个任务进入队列的时候都打上了一个时间标签,任务1(10分钟后执行)、任务2(30分钟后执行)、任务3(60分钟后执行),当到了标签对应的时间之后,任务才能被执行

常见的可以使用延迟队列场景:

  • 淘宝下单后,30分钟未支付要取消订单
  • 外卖订单1分钟后,短信提醒客户
  • 3天未评论自动好评

总之,需要延后执行的任务都可以用延迟队列来解决


延迟队列的实现方法

1、DelayQueue

在JDK的java.util.concurrent包中提供了延迟队列的实现DelayQueue,它提供了在指定的时间才能获取队列中元素的功能,队列头的元素是最接近过期时间的元素。如果没有过期元素,使用poll()方法会返回null。下面看代码实现

public class DelayTask implements Delayed {

    private String msg;
    private long executeTime;

    public String getMsg() {
        return msg;
    }

    public void setMsg(String msg) {
        this.msg = msg;
    }

    public long getExecuteTime() {
        return executeTime;
    }

    public void setExecuteTime(long executeTime) {
        this.executeTime = executeTime;
    }

    public DelayTask(long delayTime, String msg){
        //到期时间 = 当前时间 + 延迟时间
        this.executeTime = System.currentTimeMillis() + delayTime;
        this.msg = msg;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(this.executeTime - System.currentTimeMillis(),TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        return (int)(this.getDelay(TimeUnit.MILLISECONDS) - o.getDelay(TimeUnit.MILLISECONDS));
    }
}

DelayQueue中的元素需要实现Delayed接口,我们定义一个任务来实现Delayed接口,主要有两个方法

  • getDelay(TimeUnit unit) 定义了剩余到期时间
  • compareTo(Delayed o) 定义了元素的排序规则
public class DelayQueueTest {
    public static void main(String[] args){
        //创建一个延迟队列
        DelayQueue<DelayTask> delayQueue = new DelayQueue<DelayTask>();
        DelayQueueTest delayQueueTest = new DelayQueueTest();
        //生产者放入消息
        delayQueueTest.producer(delayQueue);
        //消费者消费消息
        delayQueueTest.consumer(delayQueue);

    }

    //定义一个消费者,启动一个线程,循环从队列中拿元素
    private void consumer(DelayQueue<DelayTask> delayTasks){
        new Thread(new Runnable() {
            @Override
            public void run() {
                while(true){
                    try {
                        DelayTask delayTask = delayTasks.take();
                        System.out.println("消息消费时间:" + getCurrentTime() + ",msg:" + delayTask.getMsg());
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }).start();
    }

   //生产者,将消息带上延迟时间,放入延迟队列
    private void producer(DelayQueue<DelayTask> delayTasks){
        DelayTask delayTask = new DelayTask(5000,"delay msg");
        System.out.println("消息放入时间:" + getCurrentTime());
        delayTasks.add(delayTask);
    }

    public static String getCurrentTime(){
        Date d = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(d);
    }
}

如上的代码

  • 生产者放入一个任务,延迟5秒执行
  • 消费者不停的轮询队列,从中拿任务来执行

从输出结果可以看出,任务过了5秒才被消费者拿到,实现了任务的延迟执行

消息放入时间:2020-06-04 00:06:17
消息消费时间:2020-06-04 00:06:22,msg:delay msg

rabbitMQ死信队列

rabbitMQ是用一种死信队列的方式来实现消息延迟消费的,就是消息先进入死信队列,此时的死信队列是没有消费者的,当到达过期时间后,消息会被转发到对应的业务队列中进行消费,从而达到任务延迟执行的效果
订单超时取消之延迟队列

  • 生产者发送消息给死信Exchange,通过routing-key消息发送到指定的死信队列,此时死信队列是没有消费者的
  • 死信队列中的消息到期后会自动转发到业务Exchange中,通过routing-key消息发送到指定的业务队列中
  • 业务处理的Consumer监控业务队列,取到转发过来的消息进行消费,从而达到延迟队列的效果

下面来看一下代码实现:

首先是rabbitmq的配置

spring:
  rabbitmq:
    host: localhost
    port: 5672
    username: admin
    password: admin

rabbitmq的Configuration

@Configuration
@ConfigurationProperties(prefix = "spring.rabbitmq")
public class RabbitMQConfig {
    private String host;
    private int port;
    private String username;
    private String password;


    public static String DELAY_EXCHANGE = "delay.exchange";//死信Exchange
    public static String BUSS_EXCHANGE = "buss.exchange";//业务Exchange
    public static String DELAY_QUEUE = "delay.queue";//死信队列
    public static String BUSS_QUEUE = "buss.queue";//业务队列

    /**
     * 定义一个死信队列
     * @return
     */
    @Bean
    public Queue delayQueue(){
        Map<String,Object> args = new HashMap<String,Object>();
        //消息过期后转发的exchange
        args.put("x-dead-letter-exchange",BUSS_EXCHANGE);
        //消息过期后转发的routing-key
        args.put("x-dead-letter-routing-key","delay.msg");
        //队列中消息的过期时间(注意消息上也可以设置过期时间),两者若同时设置取其小
        args.put("x-message-ttl",20000);
        return QueueBuilder.durable(DELAY_QUEUE).withArguments(args).build();
    }


    /**
     * 定义普通的业务队列
     * @return
     */
    @Bean
    public Queue bussQueue(){
        return new Queue(BUSS_QUEUE,true);
    }

    /**
     * 死信Exchange
     * @return
     */
    @Bean
    public TopicExchange delayTopicExchange(){
        return new TopicExchange(DELAY_EXCHANGE);
    }

    /**
     * 业务Exchange
     * @return
     */
    @Bean
    public TopicExchange bussTopicExchange(){
        return new TopicExchange(BUSS_EXCHANGE);
    }

    /**
     * 绑定死信队列与死信Exchange,设置routing-key为queue.delay
     * @return
     */
    @Bean
    public Binding bindingDelayExchangeMessage(){
        return BindingBuilder.bind(delayQueue()).to(delayTopicExchange()).with("queue.delay");
    }

    /**
     * 绑定业务队列与业务Exchange,设置routing-key为delay.msg
     * 注意:此处的rounting-key与死信队列的x-dead-letter-routing-key保持一致,才能保证死信消息过期后可以转发到此队列中
     * @return
     */
    @Bean
    public Binding bindingDelayMessage(){
        return BindingBuilder.bind(bussQueue()).to(bussTopicExchange()).with("delay.msg");
    }

    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(host,port);
        connectionFactory.setAddresses(host);
        connectionFactory.setUsername(username);
        connectionFactory.setPassword(password);
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    @Bean
    public RabbitTemplate rabbitTemplate(){
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        return rabbitTemplate;
    }

    public String getHost() {
        return host;
    }

    public void setHost(String host) {
        this.host = host;
    }

    public int getPort() {
        return port;
    }

    public void setPort(int port) {
        this.port = port;
    }

    public String getUsername() {
        return username;
    }

    public void setUsername(String username) {
        this.username = username;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }
}

定义一个消息实体

public class Message implements Serializable {
    private String key;
    private String value;

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }
}

定义一个Consumer来监控业务队列

@Component
@RabbitListener(queues = "buss.queue")
public class Consumer {

    @RabbitHandler
    public void consumerMessage(Message message){
        String key = message.getKey();
        String value = message.getValue();
        System.out.println("延迟队列消费时间" + getCurrentTime());
        System.out.println("消费的消息:" + message.getKey() + "---" + message.getValue());
    }

    public static String getCurrentTime(){
        Date d = new Date();
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return sdf.format(d);
    }
}

最后写个Controller测试一下

@RestController
public class RabbitDemoTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RequestMapping("/test")
    public void send(){
        Message message = new Message();
        message.setKey("rabbit");
        message.setValue("Hello");
        System.out.println("消息发送时间:" + Consumer.getCurrentTime());
        rabbitTemplate.convertAndSend(RabbitMQConfig.DELAY_EXCHANGE, "queue.delay", message, new MessagePostProcessor() {
            @Override
            public org.springframework.amqp.core.Message postProcessMessage(org.springframework.amqp.core.Message message) throws AmqpException {
                message.getMessageProperties().setContentEncoding("UTF-8");
                message.getMessageProperties().setExpiration("20000");
                return message;
            }
        });
    }
}

启动类

@SpringBootApplication
public class RabbitmqDemo {
    public static void main(String[] args){
        SpringApplication.run(RabbitmqDemo.class);
    }
}

我们访问:http://localhost:8080/test

执行结果如下:

消息发送时间:2020-06-04 22:16:01
延迟队列消费时间2020-06-04 22:16:21
消费的消息:rabbit---Hello

从执行结果可以看出来,是按指定的时间实现了消息的延迟消费

其实延迟队列的实现方式有很多,像时间轮、radis、Quartz、SchedulerX(阿里)等等,都可以实现延迟队列的功能

这些实现方案都各有千秋,我们在实际的项目中要根据情况来选择合适的实现方案,一切的技术方案都是为了解决业务问题

不要为了技术而技术,脱离业务的技术设计是耍流氓!!!

关注公众号,回复“源码333”可免费下载Demo源码
订单超时取消之延迟队列

相关标签: 互联网