源码分析:线程池ThreadPoolExecutor与Executors
//TODO 传统线程创建的缺点,futureTask Callable
ThreadPoolExecutor
看一下ThreadPoolExecutor构造函数的源码
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue){
this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
Executors.defaultThreadFactory(),defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory){
this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
threadFactory,defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler){
this(corePoolSize,maximumPoolSize,keepAliveTime,unit,workQueue,
Executors.defaultThreadFactory(),handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler){
if(corePoolSize< 0||
maximumPoolSize<=0||
maximumPoolSize<corePoolSize ||
keepAliveTime< 0)
throw new IllegalArgumentException();
if(workQueue==null||threadFactory==null||handler==null)
throw new NullPointerException();
this.acc=System.getSecurityManager()==null?
null:
AccessController.getContext();
this.corePoolSize=corePoolSize;
this.maximumPoolSize=maximumPoolSize;
this.workQueue=workQueue;
this.keepAliveTime=unit.toNanos(keepAliveTime);
this.threadFactory=threadFactory;
this.handler=handler;
}
通过构造器源码可以看出
- 共有4个构造器
- 前三个构造器都是调用最后一个
分析一下参数
corePoolSize
核心线程数量,线程池刚创建时,线程数量为0,当每次执行execute添加新的任务时会在线程池创建一个新的线程,直到线程数量达到corePoolSize为止。
maximumPoolSize最大线程数量,当workQueue队列已满,放不下新的任务,再通过execute添加新的任务则线程池会再创建新的线程,线程数量大于corePoolSize但不会超过maximumPoolSize,如果超过maximumPoolSize,那么会抛出异常。
keepAliveTime
当线程池中线程数量大于workQueue,如果一个线程的空闲时间大于keepAliveTime,则该线程会被销毁。
unit
unit是keepAliveTime的时间单位。
当线程池正在运行的线程数量已经达到corePoolSize,那么再通过execute添加新的任务则会被加到workQueue队列中,在队列中排队等待执行,而不会立即执行。
方法
线程池状态
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS;ThreadPoolExecutor 中为线程池定义了五种状态:
- RUNNING:正常状态,接受新的任务,并处理任务队列中的任务
- SHUTDOWN:不接受新的任务,但是处理已经在任务队列中的任务
- STOP: 不接受新的任务,也不处理已经在任务队列中的任务,同时会尝试停止正在执行任务的线程
- TIDYING: 线程池和任务队列都为空,该状态下线程会执行 terminated() 方法
- TERMINATED:terminated() 方法执行完毕
- RUNNING -> SHUTDOWN:调用了 shutdown() 方法 (perhaps implicitly in finalize())
- (RUNNING or SHUTDOWN) -> STOP:调用了shutdownNow() 方法
- SHUTDOWN -> TIDYING:线程池和任务队列都为空
- STOP -> TIDYING:线程池为空
- TIDYING -> TERMINATED:执行完 terminated() 方法
排队策略
在前面我们多次提到了任务缓存队列,即workQueue,它用来存放等待执行的任务。workQueue的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
- ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
- LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
- SynchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
拒绝策略
当线程池的任务缓存队列已满并且线程池中的线程数目达到maximumPoolSize时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:
- DiscardOldestPolicy 丢弃掉队列中最老的任务。
- CallerRunsPolicy 则将任务返回给提交任务的线程处理。
- DiscardPolicy 则丢弃任务。
- AbortPolicy 则抛出异常。
execute源码分析
public void execute(Runnable command) { // command 不能为 null if (command == null) throw new NullPointerException(); int c = ctl.get(); // 如果当前数量小于corePoolSize,会创建一个新的线程执行该任务 if (workerCountOf(c) < corePoolSize) { // true表示当前添加的线程为核心线程 if (addWorker(command, true)) return; c = ctl.get(); } // 线程数量大于等于 corePoolSize,首先尝试将任务添加到任务队列 // workQueue.offer 会将任务添加到队列尾部 if (isRunning(c) && workQueue.offer(command)) { // 重新检查状态 int recheck = ctl.get(); // 如果发现当前线程不是Running状态,就移除之前的任务 // 移除任务过程中有锁保护 if (! isRunning(recheck) && remove(command)) reject(command); // 如果工作线程都down了 else if (workerCountOf(recheck) == 0) // workerCountOf 用来统计当前的工作线程数量,程序执行到这里,有下面两种情况 // 1.当前线程池处于 Running 状态,但是工作线程数量为0 // 需要创建新的线程 // 2.移除任务失败,但是工作线程数量为0 // 需要创建新的线程来完成移除失败的任务 // // 因为前面对任务做了判断,所以正常情况下向 addWorker 里传入的任务 // 不可能为 null,这里传入null 是告诉 addWorker 需要创建新的线程, // 在addWorker里对null有专门的处理 addWorker(null, false); } // 下面的else说明线程池不是Running状态或者队列满了 else if (!addWorker(command, false)) // 这里说明线程池不是Running状态或者线程池饱和了 reject(command); }
线程池的关闭
ThreadPoolExecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownNow(),其中:- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务,执行后线程处于SHUTDOWN状态
- shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务,执行后线程处于STOP状态
结束状态
一旦shutdown()或者shutdownNow()执行完毕,线程池就进入TERMINATED状态,即线程池就结束了。
- isTerminating() 如果关闭后所有任务都已完成,则返回 true。
- isShutdown() 如果此执行程序已关闭,则返回 true。
实例操作
import java.util.concurrent.*;
public class ThreadPool {
public static void main(String[] args) {
ThreadPoolExecutor executor = new ThreadPoolExecutor(
5, 10, 200, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(5));
// 这里的15是5+10,不能大于这个数
for (int i = 0; i < 15; i++) {
MyTask myTask = new MyTask(i);
executor.execute(myTask);
System.out.println("线程池中线程数目:" + executor.getPoolSize() +
",队列中等待执行的任务数目:" + executor.getQueue().size() +
",已执行完别的任务数目:" + executor.getCompletedTaskCount());
}
executor.shutdown();
}
}
class MyTask implements Runnable {
private int taskNum;
public MyTask(int num) {
this.taskNum = num;
}
@Override
public void run() {
System.out.println("正在执行task " + taskNum);
try {
Thread.currentThread().sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("task " + taskNum + "执行完毕");
}
}
执行结果
正在执行task 0
线程池中线程数目:1,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:2,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 1
线程池中线程数目:3,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 2
线程池中线程数目:4,队列中等待执行的任务数目:0,已执行完别的任务数目:0
正在执行task 3
线程池中线程数目:5,队列中等待执行的任务数目:0,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:1,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:2,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:3,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:4,已执行完别的任务数目:0
线程池中线程数目:5,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 4
线程池中线程数目:6,队列中等待执行的任务数目:5,已执行完别的任务数目:0
线程池中线程数目:7,队列中等待执行的任务数目:5,已执行完别的任务数目:0
线程池中线程数目:8,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 12
正在执行task 10
正在执行task 13
正在执行task 11
线程池中线程数目:9,队列中等待执行的任务数目:5,已执行完别的任务数目:0
线程池中线程数目:10,队列中等待执行的任务数目:5,已执行完别的任务数目:0
正在执行task 14
task 0执行完毕
正在执行task 5
task 1执行完毕
task 3执行完毕
正在执行task 6
task 2执行完毕
正在执行task 7
正在执行task 8
task 4执行完毕
正在执行task 9
task 13执行完毕
task 12执行完毕
task 10执行完毕
task 11执行完毕
task 14执行完毕
task 5执行完毕
task 8执行完毕
task 6执行完毕
task 7执行完毕
task 9执行完毕
Process finished with exit code 0
从执行结果可以看出,当线程池中线程的数目大于corePoolSize时,便将任务放入任务缓存队列里面,当缓存队列满了之后,便创建新的线程。
总结一下线程池添加任务的整个流程:
- 线程池刚刚创建是,线程数量为0
- 执行execute添加新的任务时会在线程池创建一个新的线程
- 当线程数量达到corePoolSize且没有超过maximumPoolSize时,再添加新任务则会将任务放到workQueue队列
- 当缓存队列数量也到达workQueue之后,再添加新任务则会继续创建新线程,但线程数量不超过maximumPoolSize
- 当线程数量达到maximumPoolSize时,再添加新任务则会抛出异常
但是Java doc不推荐直接只用ThreadPoolExecutor,推荐使用Executors
Executors
newCachedThreadPool
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
创建一个可缓存线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程。
newFixedThreadPool
创建一个定长线程池,可控制线程最大并发数,超出的线程会在队列中等待。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
newSingleThreadExecutor
创建一个单线程化的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO, LIFO, 优先级)执行。
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
可以看出Executors类还是调用的ThreadPoolExecutor来实现的线程池。
一般来说,CachedTheadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的Executor的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用FixedThreadPool。一般来说,CachedTheadPool在程序执行过程中通常会创建与所需数量相同的线程,然后在它回收旧线程时停止创建新线程,因此它是合理的Executor的首选,只有当这种方式会引发问题时(比如需要大量长时间面向连接的线程时),才需要考虑用FixedThreadPool。
实例操作
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class ThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
// ExecutorService executorService = Executors.newFixedThreadPool(3);
// ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 10; i++) {
executorService.execute(new TestRunnable());
System.out.println("************* a" + i + " *************");
}
executorService.shutdown();
}
}
class TestRunnable implements Runnable {
public void run() {
System.out.println(Thread.currentThread().getName() + "线程被调用了。");
}
}
----------
上一篇: 你听说过PHP 的面向方面编程吗?
下一篇: java高并发处理
推荐阅读