JAVA 线程池源码阅读与分析
线程池是什么
提前创建好若干个线程,如果有任务需要处理,这些创建好的线程就会开始处理任务,处理完之后线程并不会被销毁而是被保存起来,而是等待下一个任务。
为什么需要线程池
创建和销毁线程都是消耗系统资源的。
线程模型又分为用户线程,与内核线程。
- 用户线程ULT
用户程序实现的,不依赖系统核心,应用自己管控线程,不需要进行用户态/内核态的切换。操作系统内核无感知。 - 内核线程KLT
系统内核管控,内核保存上下文状态和信息,多处理器情况上能并发运行,线程的创建于同步由内核完成,能够调度CPU资源。
一个小测试 测试Java的线程类型为内核级线程还是用户级线程
public class ThreadTest {
public static void main(String[] args) {
for(int i=0 ; i <300 ;i++){
Thread thread=new Thread(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + "正在执行任务");
while (true){
}
}
});
thread.start();
}
}
}
创建前与创建后对比,线程可以被操作系统感知,可以得知Java的线程类型为内核级
JAVA线程与系统内核线程关系
JAVA线程创建时依赖于系统内核,通过JVM调用系统库创建内核线程,内核线程与JAVA-Thread是1:1映射关系。
线程的创建销毁调度将会涉及到线程的用户态/内核态的切换,当一时间任务量较大时并且任务量较小时,就会发生大量线程的创建和销毁会消耗大量的资源,导致降低系统效率和稳定性。
线程池的优点
为了避免以上的问题发生,就需要设法重用线程,使用线程池来缓存线程,重用线程,减少线程的创建与销毁,降低资源的无意义消耗。
提高响应速度,任务到达时直接从线程池中获取线程执行任务,而不需要去等待线程的创建。
提高线程可管理性,统一管理线程的分配。
线程池的创建与简单使用
线程池的工作原理
- 线程池判断当前运行任务的线程数量是否小于核心线程数,如果小于则创建核心线程处理任务
- 如果不小于,任务则会进入阻塞队列。
- 如果入队失败(队满),则会先尝试创建临时线程处理任务。
- 如果创建临时线程失败(核心线程+临时线程>最大线程数)则会触发拒绝策略。
线程池生命周期
- RUNNING:线程池一旦被创建,就处于 RUNNING 状态。
- SHUTDOWN:不接收新任务,但依旧处理已排队的任务。调用线程池的 shutdown() 方法。
- STOP:不接收新任务,不处理已排队的任务,并且会中断正在处理的任务。
- TIDYING:任务数为 0, 其他所有任务已终止,线程池会变为 TIDYING 状态。
- TERMINATED:线程池彻底终止。线程池在 TIDYING 状态执行完 terminated() 方法,该方法没有实现,留给开发者自己实现。
继承结构
- Executor : 是最顶层接口,只有一个execute()方法
- ExecutorService :添加了submit(),shutdown(),shutdownNow()等方法用来控制线程池的状态。
- AbstractExecutorService
- ThreadPoolExecuto 线程池的核心类,几乎所有与线程池有关的逻辑都封装在这个类里边。
DiscardPolicy、DiscardOldestPolicy、AbortPolicy、CallerRunsPolicy : 四种拒绝策略。
abortPolicy 抛弃任务,抛出RejectedExecutionException异常
DiscardPolicy(默认) 抛弃任务,也不抛出异常
DiscardOldestPolicy 丢弃队列中队首任务,重新尝试执行任务
CallerRunsPolicy 调用者回退机制 让Runnable自身执行 - Worker : 线程池中的线程类,实现了Runnable接口,继承了AQS(AbstractQueuedSynchronizer)类。
构造函数
使用给定的初始参数创建一个新的ThreadPoolExecutor。
ExecutorService pool = new ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler)
-
corePoolSize
核心线程数,即使在空闲时也要保留在池中的线程数 -
maximumPoolSize
当核心线程都在忙碌,任务量数较大时,新建临时线程+核心线最大线程数 -
keepAliveTime
多余的空闲线程最大空闲时间,到达时间后线程将被销毁 -
unit
时间单位 -
workQueue
当线程都在忙碌时将任务保留于队列中。该队列只能存放了实现了Runnable接口的对象 -
threadFactory
创建新线程时要使用的工厂 -
handler
当workQueue满时的拒绝策略,JDK本身提供了四种拒绝策略,自己也可以通过实现RejectedExecutionHandler自定义拒绝策略
7.1 abortPolicy 抛弃任务,抛出RejectedExecutionException异常
7.2 DiscardPolicy(默认) 抛弃任务,也不抛出异常
7.3 DiscardOldestPolicy 丢弃队列中队首任务,重新尝试执行任务
7.4 CallerRunsPolicy 调用者回退机制 让Runnable自身执行
核心属性
- AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
很重要的一个属性,低29位表示线程池中处于RUNNING状态的线程个数,高3位表示线程池所处的状态()。
private static int ctlOf(int rs, int wc) {
return rs | wc;
}
初始值为 RUNNING:1110 0000 0000 0000 0000 0000 0000 0000(-10 0000 0000 0000 0000 0000 0000 0000:-536870912),无工作线程,状态为运行中111(-1)
- int COUNT_BITS = Integer.SIZE - 3;
COUNT_BITS表示多少位数表示线程个数,值为29(32-3)。
- int CAPACITY = (1 << COUNT_BITS) - 1;
1左移动29位,再减1
1: 0000 0000 0000 0000 0000 0000 0000 0001
1左移29位: 0010 0000 0000 0000 0000 0000 0000 0000
减1: 0001 1111 1111 1111 1111 1111 1111 1111
用于& ctl 运算得到线程个数,&后相当于去除ctl前三位,余下29位,得到线程的个数
- int RUNNING = -1 << COUNT_BITS;
负1左移动29位
-1 的补码 是 1 1111 1111 1111 1111 1111 1111 1111
左移29位: 1110 0000 0000 0000 0000 0000 0000 0000
表示运行状态,注意与CAPACITY的用处不同
代表值-1
- int SHUTDOWN = 0 << COUNT_BITS;
线程池的关闭态,不接受新任务,依旧处理队列中的任务,
值:0000 0000 0000 0000 0000 0000 0000 0000
代表值:0
- int STOP = 1 << COUNT_BITS;
线程池的停止态,不接受新任务,不处理队列中任务,且打断运行中任务
值: 0010 0000 0000 0000 0000 0000 0000 0000
代表值:1 (表示线程池的状态只取前3位)
- int TIDYING = 2 << COUNT_BITS;
任务数为 0, 其他所有任务已终止,将要执行terminated()方法,
值:0100 0000 0000 0000 0000 0000 0000 0000
代表值:2
- int TERMINATED = 3 << COUNT_BITS;
线程池彻底终止
值:0110 0000 0000 0000 0000 0000 0000 0000
代表值:3
RUNNING ->SHUTDOWN->STOP->TIDYING->TERMINATED
-1->0->1->2->3
- HashSet workers = new HashSet();
存放线程池中的所有工作线程的集合。只有在获取mainLock时才可操作
- private volatile ThreadFactory threadFactory;
创建线程的工厂(初始化时候可以指定)
- private volatile RejectedExecutionHandler handler;
拒绝策略(初始化时候可以指定)
- private volatile long keepAliveTime;
多余的空闲线程最大空闲时间,到达时间后线程将被销毁(初始化时候可以指定)
- private volatile boolean allowCoreThreadTimeOut;
是否允许核心线程过期的标志,可以通过public void allowCoreThreadTimeOut(boolean value)方法设置策略,这样核心线程也会在一定情况下过期
- private volatile int corePoolSize;
核心线程个数
- private volatile int maximumPoolSize;
最大线程数
- private static final RejectedExecutionHandler defaultHandler = new
AbortPolicy();
默认的拒绝策略,抛弃任务,抛出RejectedExecutionException异常
核心方法public void execute(Runnable command)
先分析主干流程,后面再进入方法分析。
- 线程池判断当前运行任务的线程数量是否小于核心线程数,如果小于则创建核心线程处理任务
- 不小于,任务则会进入阻塞队列。
- 入队成功,入队成功后,如果线程池在Runing状态,或者线程不在Runing状态但是任务移除失败时为防止队列中有任务但是没线程执行的情况发生,用于增加工作线程。
- 入队失败(队满),则会先尝试创建临时线程处理任务。
- 如果创建临时线程失败(核心线程+临时线程>最大线程数)则会触发拒绝策略。
public void execute(Runnable command) {
//判断任务非空
if (command == null)
throw new NullPointerException();
//c前3位代表线程池状态,后29位代表线程池中运行的线程个数
int c = ctl.get();
//判断正在工作的线程个数是否小于核心线程数
if (workerCountOf(c) < corePoolSize) {
//增加工作线程,第二参数代表是增加核心线程还是临时线程,核心为true,临时为false
//增加成功直接返回
if (addWorker(command, true))
return;
//失败重新获取ctl的值
c = ctl.get();
}
//判断一下线程池是否还在运行
//在运行将任务入队列
if (isRunning(c) && workQueue.offer(command)) {
//入队成功后再次检查线程池状态为运行中
int recheck = ctl.get();
//不在运行中,移除任务,触发拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//入队成功后,如果线程池在Runing状态,或者线程不在Runing状态,任务移除失败时
//为防止队列中有任务但是没线程执行的情况发生,用于增加线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//入队失败,说明队列满了,增加临时线程处理任务
//coreSize<当前线程数<maxSize的策略添加Worker。
//如果添加失败,则执行拒绝策略。
else if (!addWorker(command, false))
reject(command);
}
上文介绍了CAPACITY:0001 1111 1111 1111 1111 1111 1111 1111
任何数&CAPACITY:舍弃前3位,得到后29位的值。ctl的后29位代表线程的运行个数
private static int workerCountOf(int c) {
return c & CAPACITY;
}
SHUTDOWN:0
ctl运行中前面的状态值为111,为负数。判断线程池依旧运行中
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
核心方法private boolean addWorker(Runnable firstTask, boolean core)
该方法在大体上可以分为两部分
- 增加线程池中线程数量
- 增加工作线程
增加线程池中线程数量
- 先用retry标记外层循环,方便在内层循环中进行外层循环的break,与continue操作。
- 获取线程池的状态
- 检查线程池是否在运行中是否满足线程的创建与任务的添加工作
- 判断线程数是否已经超出范围,如果添加的是核心线程就与核心线程数比较,否则与最大线程比较
- CAS自增ctl值,运行中的线程数量加一
- CAS成功则结束整个循环
- CAS失败,检查一下线程池状态是否发生了改变,发生改变则跳到外层循环继续执行。否则说明CAS由于workerCount线程数量被更改而失败,内部循环重试增加数量即可。
//①
retry:
for (;;) {
int c = ctl.get();
//②
int rs = runStateOf(c);
//③
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
//④
int wc = workerCountOf(c);
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//⑤
if (compareAndIncrementWorkerCount(c))
break retry;
//⑦
c = ctl.get();
if (runStateOf(c) != rs)
continue retry;
}
}
增加工作线程
- 创建真正的工作线程Worker,并且传递任务
- 加锁修改将工作线程添加到集合中缓存起来
- 判断是否需要修改线程池中曾经同时存在的最大线程数
- 启动Worker线程开始工作
boolean workerStarted = false;
boolean workerAdded = false;
//①
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
//②
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
//③
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
//④
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
addWorker完整注释
private boolean addWorker(Runnable firstTask, boolean core) {
/***接下来整块代码都是在做一件事,判断是否可以新增线程,可以的话将线程数更新,否则退出***/
retry:
for (;;) {
int c = ctl.get();
//获取线程池的状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
//检查线程池是否在运行中
//小于SHUTDOWN则一定子啊运行中,继续往下
//如果大于等于SHUTDOWN(0),那么就有可能是SHUTDOWN->STOP->TIDYING->TERMINATED
//内部任何一个为假就添加失败
//rs不等于SHUTDOWN状态,那么剩余的STOP->TIDYING->TERMINATED状态都不再接受任何任务
//为SHUTDOWN状态,也只接受空的任务(什么时候任务为空)
//任务为空的并且workQueue不能为空才会继续处理这个空任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
//内层循环
for (;;) {
//运行中的线程数量
int wc = workerCountOf(c);
//CAPACITY: 0001 1111 1111 1111 1111 1111 1111 1111
//线程数大于最大值添加失败
//如果添加的是核心线程就与核心线程比较比较添加的核心线程是否超过规定值,否则与最大线程比较判断临时线程是否超量,
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//数量不超过,添加线程环境
//CAS设置线程数量+1
if (compareAndIncrementWorkerCount(c))
//自增成功跳过retry标记的循环(即外层循环)
break retry;
//CAS失败了,说明有其他线程修改了值(可以是线程数量,也可以是线程池状态)
//重新获取ctl的值
c = ctl.get();
//检查一下线程池状态是否发生了改变,如果发生改变则跳到外层循环继续执行,需要再次判断线程池的状态
if (runStateOf(c) != rs)
continue retry;
//线程池状态一致,说明CAS由于workerCount线程数量被更改而失败,内部循环重试增加数量即可
}
}
/***那说明ctl自增操作成功。接下来开始添加新任务********/
//线程是否开始的标志
boolean workerStarted = false;
//线程被添加到set缓存起来的标志
boolean workerAdded = false;
//真正的工作线程,后面在看,先记着是个工作线程
Worker w = null;
try {
//用任务初始化工作线程
w = new Worker(firstTask);
//获取其中的线程
final Thread t = w.thread;
if (t != null) {
//加锁
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
//try的意义在于finally释放锁
try {
//再次检查线程池状态
int rs = runStateOf(ctl.get());
//只有处于运行状态,或者SHUTDOWN并且任务为空才可以添加
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//这个看到worker中再回头看
if (t.isAlive())
throw new IllegalThreadStateException();
//添加到缓存线程的容器中,获取了锁才能add
workers.add(w);
//记录线程池中曾经同时存在的最大线程数。
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
//添加标志位置为true
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
//开启线程
t.start();
//线程开启标志
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//回滚工作线程的创建
addWorkerFailed(w);
}
return workerStarted;
}
核心内部类Worker
- Worker : 线程池中的线程类,实现了Runnable接口,继承了AQS(AbstractQueuedSynchronizer)类。很明显Worker也是线程类。
Worker内部结构
初始化Worker
在核心方法addWorker(Runnable firstTask, boolean core)中添加任务,并且执行任务 代码块中初始化了Worker
Worker w = null;
w = new Worker(firstTask);
Worker(Runnable firstTask) {
//继承了AQS将state(volatile)设置为-1
//禁止中断,直到runWorker
setState(-1);
//将任务赋值
this.firstTask = firstTask;
//使用自身(实现了Runnable接口)从工厂中获取线程
this.thread = getThreadFactory().newThread(this);
}
运行Worker
调用t.start();后线程运行其中的RUN方法
//获取其中的线程
final Thread t = w.thread;
//运行线程
t.start();
public void run() {
runWorker(this);
}
看个简化版本的伪代码
- 在调用完Thread.start();方法后即启动了线程,
- 循环中判断firstTask任务是否为空,不为空则执行该任务(execute方法中传递的任务),如果这个任务为空则去阻塞队列中获取任务再执行获取到的任务
- 线程中是个循环,当getTask()获取不到元素时退出循环,线程结束
- 得到任务后,执行run方法
final void runWorker(Worker w) {
//获取worker中的firstTask
Runnable task = w.firstTask;
w.firstTask = null;
try {
//如果task不为NULL说明该线程是第一次循环
//如果task不为NULL说明已经不是该线程第一次执行了,则会去阻塞队列中获取任务
while (task != null || (task = getTask()) != null) {
task.run();
task = null;
}
} finally {
//清除退出
processWorkerExit(w, completedAbruptly);
}
}
看到这里目前是不是没有看到临时线程过期超过keepAliveTime时线程被回收的情况。 从while循环中可以看出只有当getTask()获取到的task为Null线程才会结束。
getTask()方法
什么情况下会返回NULL结束线程
- 线程池的状态为SHUTDOWN时,并且workQueue为空返回NULL(SHUTDOWN时依旧处理队列中的任务,所以要队列中没元素才返回Null)
- 线程池状态为STOP->TIDYING->TERMINATED 返回NULL(这三个状态下阻塞队列中的任务不执行)
- 当前线程数量大于最大线程,并且当前线程数量大于1
- 当前线程大于最大线程,工作队列为空
- 线程过期了并且线程被允许过期,并且当前线程数量大于1
- 线程过期了并且线程被允许过期,并且工作队列为空
首先线程被允许过期:策略允许核心线程过期,既然连核心线程都被允许过期了那么所有的线程都被允许过期。线程数量超过核心线程数,那么当前线程也是被允许过期的。
其次要线程过期:什么情况下线程才会过期呢,当前线程是被允许过期的(代码中timed标志),并且 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS),即工作队列在等待keepAliveTime时间后还是没有任务,那么线程就会标识为过期(timed = true);
再说下为什么要判断当前线程数量大于1工作队列为空:如果在进入下次循环时,又有任务进到工作队列中,把所有线程都过期了就处理不了任务了。所以需要判断一下队列为空或者尚存的工作队列大于1才会让线程结束
private Runnable getTask() {
//标志当前线程是否已经过期,到现在位置你是不是还没看到临时线程在哪里会过期
//这个标志就是标志线程该过期销毁了,一开始为false
boolean timedOut = false;
//后续需要CAS递减ctl的workerCount字段
//当没有从阻塞队列获取到任务时需要再执行该方法,返回Null,具体往后看
for (;;) {
//检查线程池的状态
//线程池的状态为SHUTDOWN时,并且workQueue为空返回NULL(SHUTDOWN时依旧处理队列中的任务,所以要队列中没元素才返回Null)
//为STOP->TIDYING->TERMINATED 返回NULL(这三个状态下阻塞队列中的任务不执行)
int c = ctl.get();
int rs = runStateOf(c);
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//减少ctl的workerCount数量。返回NULL后while循环就结束,线程就也结束了
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
//线程池策略模式,允许核心线程过期。
//如果不允许,那么只有当前线程量大于核心线程量时,线程才会被允许过期
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//1当前线程大于最大线程
// 1.1 wc>1
// 1.2 工作队列为空
// 2.线程过期并且允许过期(策略允许,当前线程数大于核心线程数,存在临时线程)
// 2.1 wc>1
// 2.2 工作队列为空
//以上四种情况任意一种发生就会返回Null
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//尝试CAS递减ctl的workerCount字段。返回Null线程结束
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
//如果时允许过去的采用poll方法获取任务(获得队首元素,并且移除,如果一时获取不到则等待指定的时间),
//不允许过期的采用take方法获取任务(获得队首元素,并且移除),take方法如果队列为空会一直阻塞到队列中有任务
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
//超过keepAliveTime时间没获得任务,线程过期了,下次循环检查线程过期时就会返回Null结束线程
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
总结
通过上文介绍的工作原理进行总结
线程池判断当前运行任务的线程数量是否小于核心线程数,如果小于则创建核心线程处理任务
1.如果小于核心线程数就会调用addWorker(command, true),这个true就是代表核心线程,在创建的时会用这个true判断核心线程 是否超过数量,在队列满时又进来任务时会调用addWorker(command, false),在创建的时会用这个false来判断线程是否超过最大数量。
2.在addWorker中会将运行中线程数量自增1,再创建一个worker对象(实现Runable接口),worker内部聚合一个使用自身创建的线程,最后调用start()方法。
3.线程开始运行worker的run方法,其中run方法内部存在一个循环,一直从阻塞队列中获取任务,直到获取到的任务为NULL才会结束线程
4.什么时候才会获得NULL: 线程被允许过期并且线程在一定时间内未从阻塞队列获取到任务。
如果不小于,任务则会进入阻塞队列
如果入队失败(队满),则会先尝试创建临时线程处理任务。
1.何谓临时线程,其实在看完源码后发现并没有具体标识那个线程是核心线程还是临时线程,任何一个线程在getTask()获取不到时线程就会结束。只是说在获取元素时的方法是阻塞获取还是给定一定时间获取不到。
2.如果线程数量小于核心线程就会以阻塞的方式获取,反之以poll(keepAliveTime, TimeUnit.NANOSECONDS)给定时间内获取(未开启核心线程过期策略)。
3.线程全以poll(keepAliveTime, TimeUnit.NANOSECONDS)给定时间内获取,获取不到则会过期。会留有一个工作线程,除非返回NULL时判断工作队列不为空。(开启核心线程过期策略)
如果创建临时线程失败(核心线程+临时线程>最大线程数)则会触发拒绝策略。
方法public Future<?> submit(Runnable task)
三个submit方法(重载),都是将任务Runnable,Callable封装成RunnableFuture。再通过execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法。可以拿到一个Future对象,表示异步计算的结果。它提供了检查计算是否完成,检索计算的结果等方法。还方便Exception处理,如果task里会抛出异常,可以再调用处感知这些exception并做出及时的处理
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
本文地址:https://blog.csdn.net/wsdfym/article/details/110825953
上一篇: java基础算法题(含答案)
下一篇: JavaSE