java如何实现通用的延迟位数?
程序员文章站
2022-06-04 08:29:39
...
电商大家都用过吧,下单后若未支付,通常都有一段支付倒计时,大约15分钟,若时间到了之后,无需支付的,订单将被关闭,库存将被释放。
这种业务就需要需要用到到延迟径向的功能,将任务丢到递减初始值,设置一个延迟时间,变量函数,到了时间之后,将逐步替换为指定的函数消费指定的任务。
下面的代码是一个通用的逐步降低的实现,大家可以直接拿去用。
代码还是比较简单的,技术要点:
- 调用addTask方法将任务丢失到延迟本身中,主要参数(延迟时间,任务信息,替换【任务终止后会进行替换】)
- 使用到了java中的延迟迭代DelayQueue来放置延迟任务
- 下面的结构方法会自动调用一个start方法,start方法中会自动启动一个线程,线程从从串行中拉取终止的任务,然后丢到线程池executorService.submit中进行处理,会自动调用创建连续任务中指定的临时函数
-
主要方法中有使用步骤
import java.util.concurrent.*; import java.util.function.Consumer; import java.util.logging.Logger; public class DelayQueueService<T> { Logger logger = Logger.getLogger(DelayQueueService.class.getName()); //延迟队列名称 private String delayQueueName; private DelayQueue delayQueue = new DelayQueue<>(); //处理队列中任务的线程池 private ExecutorService executorService; public DelayQueueService(String delayQueueName) { this(delayQueueName, Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 4)); } public DelayQueueService(String delayQueueName, ExecutorService executorService) { this.delayQueueName = delayQueueName; this.executorService = executorService; //启动队列消费 this.start(); } /** * 添加任务 * * @param delayedTimeUnit 延迟时间单位 * @param delayedTime 延迟时间 * @param task 任务 * @param consumer 任务消费者(到期了会回调) */ public void addTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer consumer) { this.delayQueue.offer(new DelayedTask(delayedTimeUnit, delayedTime, task, consumer)); } private void start() { //轮询从延迟队列中拉取任务,然后调用线程池进行处理 Thread pollThread = new Thread(() -> { while (true) { try { DelayedTask delayedTask = this.delayQueue.poll(100, TimeUnit.MILLISECONDS); if (this.executorService.isShutdown()) { break; } if (delayedTask != null) { executorService.submit(() -> { delayedTask.consumer.accept(delayedTask.task); }); } } catch (InterruptedException e) { logger.warning(e.getMessage()); } } }); pollThread.setDaemon(Thread.currentThread().isDaemon()); pollThread.setName(this.getClass().getName() + "-pollThread-" + this.delayQueueName); pollThread.start(); } public void close() { if (!this.executorService.isShutdown()) { this.executorService.shutdown(); } } public class DelayedTask implements Delayed { //延迟时间单位 private TimeUnit delayedTimeUnit; //延迟时间 private long delayedTime; //到期时间(毫秒) private long endTime; //延迟任务信息 private T task; //消费者 private Consumer consumer; public DelayedTask(TimeUnit delayedTimeUnit, long delayedTime, T task, Consumer consumer) { this.delayedTimeUnit = delayedTimeUnit; this.delayedTime = delayedTime; this.task = task; this.endTime = System.currentTimeMillis() + delayedTimeUnit.toMillis(delayedTime); this.consumer = consumer; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.endTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { DelayedTask task = (DelayedTask) o; return Long.compare(this.endTime, task.endTime); } } public static void main(String[] args) { //创建一个延迟队列:用来对超过支付日期的订单进行关闭 String delayQueueName = "orderCloseDelayQueue"; //1、创建延迟队列 DelayQueueService orderCloseDelayQueue = new DelayQueueService(delayQueueName); for (int i = 1; i <= 50; i++) { //2、调用addTask将延迟任务加入延迟队列 orderCloseDelayQueue.addTask(TimeUnit.MILLISECONDS, i, "订单" + i, new Consumer() { @Override public void accept(String s) { System.out.println(System.currentTimeMillis() + "," + Thread.currentThread() + ",关闭订单:" + s); } }); } //3、系统关闭的时候,调用延迟队列的close方法 //orderCloseDelayQueue.close(); } }