欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Executor框架整理(一)

程序员文章站 2022-06-13 20:50:23
...

前言

在Java中,使用线程来异步执行任务。
Java线程的创建与销毁需要一定的开销,如果我们为每一个任务创建一个新线程来执行,这些线程的创建与销毁将消耗大量的计算资源。
同时,为每一个任务创建一个新线程来执行,这种策略可能会使处于高负荷状态的应用最终崩溃。

一、什么是Executor框架?

我们知道线程池就是线程的集合,线程池集中管理线程,以实现线程的重用,降低资源消耗,提高响应速度等。线程用于执行异步任务,单个的线程既是工作单元也是执行机制,从JDK1.5开始,为了把工作单元与执行机制分离开,Executor框架诞生了,他是一个用于统一创建与运行的接口。Executor框架实现的就是线程池的功能
二、Executor框架结构

1、Executor框架包括3大部分:

(1)任务。也就是工作单元,包括被执行任务需要实现的接口:Runnable接口或者Callable接口;

(2)任务的执行。也就是把任务分派给多个线程的执行机制,包括Executor接口及继承自Executor接口的ExecutorService接口。Executor框架有两个关键类实现了ExecutorService接口(ThreadPoolExecutorScheduledThreadPoolExecutor)。

(3)异步计算的结果。包括Future接口及实现了Future接口的FutureTask类。

Executor框架的成员及其关系可以用一下的关系图表示

Executor框架整理(一)

下面是这些类和接口的简介:

    Executor是一个接口,它是Executor框架的基础,它将任务的提交与任务的执行分离开来。
    ThreadPoolExecutor 是线程池的核心实现类,用来执行被提交的任务。
    ScheduledThreadPoolExecutor 是一个实现类,可以在给定的延迟后运行命令,或者定期执行命令。ScheduledThreadPoolExecutor比Timer更灵活,功能更强大。
    Future接口和实现Future接口的FutureTask类,代表异步计算的结果。
    Runnable接口和Callable接口的实现类,都可以被ThreadPoolExecutor 或ScheduledThreadPoolExecutor执行。

2、Executor框架的使用示意图:

Executor框架整理(一)

使用步骤:

(1)创建Runnable并重写run()方法或者Callable对象并重写call()方法:

class callableTest implements Callable<String >{
            @Override
            public String call() {
                try{
                    String a = "return String";
                    return a;
                }
                catch(Exception e){
                    e.printStackTrace();
                    return "exception";
                }
            }
        }

(2)创建Executor接口的实现类ThreadPoolExecutor类或者ScheduledThreadPoolExecutor类的对象,然后调用其execute()方法或者submit()方法把工作任务添加到线程中,如果有返回值则返回Future对象。其中Callable对象有返回值,因此使用submit()方法;而Runnable可以使用execute()方法,此外还可以使用submit()方法,只要使用callable(Runnable task)或者callable(Runnable task,  Object result)方法把Runnable对象包装起来就可以,使用callable(Runnable task)方法返回的null,使用callable(Runnable task,  Object result)方法返回result。

ThreadPoolExecutor tpe = new ThreadPoolExecutor(5, 10,
                100, MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
Future<String> future = tpe.submit(new callableTest());

(3)调用Future对象的get()方法后的返回值,或者调用Future对象的cancel()方法取消当前线程的执行。最后关闭线程池

三、Executor框架成员:ThreadPoolExecutor实现类、ScheduledThreadPoolExecutor实现类、Future接口、Runnable和Callable接口、Executors工厂类

(一). ThreadPoolExecutor

是Executor接口的一个重要的实现类,是线程池的具体实现,用来执行被提交的任务。

通常使用工厂类Executors来创建。Executors可以创建3种类型的ThreadPoolExecutor

1.  FixedThreadPool:适用于为了满足资源管理的需求,而需要限制当前线程数量的应用场景,它适用于负载比较重的服务器。newFixedThreadPool创建一个可重用固定线程数的线程池,以共享的*队列方式来运行这些线程。

ExecutorService executorService = Executors.newFixedThreadPool(5);
        for (int i = 0; i < 20; i++) {
            Runnable syncRunnable = new Runnable() {
                @Override
                public void run() {
                    Log.e(TAG, Thread.currentThread().getName());
                }
            };
            executorService.execute(syncRunnable);
        }

运行结果:总共只会创建5个线程, 开始执行五个线程,当五个线程都处于活动状态,再次提交的任务都会加入队列等到其他线程运行结束,当线程处于空闲状态时会被下一个任务复用

下面是Executors提供的,创建使用固定线程数的FixedThreadPool的API:

public static ExecutorService newFixedThreadPool(int var0) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue());
}
public static ExecutorService newFixedThreadPool(int var0, ThreadFactory var1) {
    return new ThreadPoolExecutor(var0, var0, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var1);
}

FixedThreadPoolcorePoolSizemaximumPoolSize都被设置为创建FixedThreadPool时指定的参数nThreads

当线程池中的线程数大于corePoolSize时,keepAliveTime为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。这里把keepAliveTime设置为0L,意味着多余的空闲线程会被立即终止。

Executor框架整理(一)

  • 图中1:如果当前运行的线程数少于corePoolSize,则创建新线程来执行任务。
  • 图中2:在线程池完成预热之后(当前运行的线程数等于corePoolSize),将任务加入LinkedBlockingQueue。
  • 图中3:线程执行完1中的任务后,会在循环中反复从LinkedBlockingQueue获取任务来执行。

FixedThreadPool使用*队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。
使用*队列作为工作队列会对线程池带来如下影响:

  •     当线程池中的线程数达到corePoolSize后,新任务将在*队列中等待,因此线程池中的线程数不会超过corePoolSize。
  •     由于上一点,使用*队列时maximumPoolSize将是一个无效参数。
  •     由于前面两点,使用*队列时keepAliveTime将是一个无效参数。
  •     由于使用*队列,运行中的FixedThreadPool(未执行方法shutdown()或shutdownNow())不会拒绝任务(不会调用RejectedExecutionHandler.rejectedExecution方法)。

FixedThreadPool的工作流程大致如下:

  • 当前核心线程池总线程数量小于corePoolSize,那么创建线程并执行任务;
  • 如果当前线程数量等于corePoolSize,那么把 任务添加到阻塞队列中;
  • 如果线程池中的线程执行完任务,那么获取阻塞队列中的任务并执行;

2. SingleThreadExecutor:适用于需要保证顺序地执行各个任务;并且在任意时间点,不会有多个线程是活动的应用场景。

创建一个单线程化的Executor。它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行

ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 20; i++) {
            Runnable syncRunnable = new Runnable() {
                @Override
                public void run() {
                    Log.e(TAG, Thread.currentThread().getName());
                }
            };
            executorService.execute(syncRunnable);
        }

 运行结果:只会创建一个线程,当上一个执行完之后才会执行第二个

下面是Executors提供的,创建使用单个线程的SingleThreadExecutor的API。

public static ExecutorService newSingleThreadExecutor() {
    return new Executors.FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue())
    );
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory var0) {
    return new Executors.FinalizableDelegatedExecutorService(
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue(), var0)
    );
}

注意: SingleThreadExecutor的corePoolSize和maximumPoolSize被设置为1。其他参数与FixedThreadPool相同。      SingleThreadExecutor使用*队列LinkedBlockingQueue作为线程池的工作队列(队列的容量为Integer.MAX_VALUE)。SingleThreadExecutor使用*队列作为工作队列对线程池带来的影响与FixedThreadPool相同,这里就不赘述了。

SingleThreadPool的工作流程大概如下:

  • 当前核心线程池总线程数量小于corePoolSize(1),那么创建线程并执行任务;
  • 如果当前线程数量等于corePoolSize,那么把 任务添加到阻塞队列中;
  • 如果线程池中的线程执行完任务,那么获取阻塞队列中的任务并执行;

3. CachedThreadPool:是大小*的线程池,适用于执行很多的短期异步任务的小程序,或者是负载较轻的服务器。是一个会根据需要创建新线程的线程池。newCachedThreadPool创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程

下面是Executors提供的,创建一个会根据需要创建新线程的CachedThreadPool的API。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue());
}
public static ExecutorService newCachedThreadPool(ThreadFactory var0) {
    return new ThreadPoolExecutor(0, 2147483647, 60L, TimeUnit.SECONDS, new SynchronousQueue(), var0);
}

CachedThreadPool的corePoolSize被设置为0,即corePool为空;maximumPoolSize被设置为Integer.MAX_VALUE,即maximumPool是*的。这里把keepAliveTime设置为60L,意味着CachedThreadPool中的空闲线程等待新任务的最长时间为60秒,空闲线程超过60秒后将会被终止。
FixedThreadPool和SingleThreadExecutor使用*队列LinkedBlockingQueue作为线程池的工作队列。
CachedThreadPool使用没有容量的SynchronousQueue作为线程池的工作队列,但CachedThreadPool的maximumPool是*的。
这意味着,如果主线程提交任务的速度高于maximumPool中线程处理任务的速度时,CachedThreadPool会不断创建新线程。
极端情况下,CachedThreadPool会因为创建过多线程而耗尽CPU和内存资源。
Executor框架整理(一)

注释: 上图1: 首先执行SynchronousQueue.offer(Runnable task)。如果当前maximumPool中有空闲线程正在执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS),那么主线程执行offer操作与空闲线程执行的poll操作配对成功,主线程把任务交给空闲线程执行,execute()方法执行完成;否则执行下面的步骤2。
         上图2:当初始maximumPool为空,或者maximumPool中当前没有空闲线程时,将没有线程执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这种情况下,步骤1将失败。此时CachedThreadPool会创建一个新线程执行任务,execute()方法执行完成。
       上图3:.在步骤2中新创建的线程将任务执行完后,会执行SynchronousQueue.poll(keepAliveTime,TimeUnit.NANOSECONDS)。这个poll操作会让空闲线程最多在SynchronousQueue中等待60秒钟。如果60秒钟内主线程提交了一个新任务(主线程执行步骤1),那么这个空闲线程将执行主线程提交的新任务;否则,这个空闲线程将终止。由于空闲60秒的空闲线程会被终止,因此长时间保持空闲的CachedThreadPool不会使用任何资源。
前面提到过,SynchronousQueue是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。CachedThreadPool使用SynchronousQueue,把主线程提交的任务传递给空闲线程执行。

ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 100; i++) {
            Runnable syncRunnable = new Runnable() {
                @Override
                public void run() {
                    Log.e(TAG, Thread.currentThread().getName());
                }
            };
            executorService.execute(syncRunnable);
        }

运行结果:可以看出缓存线程池大小是不定值,可以需要创建不同数量的线程,在使用缓存型池时,先查看池中有没有以前创建的线程,如果有,就复用.如果没有,就新建新的线程加入池中,缓存型池子通常用于执行一些生存期很短的异步型任务

CachedThreadPool的工作流程大概如下:

    首先执行SynchronizedQueue.offer(  )把任务提交给阻塞队列,如果这时候正好在线程池中有空闲的线程执行SynchronizedQueue.poll( ),那么offer操作和poll操作配对,线程执行任务;
    如果执行SynchronizedQueue.offer(  )把任务提交给阻塞队列时maximumPoolSize=0.或者没有空闲线程来执行SynchronizedQueue.poll( ),那么步骤1失败,那么创建一个新线程来执行任务;
    如果当前线程执行完任务则循环从阻塞队列中获取任务,如果当前队列中没有提交(offer)任务,那么线程等待keepAliveTime时间,在CacheThreadPool中为60秒,在keepAliveTime时间内如果有任务提交则获取并执行任务,如果没有则销毁线程,因此最后如果一直没有任务提交了,线程池中的线程数量最终为0。


(二)ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor类继承了ThreadPoolExecutor并实现了ScheduledExecutorService接口。主要用于在给定的延迟后执行任务或者定期执行任务。作用类似于java.util包下的Timer类,但是比Timer功能更强大、更灵活,因为Timer只能控制单个线程延迟或定期执行,而ScheduledThreadPoolExecutor对应的是多个线程的后台线程。

Executors可以创建2种类型的ScheduledThreadPoolExecutor,如下:

1. ScheduledThreadPoolExecutor。包含若干个线程的ScheduledThreadPoolExecutor
适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程的数量的应用场景。

下面是工厂类Executors提供的,创建 固定个数 线程的ScheduledThreadPoolExecutor的API。

public static ScheduledExecutorService newScheduledThreadPool(int var0) {
    return new ScheduledThreadPoolExecutor(var0);
}
public static ScheduledExecutorService newScheduledThreadPool(int var0, ThreadFactory var1) {
    return new ScheduledThreadPoolExecutor(var0, var1);
}

 2. SingleThreadScheduledExecutor:适用于需要单个线程延时或者定期的执行任务,同时需要保证各个任务顺序执行的应用场景。

ScheduledExecutorService stse = Executors.newSingleThreadScheduledExecutor(int threadNums);
ScheduledExecutorService stp = Executors.newSingleThreadScheduledExecutor(int threadNums, ThreadFactory threadFactory);

 

ScheduledThreadPoolExecutor的运行机制

Executor框架整理(一)

DelayQueue是一个*队列,所以ThreadPoolExecutormaximumPoolSizeScheduledThreadPoolExecutor中没有什么意义(设置maximumPoolSize的大小没有什么效果)。

ScheduledThreadPoolExecutor的执行主要分为两大部分。

  •     当调用ScheduledThreadPoolExecutor的scheduleAtFixedRate()方法或者scheduleWithFixedDelay()方法时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunnableScheduledFutur接口的ScheduledFutureTask。
  •     线程池中的线程从DelayQueue中获取ScheduledFutureTask,然后执行任务。

ScheduledThreadPoolExecutor为了实现周期性的执行任务,对ThreadPoolExecutor做了如下的修改。

  •     使用DelayQueue作为任务队列。
  •     获取任务的方式不同(后文会说明)。
  •     执行周期任务后,增加了额外的处理(后文会说明)

(1)ScheduledThreadPoolExecutor的实现

通过查看源码,可以发现ScheduledThreadPoolExecutor的实现主要是通过把任务封装为ScheduledFutureTask来实现。ScheduledThreadPoolExecutor通过它的scheduledAtFixedTime()方法或者scheduledWithFixedDelay()方法向阻塞队列添加一个实现了RunnableScheduledFutureTask接口的ScheduledFutureTask类对象。

ScheduledFutureTask主要包含3个成员变量,如下。

  •     long型成员变量time,表示这个任务将要被执行的具体时间。
  •     long型成员变量sequenceNumber,表示这个任务被添加到ScheduledThreadPoolExecutor中的序号。
  •     long型成员变量period,表示任务执行的间隔周期。

DelayQueue封装了一个PriorityQueue,这个PriorityQueue会对队列中的ScheduledFutureTask进行排序。排序时,time小的排在前面(时间早的任务将被先执行)。如果两个ScheduledFutureTask的time相同,就比较sequenceNumber,sequenceNumber小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)。
 Executor框架整理(一)

下面是对这4个步骤的说明:

    上图1: 线程·1从DelayQueue中获取已到期的ScheduledFutureTask 的 DelayQueue.take()。到期任务是指ScheduledFutureTask的time大于等于当前时间。
    上图2: 线程1执行这个ScheduledFutureTask。
    上图3: 线程1修改ScheduledFutureTask的time变量为下次将要被执行的时间。
    上图4: 线程1把这个修改time之后的ScheduledFutureTask放回DelayQueue中DelayQueue.add()。