04-java并发编程之定时线程池ScheduledThreadPoolExecutor
ScheduledThreadPoolExecutor介绍
- 之前介绍的ThreadPoolExecutor是java的普通线程池。而ScheduledThreadPoolExecutor是java提供的定时任务线程池。·
- ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运 行任务,或者定期执行任务。
- ScheduledThreadPoolExecutor的功能与Timer类似,但 它功能更强大、更灵活。Timer对应的是单个后台线程,而 ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。
- ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令,适用于需要多个后台线程执行周期任务,同时为了满足资源 管理的需求而需要限制后台线程的数量的应用场景
ScheduledThreadPoolExecutor使用
// 常用
java.util.concurrent.ScheduledThreadPoolExecutor#schedule() 定时任务
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleAtFixedRate() 固定速率连续执行
java.util.concurrent.ScheduledThreadPoolExecutor#scheduleWithFixedDelay()非固定速率连续执行
java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue延迟队列
ScheduledThreadPoolExecutor的运行机制
ScheduledThreadPoolExecutor的执行示意图如下图所示。
DelayQueue是一个*队列,所以ThreadPoolExecutor的maximumPoolSize在Scheduled- ThreadPoolExecutor中没有什么意义(设置maximumPoolSize的大小没有什么效果)。
ScheduledThreadPoolExecutor的执行主要分为两大部分。
- 1)当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWith- FixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了 RunnableScheduledFutur接口的ScheduledFutureTask。
- 2)线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor的实现
ScheduledThreadPoolExecutor会把待调度的任务(ScheduledFutureTask) 放到一个DelayQueue中。 ScheduledFutureTask主要包含3个成员变量,如下。
- long型成员变量time,表示这个任务将要被执行的具体时间。
- long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中 的序号。
- ·long型成员变量period,表示任务执行的间隔周期
DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的Scheduled- FutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个 ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就 是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
首先,让我们看看ScheduledThreadPoolExecutor中的线程执行周期任务的过程。
运行过程
下图是 ScheduledThreadPoolExecutor中的线程1执行某个周期任务的4个步骤
1)线程1从DelayQueue中获取已到期的ScheduledFutureTask(DelayQueue.take())。到期任务 是指ScheduledFutureTask的time大于等于当前时间。
2)线程1执行这个ScheduledFutureTask。
3)线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
4)线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中(DelayQueue.add())。
下面是DelayQueue.take()方法的源代码实现。
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
RunnableScheduledFuture first = queue[0];
if (first == null) //第一个有没有 没有等着
available.await();
else {
long delay = first.getDelay(TimeUnit.NANOSECONDS);//到时间了
if (delay <= 0)//到时间了
return finishPoll(first);
else if (leader != null)
available.await();//因为没有执行线程初始化,所以等等什么时候有了自己被他人唤醒
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);//各种condition的awaitNanos 带时间的
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && queue[0] != null)
available.signal();
lock.unlock();
}
}