欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

ScheduledThreadPoolExecutor详解

程序员文章站 2022-07-10 19:12:27
一、ScheduledThreadPoolExecutor简介:主要用于执行周期性任务;所以在看本文之前最好先了解一下 ThreadPoolExecutor ,可以参考 ThreadPoolExecutor 详解;另外 ScheduledThreadPoolExecutor 中使用了延迟队列二、ScheduledThreadPoolExecutor 结构概述继承关系public class ScheduledThreadPoolExecutor extends ThreadPoolExecut...

一、ScheduledThreadPoolExecutor简介:

主要用于执行周期性任务;所以在看本文之前最好先了解一下 ThreadPoolExecutor ,可以参考 ThreadPoolExecutor 详解;另外 ScheduledThreadPoolExecutor 中使用了延迟队列

二、ScheduledThreadPoolExecutor 结构概述

  1. 继承关系
public class ScheduledThreadPoolExecutor 
extends ThreadPoolExecutor implements 
ScheduledExecutorService {}
  1. 关系图
    ScheduledThreadPoolExecutor详解
    在源码中可以看到,ScheduledThreadPoolExecutor 的状态管理、入队操作、拒绝操作等都是继承于 ThreadPoolExecutor;ScheduledThreadPoolExecutor 主要是提供了周期任务和延迟任务相关的操作;
  1. schedule(Runnable command, long delay, TimeUnit unit) // 无返回值的延迟任务
  2. schedule(Callable callable, long delay, TimeUnit unit) // 有返回值的延迟任务
  3. scheduleAtFixedRate(Runnable command, long initialDelay, long period,
  4. TimeUnit unit) // 固定频率周期任务
  5. scheduleWithFixedDelay(Runnable command,
    long initialDelay, long delay, TimeUnit unit) // 固定延迟周期任务

ScheduledThreadPoolExecutor的运行逻辑而言,大致可以表述为:

  1. 首先将 Runnable/Callable 封装为 ScheduledFutureTask,延迟时间作为比较属性;
  2. 然后加入DelayedWorkQueue 队列中,每次取出队首延迟最小的任务,超时等待,然后执行;
  3. 最后判断是否为周期任务,然后重新加入DelayedWorkQueue 队列中;

其内部结构如图所示:
ScheduledThreadPoolExecutor详解
这里需要注意的:

  1. ScheduledThreadPoolExecutor 中的队列不能指定,只能是 DelayedWorkQueue;因为他是 *队列,所以再添加任务的时候线程最多可以增加到 coreSize,这里不清楚的可以查看 ThreadPoolExecutor 详解,就不再重复了;
  2. ScheduledThreadPoolExecutor 重写了 ThreadPoolExecutor 的execute() 方法,其执行的核心方法变成 delayedExecute();
  1. ScheduledFutureTask
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
  private final long sequenceNumber;  // 任务序号,从 AtomicLong sequencer 获取,当延迟时间相同时,序号小的先出
  private long time;                  // 下次任务执行时间
  private final long period;          // 0 表示非周期任务,正值表示固定频率周期任务,负值表示固定延迟周期任务
  RunnableScheduledFuture<V> outerTask = this;  // 重复执行的任务,传入的任务可以使用 decorateTask() 重新包装
  int heapIndex;                      // 队列索引
}

ScheduledThreadPoolExecutor详解
其中最重要的方法必然是 run 方法了:

public void run() {
  boolean periodic = isPeriodic();    // 是否为周期任务,period != 0
  if (!canRunInCurrentRunState(periodic))  // 当前状态能否继续运行,详细测试后面还会讲到
    cancel(false);     // 取消任务
  else if (!periodic)  // 不是周期任务时,直接运行
    ScheduledFutureTask.super.run();
  else if (ScheduledFutureTask.super.runAndReset()) {  // 时周期任务
    setNextRunTime();              // 设置下次执行时间
    reExecutePeriodic(outerTask);  // 重新入队
  }
}

public boolean cancel(boolean mayInterruptIfRunning) {
  boolean cancelled = super.cancel(mayInterruptIfRunning);  // 设置中断状态
  if (cancelled && removeOnCancel && heapIndex >= 0)        // 当设置 removeOnCancel 状态时,移除任务
    remove(this);                                           // 默认为 false
  return cancelled;
}
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
  if (canRunInCurrentRunState(true)) {  // 如果当前状态可以执行
    super.getQueue().add(task);         // 则重新入队
    if (!canRunInCurrentRunState(true) && remove(task))
      task.cancel(false);
    else ensurePrestart();              // 确保有线程执行任务 
  }
}

三、ScheduledExecutorService接口介绍

ScheduledThreadPoolExecutor直接继承自ScheduledExecutorServiceScheduledThreadPoolExecutor 类的功能也主要体现在ScheduledExecutorService 接口上,它的最主要的功能就是可以对其中的任务进行调度,比如延迟执行、定时执行等等。
ScheduledExecutorService接口定义:

public interface ScheduledExecutorService extends ExecutorService {   
    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay, TimeUnit unit);
    public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay, TimeUnit unit);
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit);
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);}

从上面接口定义我们知道,提供了四个方法,下面我们就分别介绍:

 1. schedule (Runnable task, long delay, TimeUnit timeunit)
 2. schedule (Callable task, long delay, TimeUnit timeunit)
 3. scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)
 4. scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)
  1. schedule (Runnable task, long delay, TimeUnit timeunit)
    这个方法的意思是在指定延迟之后运行task。这个方法有个问题,就是没有办法获知task的执行结果。如果我们想获得task的执行结果,我们可以传入一个Callable的实例
ScheduledExecutorService scheduledExecutorService =
    Executors.newScheduledThreadPool(5);

ScheduledFuture scheduledFuture =
scheduledExecutorService.schedule(new Callable() {
    public Object call() throws Exception {
        System.out.println("Executed!");
        return "Called!";
    }
},
5,
TimeUnit.SECONDS);
System.out.println("result = " + scheduledFuture.get());
scheduledExecutorService.shutdown();
  1. schedule (Callable task, long delay, TimeUnit timeunit)
    这个方法与schedule (Runnable task)类似,也是在指定延迟之后运行task,不过它接收的是一个Callable实例,此方法会返回一个ScheduleFuture对象,通过ScheduleFuture我们可以取消一个未执行的task,也可以获得这个task的执行结果。
ScheduledExecutorService scheduledExecutorService =
    Executors.newScheduledThreadPool(5);
ScheduledFuture scheduledFuture =
scheduledExecutorService.schedule(new Callable() {
    public Object call() throws Exception {
        System.out.println("Executed!");
        return "Called!";
    }
},
5,
TimeUnit.SECONDS);
System.out.println("result = " + scheduledFuture.get());
scheduledExecutorService.shutdown();
  1. scheduleAtFixedRate (Runnable, long initialDelay, long period,
    TimeUnit timeunit)

这个方法的作用是周期性的调度task执行。task第一次执行的延迟根据initialDelay参数确定,以后每一次执行都间隔period时长。

如果task的执行时间大于定义的period,那么下一个线程将在当前线程完成之后再执行。整个调度保证不会出现一个以上任务同时执行。

  1. scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)

scheduleWithFixedDelay的参数和scheduleAtFixedRate参数完全一致,它们的不同之处在于对period调度周期的解释。在scheduleAtFixedRate中,period指的两个任务开始执行的时间间隔,也就是当前任务的开始执行时间和下个任务的开始执行时间之间的间隔。而在scheduleWithFixedDelay中,period指的当前任务的结束执行时间到下个任务的开始执行时间。

  1. ScheduledExecutorService的关闭

和ExecutorService类似, 我们在使用完ScheduledExecutorService时需要关闭它。如果不关闭的话,JVM会一直运行直,即使所有线程已经关闭了。关闭ScheduledExecutorService可以使用其继承自ExecutorService接口的shutdown()和shutdownNow()方法,两者的区别请参考【Java线程池 ExecutorService】。

文章参考:https://www.cnblogs.com/sanzao/p/10760641.html
文章参考:https://cloud.tencent.com/developer/article/1157685

本文地址:https://blog.csdn.net/cristianoxm/article/details/107640772

相关标签: JUC