欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

ScheduledThreadPoolExecutor源码解读(二)——ScheduledFutureTask时间调度执行任务(延迟执行、周期性执行)

程序员文章站 2022-06-09 13:03:18
...

一、前言

延迟阻塞队列DelayedWorkQueue中放的元素是ScheduledFutureTask,提交的任务被包装成ScheduledFutureTask放进工作队列,Woker工作线程消费工作队列中的任务,即调用ScheduledFutureTask.run()ScheduledFutureTask又调用任务的run(),这点和ThreadPoolExecutor差不多,而ScheduledThreadPoolExecutor是如何实现按时间调度的呢?

ScheduledThreadPoolExecutor提交任务的核心函数有3个:

  1. schedule(...) 按一定延迟时长执行任务,只执行一次。
  2. scheduleAtFixedRate(...)按固定频率,周期性执行任务。
  3. scheduleWithFixedDelay(...)按固定延迟时间,受任务执行时长影响,周期性执行任务。

二、提交任务

首先从3个核心函数出发,其入口源码相似,提交的任务都会先创建一个ScheduledFutureTask对象,然后调用decorateTask包装返回RunnableScheduledFuture对象,最后刚才被包装成RunnableScheduledFuture对象作为参数调用统一的延迟执行函数delayedExecute()

public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        //将任务包装成RunnableScheduledFuture对象
        //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));
        //延迟执行 加延迟阻塞队列+启动一个空的Worker线程
        delayedExecute(t);
        return t;
}

public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();

        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          //传入第一次延时时间 now+initialDelay
                                          triggerTime(initialDelay, unit),
                                          unit.toNanos(period));
        //将任务包装成RunnableScheduledFuture对象,
        //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //延迟执行 加延迟阻塞队列+启动一个空的Worker线程
        delayedExecute(t);
        return t;
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        ScheduledFutureTask<Void> sft =
            new ScheduledFutureTask<Void>(command,
                                          null,
                                          triggerTime(initialDelay, unit),
                                          //重点! 传入的delay取反了,用delay正负来区分执行间隔是否固定
                                          unit.toNanos(-delay));
        //将任务包装成RunnableScheduledFuture对象
        //decorateTask直接返回sft,这个函数的意图是让开发者DIY继承实现的
        RunnableScheduledFuture<Void> t = decorateTask(command, sft);
        sft.outerTask = t;
        //延迟执行 加延迟阻塞队列+启动一个空的Worker线程
        delayedExecute(t);
        return t;
}

1、decorateTask留给开发者去实现

decorateTask源码,其有两个参数,任务原始对象runnable和把原始任务包装成RunnableScheduledFuture对象。decorateTask函数直接返回RunnableScheduledFuture对象,没有做什么事情,那其意图是什么呢?

decorateTask是想让开发者继承ScheduledThreadPoolExecutor实现定制化定时线程池时,可以实现这个函数,对原始任务对象和包装后任务对象做特殊DIY处理。

protected <V> RunnableScheduledFuture<V> decorateTask(
    Runnable runnable, RunnableScheduledFuture<V> task) {
    return task;
}

2、delayedExecute延迟调度

delayedExecute()是延迟执行和周期性执行的主函数,其基本流程如下:

  1. 判断线程池的状态,runstateshutdown将拒绝任务提交。
  2. 任务处于正常运行状态,则将任务直接加入阻塞工作队列。
  3. 再次判断线程池的状态,runstateshutdown,再判断是否是周期性任务(isPeriodic),不同的性质不同的处理策略。
  4. 一起正常预启动一个空Worker线程,循环从阻塞队列中消费任务。
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        //1、直接加入延时阻塞队列
        super.getQueue().add(task);
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            //2、预启动一个空的worker
            ensurePrestart();
    }
}
void ensurePrestart() {
        int wc = workerCountOf(ctl.get());
        if (wc < corePoolSize)
            //创建一个空worker,并且启动
            addWorker(null, true);
        else if (wc == 0)
            addWorker(null, false);
}

三、ScheduledFutureTask时间调度执行的核心

可以看出提交的任务最重被包装成ScheduledFutureTask,然后加到工作队列由Worker工作线程去消费了。

延迟执行和周期性执行的核心代码也就在于ScheduledFutureTaskScheduledThreadPoolExecutor源码解读(二)——ScheduledFutureTask时间调度执行任务(延迟执行、周期性执行)

1、基本架构

ScheduledFutureTask继承了FutureTask并实现了接口RunnableScheduledFuture

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-dxeJoszR-1604798379406)(C:\study\myStudy\myNotes\java\concurrent\线程池\ScheduledFutureTask.png)]

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    /** Sequence number to break ties FIFO */
    private final long sequenceNumber;

    /** The time the task is enabled to execute in nanoTime units */
    任务被调用的执行时间
    private long time;
    /**
     * Period in nanoseconds for repeating tasks.
     */
     //周期性执行的时间间隔
    private final long period;

    /** The actual task to be re-enqueued by reExecutePeriodic */
    RunnableScheduledFuture<V> outerTask = this;

    /**
     * Index into delay queue, to support faster cancellation.
     * 索引到延迟队列为了支持快速取消
     */
    int heapIndex;

    /**
     * Creates a one-shot action with given nanoTime-based trigger time.
     */
    ScheduledFutureTask(Runnable r, V result, long ns) {
        super(r, result);
        this.time = ns;
        this.period = 0;
        this.sequenceNumber = sequencer.getAndIncrement();
    }
}

ScheduledFutureTask实现了接口Delayed,所以需要重写两个方法getDelaycompareTo

//获取当前延迟时间(距离下次任务执行还有多久)
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}
/**
 * 比较this 和 other谁先执行
 * @param other
 * @return <=0 this先执行
 */
public int compareTo(Delayed other) {
     if (other == this) // compare zero if same object
         return 0;
     if (other instanceof ScheduledFutureTask) {
         ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
         long diff = time - x.time;
         if (diff < 0)
             return -1;
         else if (diff > 0)
             return 1;
         else if (sequenceNumber < x.sequenceNumber)
             return -1;
         else
             return 1;
       }
       //比较Delay
      long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
      return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}

2、延迟执行和周期执行区别

延迟执行和周期执行区别在于period

  • 延迟执行period=0
  • 周期性执行period!=0
  • 固定频率(AtFixedRate)周期性执行period>0,每次开始执行的时间的间隔是固定的,不受任务执行时长影响。
  • 固定延迟时间(WithFixedDelay)周期性执行period<0,每次执行的时间受任务执行时长影响,是任务执行结束后的当前时间+ (-p)。
public boolean isPeriodic() {
    return period != 0;
}
private void setNextRunTime() {
    long p = period;
    //AtFixedRate 当传入period > 0 时 ,每次执行的时间的间隔是固定的
    if (p > 0)
        time += p;
    else
    //WithFixedDelay  当传入period < 0 时,每次执行的时间受任务执行时长影响,是任务执行结束后的当前时间+ (-p)
        time = triggerTime(-p);
}
long triggerTime(long delay) {
    return now() + ((delay < (Long.MAX_VALUE >> 1)) ? delay : overflowFree(delay));
}

3、heapIndex支持快速取消定时任务

ScheduledFutureTask还有一个变量heapIndex,是记录任务在阻塞队列的索引的,其方便支持快速取消任务和删除任务。但是其并不会作为删除任务的位置判断,只是当用于判断惹怒是否在阻塞队列中:heapIndex >= 0 在阻塞队列中,取消任务时需要同时从阻塞队列删除任务;heapIndex < 0不在阻塞队列中。

阻塞队列DelayedWorkQueue的每次堆化siftUp()siftDown(),以及remove()都维护着heapIndex,想必这也是ScheduledThreadPoolExecutor自行定制延迟阻塞队列的原因之一。

public boolean cancel(boolean mayInterruptIfRunning) {
    boolean cancelled = super.cancel(mayInterruptIfRunning);
    if (cancelled && removeOnCancel && heapIndex >= 0)
        //从延迟阻塞队列中删除任务
        remove(this);
    return cancelled;
}

4、核心逻辑run()

ScheduledFutureTask间接实现了接口Runnable,其核心逻辑就在run()

  • 周期性任务,continueExistingPeriodicTasksAfterShutdown默认为false,意为调用shutdown()时,会取消和阻止周期性任务的执行。
  • 非周期性任务,executeExistingDelayedTasksAfterShutdown默认为true,意为调用shutdown()时,不会取消和阻止非周期性任务的执行。
public void run() {
    boolean periodic = isPeriodic();
    //当runState为SHUTDOWN时,非周期性任务继续,周期性任务会中断取消

    if (!canRunInCurrentRunState(periodic))
        cancel(false);
    else if (!periodic)
        //非周期性任务,只执行一次
        ScheduledFutureTask.super.run();
    //runAndReset返回false周期性任务将不再执行
    else if (ScheduledFutureTask.super.runAndReset()) {
        //runAndReset()  周期性任务执行并reset
        //设置下一次执行时间
        setNextRunTime();
        //把自己再放回延时阻塞队列
        reExecutePeriodic(outerTask);
    }
}

(1)canRunInCurrentRunState不同任务性质不同策略

代码一开始判断线程池的运行状态canRunInCurrentRunState,当线程池处于SHUTDOWN状态时,是否是周期性任务有不同的策略:

  • 周期性任务,continueExistingPeriodicTasksAfterShutdown默认为false,意为线程池被关闭时,应该取消和阻止周期性任务的执行。
  • 非周期性任务,executeExistingDelayedTasksAfterShutdown默认为true,意为线程池被关闭时,不会取消和阻止非周期性任务的执行。
boolean canRunInCurrentRunState(boolean periodic) {
    return isRunningOrShutdown(periodic ?
                               continueExistingPeriodicTasksAfterShutdown :
                               executeExistingDelayedTasksAfterShutdown);
}
/**
 * False if should cancel/suppress periodic tasks on shutdown.
 */
private volatile boolean continueExistingPeriodicTasksAfterShutdown;

/**
 * False if should cancel non-periodic tasks on shutdown.
 */
private volatile boolean executeExistingDelayedTasksAfterShutdown = true;

(2)单次执行调度父类FutureTask.run

延迟执行任务,即只执行一次,调用了父类的FutureTask.run()。提交的任务如果是Runnable型,会被包装成Callable型作为FutureTask的成员变量。FutureTask.run()中直接调度执行任务的代码call(),同时返回结果。

需要注意的是,任务代码c.call()若抛出异常会被FutureTask捕获处理,这样对外查找问题不利,所以最好在任务run()或者call()的核心代码用try-catch包起来。

public void run() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                //调用callable的call,并设置返回值
                //如果传进来的任务是Runnable,会被转换成callable
                result = c.call();
                //若运行异常,ran=false,异常会被捕获处理
                //所以传进来的任务的run或者call代码块最好try-catch下
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

(3)周期性执行调度父类FutureTask.runAndReset

从源码看出,周期性执行任务没有返回值,FutureTask.runAndReset最终返回布尔值,并且也会捕获任务代码异常,最终返回true代表代码没有出现异常,下次可以正常执行,false代表任务代码中有异常,下次不能正常执行。

所以特别强调任务代码必须要try-catch,否则一旦出现异常,周期性执行将不会再设置下次执行时间和把自己放回延迟阻塞队列。

protected boolean runAndReset() {
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return false;
    boolean ran = false;
    int s = state;
    try {
        Callable<V> c = callable;
        if (c != null && s == NEW) {
            try {
                c.call(); // don't set result
                //如果c.call抛异常,将会被处理,但是没有打印堆栈,使用者不易排查
                // 不会再往下执行ran=false
                //所以传进来的任务run里需要自己try-catch
                ran = true;
            } catch (Throwable ex) {
                setException(ex);
            }
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
    return ran && s == NEW;
}

四、onShutdown

还记得ThreadPoolExecutor.shutdown()代码里一个空的钩子函数onShutdown()吗?

ScheduledThreadPoolExecutorshutdown()shutdownNow()都是调用了父类ThreadPoolExecutor的函数,但是实现了onShutdown()

回顾当调用shutdownNow()时,会清空工作队列并中断所有工作线程,这样正在执行的任务也会中断,可额外做的事情几乎没有,所以官方也没有给开发者留钩子函数。

而调用shutdown(),不会清空工作线程也不会中断正在执行任务的工作线程,对于ScheduledThreadPoolExecutor有周期性任务,会往复重置执行,如果不额外做些处理就使得即使调用了shutdown()也不会销毁线程池。

onShutdown()对于任务性质不同有不同处理策略:

  • 周期性任务,continueExistingPeriodicTasksAfterShutdown默认为false,意为调用shutdown()时,会取消和阻止周期性任务的执行。
  • 非周期性任务,executeExistingDelayedTasksAfterShutdown默认为true,意为调用shutdown()时,不会取消和阻止非周期性任务的执行。
/**
 * Cancels and clears the queue of all tasks that should not be run
 * due to shutdown policy.  Invoked within super.shutdown.
 */
@Override void onShutdown() {
    BlockingQueue<Runnable> q = super.getQueue();
    //Shutdown后是否保持延时, 默认true
    boolean keepDelayed =
        getExecuteExistingDelayedTasksAfterShutdownPolicy();
    //Shutdown后是否保持周期, 默认false
    boolean keepPeriodic =
        getContinueExistingPeriodicTasksAfterShutdownPolicy();
    if (!keepDelayed && !keepPeriodic) {
        for (Object e : q.toArray())
            if (e instanceof RunnableScheduledFuture<?>)
                ((RunnableScheduledFuture<?>) e).cancel(false);
        q.clear();
        //取消并清空队列
    }
    else {
        // Traverse snapshot to avoid iterator exceptions
        for (Object e : q.toArray()) {
            if (e instanceof RunnableScheduledFuture) {
                RunnableScheduledFuture<?> t =
                    (RunnableScheduledFuture<?>)e;
                if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
                    t.isCancelled()) { // also remove if already cancelled
                    if (q.remove(t))
                        t.cancel(false);
                }
            }
        }
    }
    //尝试中断线程池
    tryTerminate();
}

五、总结

通过阅读源码,知道了一些需要注意的细节,一不小心就会踩坑:

  1. 任务代码一定要try-catch,否则异常被ScheduledFutureTask的父类FutureTask捕获处理,难以排查问题,同时周期性执行任务会因为任务代码抛异常而不再设置下次执行时间和把自己放回延迟队列的操作,即不会再周期性执行。
  2. ScheduledFutureTask通过一个变量就区分了延迟和周期性执行,period=0延迟执行,即只执行一次;period>0固定频率周期执行;``period<0`固定延迟时间周期执行,两次任务开始执行时间间隔受任务执行耗时影响。
  3. 如果周期性任务的执行时长大于period,且看重执行等间隔,使用scheduleWithFixedDelay()
  4. 若周期性任务的执行时长远小于period,则可以使用scheduleAtFixedRate()
  5. 默认情况下,线程池处于关闭状态(shutdown),周期性任务会被取消和阻止执行,非周期性任务会顺利执行完成不会被阻止。

PS: 如若文章中有错误理解,欢迎批评指正,同时非常期待你的评论、点赞和收藏。我是徐同学,愿与你共同进步!