使用DelayQueue构建自己的ThreadPoolExecutor
前言
在日常java开发过程中使用线程池一般都是通过Executors提供的静态方法创建线程池,但目前还没有提供使用DelayQueue(延迟队列)作为任务队列的线程池创建方法。在笔者另一篇博客中《DelayQueue--阅读源码从jdk开始》,有个场景需要使用DelayQueue实现定时的页面发布功能,在那次实现过程中使用DelayQueue的take方法获取到任务后再放入线程池,由于这里是串行take,如果在同一时刻有多个任务需要被执行,这时势必有有延迟,虽然延迟不多,但不是最佳实现方案。
通过前一篇对ThreadPoolExecutor总结(点这里),我们可以直接使用ThreadPoolExecutor的构造方法构造自定义的线程池,使用DelayQueue作为“任务队列”即可。
使用DelayQueue创建线程池
这个步骤很简单,只要理解了ThreadPoolExecutor构造方法的各个参数即可(对各个参数的详细讲解见上一篇文章):
DelayQueue queue = new DelayQueue<>();//延迟队列 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,10,1000l, TimeUnit.MILLISECONDS,queue);
之后,只需调用ThreadPoolExecutor的execute提交任务即可。
创建延迟任务类
我们知道ThreadPoolExecutor的execute方法,需要一个实现了Runnable接口的对象,那么这个任务类必须是实现Runnable接口;并且最终这个对象要能放到DelayQueue中,这个任务类必须实现Delayed接口。最终这个任务类实现如下:
public class TaskInfo implements Delayed,Runnable { //任务id private int id; //业务类型 private int type; //业务数据 private String data; //执行时间 private long excuteTime; public TaskInfo(int id, int type, String data, long excuteTime) { this.id = id; this.type = type; this.data = data; this.excuteTime = TimeUnit.NANOSECONDS.convert(excuteTime, TimeUnit.MILLISECONDS)+System.nanoTime(); } public int getId() { return id; } public void setId(int id) { this.id = id; } public int getType() { return type; } public void setType(int type) { this.type = type; } public String getData() { return data; } public void setData(String data) { this.data = data; } public long getExcuteTime() { return excuteTime; } public void setExcuteTime(long excuteTime) { this.excuteTime = excuteTime; } @Override public long getDelay(TimeUnit unit) { return unit.convert(this.excuteTime- System.nanoTime() , TimeUnit.NANOSECONDS); } @Override public int compareTo(Delayed o) { TaskInfo msg = (TaskInfo)o; return this.excuteTime>msg.excuteTime?1:( this.excuteTime<msg.excuteTime?-1:0); } @Override public void run() { System.out.println("run task:"+id); } }
初始化核心线程
上面已经创建好任务类了,也许大家会觉得直接new TaskInfo(),并且调用ThreadPoolExecutor的execute方法提交任务就行,如下:
//创建任务 TaskInfo t1 = new TaskInfo(1,1,"任务1",8000); TaskInfo t2 = new TaskInfo(2,2,"任务2",8000); //提交任务 threadPoolExecutor.execute(t1); threadPoolExecutor.execute(t2);
通过前一篇文章的分析,在线程池刚初始化时,由于核心线程数为0,此时执行execute提交任务,任务不会进入延迟队列,而是直接执行,就无法满足业务需求(任务被提前执行了)。正确做法是在线程初始化完成后,先调用prestartAllCoreThreads方法,先创建好核心线程,即:
threadPoolExecutor.prestartAllCoreThreads();
完成示例代码:
public class ThreadPoolExecutorTest { private static ExecutorService es = Executors.newFixedThreadPool(3);//3个线程的线程池 public static void main(String[] args){ DelayQueue queue = new DelayQueue<>();//延迟队列 ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(3,10,1000l, TimeUnit.MILLISECONDS,queue); threadPoolExecutor.prestartAllCoreThreads();//初始化核心线程 TaskInfo t1 = new TaskInfo(1,1,"任务1",8000); TaskInfo t2 = new TaskInfo(2,2,"任务2",8000); TaskInfo t3 = new TaskInfo(3,3,"任务3",9000); TaskInfo t4 = new TaskInfo(4,4,"任务4",5000); TaskInfo t5 = new TaskInfo(5,5,"任务5",5000); TaskInfo t6 = new TaskInfo(6,6,"任务6",6000); TaskInfo t7 = new TaskInfo(7,7,"任务7",7000); TaskInfo t8 = new TaskInfo(8,8,"任务8",10000); threadPoolExecutor.execute(t1); threadPoolExecutor.execute(t2); threadPoolExecutor.execute(t3); threadPoolExecutor.execute(t4); threadPoolExecutor.execute(t5); threadPoolExecutor.execute(t6); threadPoolExecutor.execute(t7); threadPoolExecutor.execute(t8); } }
执行main方法,可以发现任务是按时延迟执行的,而且如果在同一刻如果有多个任务需要执行,这时也可以利用线程池并行执行,进一步降低延迟。
另外大家也可以注释掉threadPoolExecutor.prestartAllCoreThreads();这句,验证下如果不初始化核心线程会有什么后果。
心灵鸡汤
有的程序员觉得整天实现一些简单的功能没有技术含量,如果你觉得某项工作没有技术含量,那只是你自己把它做得没有技术含量,认真的写好自己的每一行代码,不停的去完善,它就会成为有技术含量的工作。想想达芬奇画鸡蛋的故事。
摘自--《天星老师语录》