欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发编程的艺术之读书笔记(十六)

程序员文章站 2022-05-04 10:52:36
...

前言:

上一部分我们学习了java的线程池,这一部分,我们来学习java的Executor框架

1. Executor框架简介

java多线程程序通常把应用分解成若干个任务,然后使用用户级的调度器(Executor框架)将这些任务映射为固定数量的线程。Executor框架的结构如下

  • 任务。包括任务需要实现的接口:Runnable接口或Callable接口
  • 任务的执行,包括任务执行机制的核心接口Executor,以及继承自Executor的ExecutorService接口(ThreadPoolExecutor和ScheduledThreadPoolExecutor)
  • 异步计算的结果。包括接口Future和实现Future接口的FutureTask类

我们来看下Executor框架的类图

并发编程的艺术之读书笔记(十六)

接着我们再看下Executor框架的使用示意图

并发编程的艺术之读书笔记(十六)

2. Executor框架的成员

1. ThreadPoolExecutor

ThreadPoolExecutor通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor,FixedThreadPool,和CacheThreadPool。先介绍FixedThreadPool

FixedThreadPool适用于为了满足资源管理的要求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。

2. SingleThreadExecutor

适用于需要保证顺序地执行各个任务,并且再任意时间点,不会有多个线程是活动的应用场景。

3. CachedThreadPool

是一个大小*的线程池,适用于执行很多的短期异步任务的小程序

4. SingleThreadScheduledExecutor

适用于需要单个后台线程执行周期任务,同时需要保证顺序执行各个任务的应用场景

5. ScheduledThreadPoolExecutor

适用于需要多个后台线程执行周期任务,同时为了满足资源管理需要限制后台线程数量的应用场景

6. Future接口

Future接口和实现类FutureTask用来表示异步计算的结果。

7. Runnable和Callable接口

都可以被ThreadPoolExecutor或者ScheduledThreadPoolExecutor执行,区别是Runnable接口不会返回结果,Callable接口会返回结果

3. ThreadPoolExecutor详解

上一部分我摸嗯已经了解了ThreadPoolExecutor在构造的时候需要传入的参数,这里就不再讲解,直接开始讲解FixedThreadPool

FixedThreadPool称为可重用固定线程数的线程池

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

FixedThreadPool的execute()方法运行示意图如下所示

并发编程的艺术之读书笔记(十六)

说明如下

  1. 如果当前运行线程数少于corePoolSize,则创建新线程来执行任务
  2. 在线程池完成预热之后,将任务加入LinkedBlockingQueue
  3. 执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行

FixedThreadPool使用*队列LinkedBlockingQueue作为线程池的工作队列,使用*队列会对线程池带来如下影响

  1. 当线程池中的线程数达到corePoolSize后,新任务将在*队列中等待,因此线程池中的线程数不会超过corePoolSize。
  2. 使用*队列时maximumPoolSize、keepAliveTime将是无效参数。
  3. 由于使用*队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒接任务。(不会调用RejectedExecutionHandler.rejectedExecution方法)

SingleThreadExecutor

SingleThreadExecutor是使用单个worker线程的Executor,下面是源代码实现

public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

SingleThreadExecutor的execute()方法运行示意图如下所示

并发编程的艺术之读书笔记(十六)

  1. 如果当前运行的线程数少于corePoolSize(<1,即线程池中无运行的线程),则创建新线程来执行任务。
  2. 在线程池完成预热之后(当前线程池中有一个运行的线程),将任务加入LinkedBlockingQueue。
  3. 当线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

注意,SingleThreadExecutor也使用*队列LinkedBlockingQueue作为工作队列,对线程池带来的影响与FixedThreadPool相同。

CachedThreadPool

CachedThreadPool是一个会根据需要创建线程的线程池。源代码实现如下

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是*的。这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。

CachedThreadExecutor的execute()方法运行示意图如下所示

并发编程的艺术之读书笔记(十六)

  1. 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,executor()方法执行完成;否则执行下面的步骤2.
  2. 当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下步骤1将失败。此时CachedThreadPool会创建一个新线程执行任务,execute方法执行完成。
  3. 在步骤2中创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒。如果60秒内主线程提交了一个新任务,那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor继承自ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。          ScheduledThreadPoolExecutor功能与Tiimer类似,但比Timer功能更强大。Timer对应的是单个后台线程,ScheduledThreadPoolExecutor可以在构造函数中指定多个对应的后台线程数。

Executors可以创建2种类型的ScheduledThreadPoolExecutor:

ScheduledThreadPoolExecutor:包含若干个线程(固定个数线程)的ScheduledThreadPoolExecutor。适用于需要多个后台线程执行执行周期任务,同时限制后台线程个数的应用场景。

ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下修改:

  • 使用DelayQueue作为任务队列。
  • 获取任务的方式不同。
  • 执行周期任务后,增加了额外的处理。

DelayQueue是一个*队列,所以ThreadPoolExecutor的maximumPoolSize在ScheduledThreadPoolExecutor中没有什么意义。

ScheduledThreadPoolExecutor的任务传递示意图

并发编程的艺术之读书笔记(十六)

  1. 当调用ScheduledThreadPoolExecutor的 scheduleAtFixedRate() 方法或者 scheduleWithFixedDelay() 方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduleFutur接口的ScheduledFutureTask。
  2. 线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

4. FutureTask详解

 FutureTask除了实现Future接口外,还实现了Runnable接口。因此,FutureTask可以交给Executor执行,也可以由调用线程直接执行(FutureTask.run())。FutureTask有未启动、已启动、已完成3种状态。

FutureTask的状态迁移示意图如下:

并发编程的艺术之读书笔记(十六)

  1. 未启动。FutureTask.run()方法还没有被执行之前,FutureTask处于未启动状态。当创建一个FutureTask,且没有执行FutureTask.run()方法之前,这个FutureTask处于未启动状态。

  2. 已启动。FutureTask.run()方法被执行的过程中,FutureTask处于已启动状态。

  3. 已完成。FutureTask.run()方法执行完后正常结束,或被取消(FutureTask.cancel()),或FutureTask.run()时抛出异常而异常结束,FutureTask处于已完成状态。

FutureTask的使用

下面是FutureTask在不同状态时调用FutureTask.get()及Future.cancel()方法的执行示意图

并发编程的艺术之读书笔记(十六)

FutureTask基于AQS实现

总结

并发编程的艺术笔记到此结束