Executor的简单了解
程序员文章站
2022-06-17 10:46:21
...
线程池 Executor
成员
- Executor: 是一个接口,它是Executor框架的基础,它将任务的提交和执行分离开
public interface Executor {
void execute(Runnable command);
}
这个execute方法就行执行任务的方法。
- ExcutorService:这个Service是个接口,ThreadPoolExecutor和ScheduledThreadpoolExecutor都是其子类。
public interface ExecutorService extends Executor {
//关闭任务
void shutdown();
//尝试关闭所有任务
List<Runnable> shutdownNow();
//提交任务
<T> Future<T> submit(Callable<T> task);
//提交任务,并且接收回调
<T> Future<T> submit(Runnable task, T result);
//执行任务
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
}
- ThreadPoolExecutor:是线程池的核心实现类,用来执行被提交的任务
- ScheduledThreadpoolExecutor:是一个实现类,用来在指定的延迟时间后运行命令,或者定期执行命令。
- Future接口和Future接口的实现类FutureTask:表示异步计算的结果
- Runnable接口和Callable的实现类,都可以被ThreadPoolExecutor和ScheduledThreadpoolExecutor执行。
执行过程
- 主线程创建任务对象,也就是Runnable或Callable的实现类
- 主线程把任务对象交给ExcutorService执行。
- 通过ExcutorService返回的FetureTask对象,调用get方法等待任务执行完毕。当然,也可以通过cancel方法取消任务的执行。
创建线程池
Executor框架提供了三种不同的线程池:
- FixedThreadPool:表示创建使用固定线程数的线程池。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
会限制当前线程数量的应用场景,适用于负载比较重得服务器。
2. SignleThreadExecutor创建使用单个线程的线程池。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
- CachedThreadPool,会根据需要创建新线程的Executor。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
这个是一个大小没有边界的线程池,适用于执行很多短期异步小任务。
ThreadPoolExecutor
主要成员
- corePool: 核心线程池大小
- maximumPool:最大线程池大小
- BlockingQueue:暂时保存任务的工作队列
- RejectedExecutionHandle:饱和时,execute方法调用的handler
针对于上面三种线程池,我们来看看其内部的执行过程。
FixedThreadPool
- 如果当前线程数小于corePoolSize,那么将会创建新的线程来执行任务
- 在线程池的运行线程与corePoolSize一样大时,将会将任务加入阻塞队列中。
- 之前创建出的线程执行完任务之后,将会循环的从阻塞队列中获取任务来执行。
因为是一个*的阻塞队列,所以maximumPool无效,并且不会有饱和的情况。
SignleThreadExecutor
和上面的FixedThreadPool流程基本一致,只不过是size被设置为1.
CachedThreadPool
CachedThreadPool的corePoolSize被设置为0,maximumPoolSize被设置为Int的最大值。也就是maximumPoll是*的,keepAliveTime设置的空闲存活时间为60s。一旦线程空闲超过60s,将会被终止。
FutureTask实现
FutureTask的三种状态
- 未启动: 未调用run方法,调用get,导致线程阻塞,调用cancel,不会执行。
- 启动:调用run方法,正在执行中,调用get,导致线程阻塞,调用cancel(false),无影响。调用cancel(true),中断线程
- 已完成:正常结束、异常结束、cancel,导致线程立即返回结果或抛出异常。调用cancel,返回false
FutureTask的异步实现
是基于AbstractQueueSynchronizer,抽象同步队列,简称AQS,是一个同步框架,提供通用机制来原子性管理同步状态、阻塞和唤醒线程以及维护被阻塞线程的队列。实现类包括了可重入锁,信号量等。
每一个基于AQS实现的同步器至少会包含两种类型的操作。
- acquire: 这个操作将阻塞线程,直到AQS允许这个线程继续执行,在FutureTask中,就是get操作
- release:改变AQS状态,改变状态后,允许一个或多个阻塞线程被解除阻塞。在FutureTask中,是run方法和cancel方法。
FutureTask内部声明了一个AQS的子类Sync,所有的调用都会委托给Sync。
FutrureTask.get流程:
- 内部会调用到AQS.acquireSharedInterruptibly方法,回调Sync实现的tryAcquireShared方法判断acquire操作是否可以成功。成功的条件为:state为执行完成状态或者已取消状态。
- 如果成功则get立即返回。如果失败则到线程等待队列中去等待其他线程执行release操作。
- 当其他线程执行了release操作并且唤醒当前线程后,再次执行tryAcquireShared将返回1.
- 最后返回计算结果。
FutureTask.run流程
- 执行构造函数指定的任务
- 以CAS方式更新同步状态。
- 调用releaseShared方法,唤醒等待队列的第一个线程。
- 调用FutureTask.done
上一篇: php程序怎调试?