欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

java如何实现通用的延迟位数?

程序员文章站 2022-06-04 08:29:39
...

电商大家都用过吧,下单后若未支付,通常都有一段支付倒计时,大约15分钟,若时间到了之后,无需支付的,订单将被关闭,库存将被释放。

这种业务就需要需要用到到延迟径向的功能,将任务丢到递减初始值,设置一个延迟时间,变量函数,到了时间之后,将逐步替换为指定的函数消费指定的任务。

下面的代码是一个通用的逐步降低的实现,大家可以直接拿去用。

代码还是比较简单的,技术要点:

 

  • 调用addTask方法将任务丢失到延迟本身中,主要参数(延迟时间,任务信息,替换【任务终止后会进行替换】)
  • 使用到了java中的延迟迭代DelayQueue来放置延迟任务
  • 下面的结构方法会自动调用一个start方法,start方法中会自动启动一个线程,线程从从串行中拉取终止的任务,然后丢到线程池executorService.submit中进行处理,会自动调用创建连续任务中指定的临时函数
  • 主要方法中有使用步骤
    import java.util.concurrent.*;
    import java.util.function.Consumer;
    import java.util.logging.Logger;
    
    public class DelayQueueService<T> {
        Logger logger = Logger.getLogger(DelayQueueService.class.getName());
        //延迟队列名称
        private String delayQueueName;
        private DelayQueue delayQueue = new DelayQueue<>();
        //处理队列中任务的线程池
        private ExecutorService executorService;
    
        public DelayQueueService(String delayQueueName) {
            this(delayQueueName, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4));
        }
    
        public DelayQueueService(String delayQueueName, ExecutorService executorService) {
            this.delayQueueName = delayQueueName;
            this.executorService = executorService;
            //启动队列消费
            this.start();
        }
    
        /**
         * 添加任务
         *
         * @param delayedTimeUnit 延迟时间单位
         * @param delayedTime     延迟时间
         * @param task            任务
         * @param consumer        任务消费者(到期了会回调)
         */
        public void addTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer consumer) {
            this.delayQueue.offer(new DelayedTask(delayedTimeUnit, delayedTime, task, consumer));
        }
    
        private void start() {
            //轮询从延迟队列中拉取任务,然后调用线程池进行处理
            Thread pollThread = new Thread(() -> {
                while (true) {
                    try {
                        DelayedTask delayedTask = this.delayQueue.poll(100, TimeUnit.MILLISECONDS);
                        if (this.executorService.isShutdown()) {
                            break;
                        }
                        if (delayedTask != null) {
                            executorService.submit(() -> {
                                delayedTask.consumer.accept(delayedTask.task);
                            });
                        }
                    } catch (InterruptedException e) {
                        logger.warning(e.getMessage());
                    }
                }
            });
            pollThread.setDaemon(Thread.currentThread().isDaemon());
            pollThread.setName(this.getClass().getName() + "-pollThread-" + this.delayQueueName);
            pollThread.start();
        }
    
        public void close() {
            if (!this.executorService.isShutdown()) {
                this.executorService.shutdown();
            }
        }
    
        public class DelayedTask implements Delayed {
            //延迟时间单位
            private TimeUnit delayedTimeUnit;
            //延迟时间
            private long delayedTime;
            //到期时间(毫秒)
            private long endTime;
            //延迟任务信息
            private T task;
            //消费者
            private Consumer consumer;
    
            public DelayedTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer consumer) {
                this.delayedTimeUnit = delayedTimeUnit;
                this.delayedTime = delayedTime;
                this.task = task;
                this.endTime = System.currentTimeMillis() + delayedTimeUnit.toMillis(delayedTime);
                this.consumer = consumer;
            }
    
            @Override
            public long getDelay(TimeUnit unit) {
                return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
            }
    
            @Override
            public int compareTo(Delayed o) {
                DelayedTask task = (DelayedTask) o;
                return Long.compare(this.endTime, task.endTime);
            }
        }
    
        public static void main(String[] args) {
            //创建一个延迟队列:用来对超过支付日期的订单进行关闭
            String delayQueueName = "orderCloseDelayQueue";
            //1、创建延迟队列
            DelayQueueService orderCloseDelayQueue = new DelayQueueService(delayQueueName);
            for (int i = 1; i <= 50; i++) {
                //2、调用addTask将延迟任务加入延迟队列
                orderCloseDelayQueue.addTask(TimeUnit.MILLISECONDS, i, "订单" + i, new Consumer() {
                    @Override
                    public void accept(String s) {
                        System.out.println(System.currentTimeMillis() + "," + Thread.currentThread() + ",关闭订单:" + s);
                    }
                });
            }
            //3、系统关闭的时候,调用延迟队列的close方法
            //orderCloseDelayQueue.close();
        }
    }