欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Executor的简单了解

程序员文章站 2022-06-17 10:46:21
...

线程池 Executor

成员

  • Executor: 是一个接口,它是Executor框架的基础,它将任务的提交和执行分离开
public interface Executor {
    void execute(Runnable command);
}

这个execute方法就行执行任务的方法。

  • ExcutorService:这个Service是个接口,ThreadPoolExecutor和ScheduledThreadpoolExecutor都是其子类。
public interface ExecutorService extends Executor {
    //关闭任务
    void shutdown();

    //尝试关闭所有任务
    List<Runnable> shutdownNow();
    
    //提交任务
    <T> Future<T> submit(Callable<T> task);
    
    //提交任务,并且接收回调
    <T> Future<T> submit(Runnable task, T result);
    //执行任务
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
}
  • ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务
  • ScheduledThreadpoolExecutor:是一个实现类,用来在指定的延迟时间后运行命令,或者定期执行命令。
  • Future接口和Future接口的实现类FutureTask:表示异步计算的结果
  • Runnable接口和Callable的实现类,都可以被ThreadPoolExecutor和ScheduledThreadpoolExecutor执行。

执行过程

  1. 主线程创建任务对象,也就是Runnable或Callable的实现类
  2. 主线程把任务对象交给ExcutorService执行。
  3. 通过ExcutorService返回的FetureTask对象,调用get方法等待任务执行完毕。当然,也可以通过cancel方法取消任务的执行。

创建线程池

Executor框架提供了三种不同的线程池:

  1. FixedThreadPool:表示创建使用固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

会限制当前线程数量的应用场景,适用于负载比较重得服务器。
2. SignleThreadExecutor创建使用单个线程的线程池。

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
  1. CachedThreadPool,会根据需要创建新线程的Executor。
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

这个是一个大小没有边界的线程池,适用于执行很多短期异步小任务。

ThreadPoolExecutor

主要成员

  • corePool: 核心线程池大小
  • maximumPool:最大线程池大小
  • BlockingQueue:暂时保存任务的工作队列
  • RejectedExecutionHandle:饱和时,execute方法调用的handler

针对于上面三种线程池,我们来看看其内部的执行过程。

FixedThreadPool

  1. 如果当前线程数小于corePoolSize,那么将会创建新的线程来执行任务
  2. 在线程池的运行线程与corePoolSize一样大时,将会将任务加入阻塞队列中。
  3. 之前创建出的线程执行完任务之后,将会循环的从阻塞队列中获取任务来执行。

因为是一个*的阻塞队列,所以maximumPool无效,并且不会有饱和的情况。

SignleThreadExecutor

和上面的FixedThreadPool流程基本一致,只不过是size被设置为1.

CachedThreadPool

CachedThreadPool的corePoolSize被设置为0,maximumPoolSize被设置为Int的最大值。也就是maximumPoll是*的,keepAliveTime设置的空闲存活时间为60s。一旦线程空闲超过60s,将会被终止。

FutureTask实现

FutureTask的三种状态

  • 未启动: 未调用run方法,调用get,导致线程阻塞,调用cancel,不会执行。
  • 启动:调用run方法,正在执行中,调用get,导致线程阻塞,调用cancel(false),无影响。调用cancel(true),中断线程
  • 已完成:正常结束、异常结束、cancel,导致线程立即返回结果或抛出异常。调用cancel,返回false

FutureTask的异步实现

是基于AbstractQueueSynchronizer,抽象同步队列,简称AQS,是一个同步框架,提供通用机制来原子性管理同步状态、阻塞和唤醒线程以及维护被阻塞线程的队列。实现类包括了可重入锁,信号量等。
每一个基于AQS实现的同步器至少会包含两种类型的操作。

  • acquire: 这个操作将阻塞线程,直到AQS允许这个线程继续执行,在FutureTask中,就是get操作
  • release:改变AQS状态,改变状态后,允许一个或多个阻塞线程被解除阻塞。在FutureTask中,是run方法和cancel方法。

FutureTask内部声明了一个AQS的子类Sync,所有的调用都会委托给Sync。

FutrureTask.get流程:
  • 内部会调用到AQS.acquireSharedInterruptibly方法,回调Sync实现的tryAcquireShared方法判断acquire操作是否可以成功。成功的条件为:state为执行完成状态或者已取消状态。
  • 如果成功则get立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。
  • 当其他线程执行了release操作并且唤醒当前线程后,再次执行tryAcquireShared将返回1.
  • 最后返回计算结果。
FutureTask.run流程
  • 执行构造函数指定的任务
  • 以CAS方式更新同步状态。
  • 调用releaseShared方法,唤醒等待队列的第一个线程。
  • 调用FutureTask.done