《Java并发编程的艺术》Executor框架
上一章学了线程池,这个框架为我们提供了一些线程池。
Executor框架简介
二级调度模型
程序通过Executor框架控制上层的调度,下层的调度由操作系统内核控制。
所谓上层就是线程池中哪些任务在等待池,哪些任务被分配到运行线程上之类的。
所谓下层就是哪些线程被分配到哪些CPU上去执行。
Executor框架的结构与成员
1.结构
主要分为三个部分
- 任务
- 异步计算的结果
- 线程池
使用示意图
2.成员
(1)ThreadPoolExecutor
它通常由工厂类Executors创建。主要有三种线程池的实现
- ThreadPoolExecutor: 它主要适用于负载较重的服务器,限制线程的数量。
- SingleThreadExecutor: 它主要适用于保证任务有序运行的场景,并且在任何时间点不会有多个线程同时工作的情况。
- CachedThreadExecutor: 大小*的线程数,适用于执行很多的短期异步任务,或者负载较轻的任务。
(2)ScheduledThreadPoolExecutor
也是使用工厂类Executors创建,有两种类型。
- **ScheduledThreadPoolExecutor:**包含若干个线程,适用于需要多个后台线程执行周期任务,同时限制线程数量。
- **ScheduledThreadExecutor:**单个后台线程执行周期任务,同时保证有序。
(3)Future接口
当使用提交(submit)方法时,会返回一个Future对象。(submit在AbstractExecutorService 类中)
现在(jdk11)提交返回的仍然是FutureTask< T >对象。
(4)Runnable和Callable接口
区别在于Runnable不返回结果,Callable可以。
Runnable对象可以被包装成Callable。
ThreadPoolExecutor
只要了解了上一章的线程池,这一章的问题就不大。
1.FixedThreadPool
它的线程池大小和核心线程大小均被设置为了nThreads,而且等待时间为0(就是任务执行完就终止线程,啊无情),等待队列使用的是*队列,所以它不会拒绝任务而且不会新增超过设定的线程数(容量为Integer.MAX_VALUE)。
当当前线程数不超过核心线程数,且有新任务时就会创建新线程,当达到核心任务数时就将任务纳入等待队列中。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
2.SingleThreadExecutor
核心线程数和线程池大小都设定为1,等待时间同样为0,也为*阻塞队列。所以也不会拒绝任务(容量为Integer.MAX_VALUE)。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
3.CachedThreadPool
它会根据需求创建线程,线程池大小为Integer.MAX_VALUE,可以生存60秒,而且等待队列是没有容量的,所以来一个任务如果没有空闲线程则会新建一个。
如果提交一个新任务时,当前线程池中没有空闲线程,则会被阻塞等待线程池创建新线程后才成功提交给空闲线程。如果有空闲线程直接提交。而空闲线程只会等待60秒,如果来了任务就执行,没有就会被终止。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor,它主要用于在给定的延迟之后运行任务。
它的把待调度的任务(ScheduledFutureTask)放入等待队列(DelayedWorkQueue )中,这是它的一个内部类,是一个*队列,
DelayedWorkQueue 底层是用数组实现的一个小根堆,来实现对等待时间排序,时间早的任务先被执行。
DelayedWorkQueue的任务获取步骤:
- 获取Lock
- 获取周期任务:如果数组为空或者头元素未到期则在Condition中等待,获取头元素并在数组中移除,如果数组中仍有其他任务唤醒Condition中的一个线程。
- 释放Lock
DelayedWorkQueue的任务增添步骤: - 获取锁
- 如果等待的任务数大于等于了数组容量,则扩容到原来的1.5倍
- 添加任务:向数组中添加元素,并调整顺序,如果为头元素唤醒一个等待池中的线程
- 释放锁
执行步骤大概为:
- 从DelayedWorkQueue 中取出到期的任务(ScheduledFutureTask),执行
- 修改这个任务中的下次到期时间
- 将任务放回DelayQueue。
它为了能够实现周期性执行任务,对ThreadPoolExecutor做了如下修改
- 使用DelayedWorkQueue作为任务队列
- 获取任务的方式不同
- 执行周期任务后,增加额外的处理
FutureTask
Funture有个子接口RunnableFuture,这个接口同时继承Runnable接口,而FutureTask实现了这个接口,所以FutureTask不仅可以作为返回值,也可以作为任务提交给Executor执行。
FutureTask有三个状态,如下图所示:
FutureTask get(),cancel()的示意图
估计11的实现原理跟书中有差异,实现之后再补充吧,俺有点饿了。
上一篇: 数据结构——队列