欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

《Java并发编程的艺术》Executor框架

程序员文章站 2022-05-05 09:42:12
...

上一章学了线程池,这个框架为我们提供了一些线程池。

Executor框架简介

二级调度模型

程序通过Executor框架控制上层的调度,下层的调度由操作系统内核控制。
所谓上层就是线程池中哪些任务在等待池,哪些任务被分配到运行线程上之类的。
所谓下层就是哪些线程被分配到哪些CPU上去执行。
《Java并发编程的艺术》Executor框架

Executor框架的结构与成员

1.结构
主要分为三个部分

  • 任务
  • 异步计算的结果
  • 线程池

《Java并发编程的艺术》Executor框架
使用示意图
《Java并发编程的艺术》Executor框架
2.成员
(1)ThreadPoolExecutor
它通常由工厂类Executors创建。主要有三种线程池的实现

  • ThreadPoolExecutor: 它主要适用于负载较重的服务器,限制线程的数量。
  • SingleThreadExecutor: 它主要适用于保证任务有序运行的场景,并且在任何时间点不会有多个线程同时工作的情况。
  • CachedThreadExecutor: 大小*的线程数,适用于执行很多的短期异步任务,或者负载较轻的任务。

(2)ScheduledThreadPoolExecutor
也是使用工厂类Executors创建,有两种类型。

  • **ScheduledThreadPoolExecutor:**包含若干个线程,适用于需要多个后台线程执行周期任务,同时限制线程数量。
  • **ScheduledThreadExecutor:**单个后台线程执行周期任务,同时保证有序。

(3)Future接口
当使用提交(submit)方法时,会返回一个Future对象。(submit在AbstractExecutorService 类中)
现在(jdk11)提交返回的仍然是FutureTask< T >对象。

(4)Runnable和Callable接口
区别在于Runnable不返回结果,Callable可以。
Runnable对象可以被包装成Callable。

ThreadPoolExecutor

只要了解了上一章的线程池,这一章的问题就不大。

1.FixedThreadPool
它的线程池大小和核心线程大小均被设置为了nThreads,而且等待时间为0(就是任务执行完就终止线程,啊无情),等待队列使用的是*队列,所以它不会拒绝任务而且不会新增超过设定的线程数(容量为Integer.MAX_VALUE)。
当当前线程数不超过核心线程数,且有新任务时就会创建新线程,当达到核心任务数时就将任务纳入等待队列中。

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

2.SingleThreadExecutor
核心线程数和线程池大小都设定为1,等待时间同样为0,也为*阻塞队列。所以也不会拒绝任务(容量为Integer.MAX_VALUE)。

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

3.CachedThreadPool
它会根据需求创建线程,线程池大小为Integer.MAX_VALUE,可以生存60秒,而且等待队列是没有容量的,所以来一个任务如果没有空闲线程则会新建一个。
如果提交一个新任务时,当前线程池中没有空闲线程,则会被阻塞等待线程池创建新线程后才成功提交给空闲线程。如果有空闲线程直接提交。而空闲线程只会等待60秒,如果来了任务就执行,没有就会被终止。

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

ScheduledThreadPoolExecutor

继承自ThreadPoolExecutor,它主要用于在给定的延迟之后运行任务。
它的把待调度的任务(ScheduledFutureTask)放入等待队列(DelayedWorkQueue )中,这是它的一个内部类,是一个*队列,
DelayedWorkQueue 底层是用数组实现的一个小根堆,来实现对等待时间排序,时间早的任务先被执行。
DelayedWorkQueue的任务获取步骤:

  • 获取Lock
  • 获取周期任务:如果数组为空或者头元素未到期则在Condition中等待,获取头元素并在数组中移除,如果数组中仍有其他任务唤醒Condition中的一个线程。
  • 释放Lock
    DelayedWorkQueue的任务增添步骤:
  • 获取锁
  • 如果等待的任务数大于等于了数组容量,则扩容到原来的1.5倍
  • 添加任务:向数组中添加元素,并调整顺序,如果为头元素唤醒一个等待池中的线程
  • 释放锁

执行步骤大概为:

  • 从DelayedWorkQueue 中取出到期的任务(ScheduledFutureTask),执行
  • 修改这个任务中的下次到期时间
  • 将任务放回DelayQueue。

它为了能够实现周期性执行任务,对ThreadPoolExecutor做了如下修改

  • 使用DelayedWorkQueue作为任务队列
  • 获取任务的方式不同
  • 执行周期任务后,增加额外的处理

FutureTask

Funture有个子接口RunnableFuture,这个接口同时继承Runnable接口,而FutureTask实现了这个接口,所以FutureTask不仅可以作为返回值,也可以作为任务提交给Executor执行。
FutureTask有三个状态,如下图所示:
《Java并发编程的艺术》Executor框架
FutureTask get(),cancel()的示意图
《Java并发编程的艺术》Executor框架
估计11的实现原理跟书中有差异,实现之后再补充吧,俺有点饿了。