欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

了解Java线程池执行原理

程序员文章站 2023-10-22 23:26:28
前言 上一篇已经对进行了分析,了解线程池既有预设的模板,也提供多种参数支撑灵活的定制。 本文将会围绕线程池的生命周期,分析线程池执行任务的过程。 线程池状态 首...

前言

上一篇已经对进行了分析,了解线程池既有预设的模板,也提供多种参数支撑灵活的定制。

本文将会围绕线程池的生命周期,分析线程池执行任务的过程。

线程池状态

首先认识两个贯穿线程池代码的参数:

  • runstate:线程池运行状态
  • workercount:工作线程的数量

线程池用一个32位的int来同时保存runstate和workercount,其中高3位是runstate,其余29位是workercount。代码中会反复使用runstateof和workercountof来获取runstate和workercount。

private final atomicinteger ctl = new atomicinteger(ctlof(running, 0));
private static final int count_bits = integer.size - 3;
private static final int capacity = (1 << count_bits) - 1;
// 线程池状态
private static final int running = -1 << count_bits;
private static final int shutdown = 0 << count_bits;
private static final int stop = 1 << count_bits;
private static final int tidying = 2 << count_bits;
private static final int terminated = 3 << count_bits;
// ctl操作
private static int runstateof(int c) { return c & ~capacity; }
private static int workercountof(int c) { return c & capacity; }
private static int ctlof(int rs, int wc) { return rs | wc; }


  • running:可接收新任务,可执行等待队列里的任务
  • shutdown:不可接收新任务,可执行等待队列里的任务
  • stop:不可接收新任务,不可执行等待队列里的任务,并且尝试终止所有在运行任务
  • tidying:所有任务已经终止,执行terminated()
  • terminated:terminated()执行完成

线程池状态默认从running开始流转,到状态terminated结束,中间不需要经过每一种状态,但不能让状态回退。下面是状态变化可能的路径和变化条件:

了解Java线程池执行原理

worker的创建

线程池是由worker类负责执行任务,worker继承了abstractqueuedsynchronizer,引出了java并发框架的核心aqs。

abstractqueuedsynchronizer,简称aqs,是java并发包里一系列同步工具的基础实现,原理是根据状态位来控制线程的入队阻塞、出队唤醒来处理同步。

aqs不会在这里展开讨论,只需要知道worker包装了thread,由它去执行任务。

调用execute将会根据线程池的情况创建worker,可以归纳出下图四种情况:

了解Java线程池执行原理

public void execute(runnable command) {
if (command == null)
throw new nullpointerexception();
int c = ctl.get();
//1
if (workercountof(c) < corepoolsize) {
if (addworker(command, true))
return;
c = ctl.get();
}
//2
if (isrunning(c) && workqueue.offer(command)) {
int recheck = ctl.get();
if (! isrunning(recheck) && remove(command))
//3
reject(command);
else if (workercountof(recheck) == 0)
//4
addworker(null, false);
}
//5
else if (!addworker(command, false))
//6
reject(command);
}

标记1对应第一种情况,要留意addworker传入了core,core=true为corepoolsize,core=false为maximumpoolsize,

新增时需要检查workercount是否超过允许的最大值。

标记2对应第二种情况,检查线程池是否在运行,并且将任务加入等待队列。标记3再检查一次线程池状态,如果线程池忽然处于非运行状态,那就将等待队列刚加的任务删掉,再交给rejectedexecutionhandler处理。标记4发现没有worker,就先补充一个空任务的worker。

标记5对应第三种情况,等待队列不能再添加任务了,调用addworker添加一个去处理。

标记6对应第四种情况,addworker的core传入false,返回调用失败,代表workercount已经超出maximumpoolsize,那就交给rejectedexecutionhandler处理。

private boolean addworker(runnable firsttask, boolean core) {
//1
retry:
for (;;) {
int c = ctl.get();
int rs = runstateof(c);
// check if queue empty only if necessary.
if (rs >= shutdown &&
! (rs == shutdown &&
firsttask == null &&
! workqueue.isempty()))
return false;

for (;;) {
int wc = workercountof(c);
if (wc >= capacity ||
wc >= (core ? corepoolsize : maximumpoolsize))
return false;
if (compareandincrementworkercount(c))
break retry;
c = ctl.get(); // re-read ctl
if (runstateof(c) != rs)
continue retry;
// else cas failed due to workercount change; retry inner loop
}
}
//2
boolean workerstarted = false;
boolean workeradded = false;
worker w = null;
try {
w = new worker(firsttask);
final thread t = w.thread;
if (t != null) {
final reentrantlock mainlock = this.mainlock;
mainlock.lock();
try {
// recheck while holding lock.
// back out on threadfactory failure or if
// shut down before lock acquired.
int rs = runstateof(ctl.get());
if (rs < shutdown ||
(rs == shutdown && firsttask == null)) {
if (t.isalive()) // precheck that t is startable
throw new illegalthreadstateexception();
workers.add(w);
int s = workers.size();
if (s > largestpoolsize)
largestpoolsize = s;
workeradded = true;
}
} finally {
mainlock.unlock();
}
if (workeradded) {
t.start();
workerstarted = true;
}
}
} finally {
if (! workerstarted)
addworkerfailed(w);
}
return workerstarted;
}

标记1的第一段代码,目的很简单,是为workercount加一。至于为什么代码写了这么长,是因为线程池的状态在不断

变化,并发环境下需要保证变量的同步性。外循环判断线程池状态、任务非空和队列非空,内循环使用cas机制保证workercount正确地递增。不了解cas可以看认识非阻塞的同步机制cas,后续增减workercount都会使用cas。

标记2的第二段代码,就比较简单。创建一个新worker对象,将worker添加进workers里(set集合)。成功添加后,启动worker里的线程。在finally里判断线程是否启动成功,不成功直接调用addworkerfailed。

private void addworkerfailed(worker w) {
final reentrantlock mainlock = this.mainlock;
mainlock.lock();
try {
if (w != null)
workers.remove(w);
decrementworkercount();
tryterminate();
} finally {
mainlock.unlock();
}
}

addworkerfailed将减少已经递增的workercount,并且调用tryterminate结束线程池。

worker的执行

worker(runnable firsttask) {
setstate(-1); // inhibit interrupts until runworker
this.firsttask = firsttask;
this.thread = getthreadfactory().newthread(this);
}
public void run() {
runworker(this);
}


worker在构造函数里采用threadfactory创建thread,在run方法里调用了runworker,看来是真正执行任务的地方。

final void runworker(worker w) {
thread wt = thread.currentthread();
runnable task = w.firsttask;
w.firsttask = null;
w.unlock(); // allow interrupts
boolean completedabruptly = true;
try {
//1
while (task != null || (task = gettask()) != null) {
w.lock();
//2
if ((runstateatleast(ctl.get(), stop) ||
(thread.interrupted() &&
runstateatleast(ctl.get(), stop))) &&
!wt.isinterrupted())
wt.interrupt();
try {
//3
beforeexecute(wt, task);
throwable thrown = null;
try {
task.run();
} catch (runtimeexception x) {
thrown = x; throw x;
} catch (error x) {
thrown = x; throw x;
} catch (throwable x) {
thrown = x; throw new error(x);
} finally {
afterexecute(task, thrown);
}
} finally {
task = null;
//4
w.completedtasks++;
w.unlock();
}
}
completedabruptly = false; //5
} finally {
//6
processworkerexit(w, completedabruptly);
}
}

标记1进入循环,从gettask获取要执行的任务,直到返回null。这里达到了线程复用的效果,让线程处理多个任务。

标记2是一个比较复杂的判断,保证了线程池在stop状态下线程是中断的,非stop状态下线程没有被中断。如果你不了解java的中断机制,看如何正确结束java线程这篇

标记3调用了run方法,真正执行了任务。执行前后提供了beforeexecute和afterexecute两个方法,由子类实现。

标记4里的completedtasks统计worker执行了多少任务,最后累加进completedtaskcount变量,可以调用相应方法返回一些统计信息。

标记5的变量completedabruptly表示worker是否异常终止,执行到这里代表执行正常,后续的方法需要这个变量。

标记6调用processworkerexit结束,后面会分析。

接着来看worker从等待队列获取任务的gettask方法:

private runnable gettask() {
boolean timedout = false; // did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runstateof(c);
//1
// check if queue empty only if necessary.
if (rs >= shutdown && (rs >= stop || workqueue.isempty())) {
decrementworkercount();
return null;
}
int wc = workercountof(c);
//2
// are workers subject to culling?
boolean timed = allowcorethreadtimeout || wc > corepoolsize;
if ((wc > maximumpoolsize || (timed && timedout))
&& (wc > 1 || workqueue.isempty())) {
if (compareanddecrementworkercount(c))
return null;
continue;
}
//3
try {
runnable r = timed ?
workqueue.poll(keepalivetime, timeunit.nanoseconds) :
workqueue.take();
if (r != null)
return r;
timedout = true;
} catch (interruptedexception retry) {
timedout = false;
}
}
}

标记1检查线程池的状态,这里就体现出shutdown和stop的区别。如果线程池是shutdown状态,还会先处理完等待队列的任务;如果是stop状态,就不再处理等待队列里的任务了。

标记2先看allowcorethreadtimeout这个变量,false时worker空闲,也不会结束;true时,如果worker空闲超过keepalivetime,就会结束。接着是一个很复杂的判断,好难转成文字描述,自己看吧。注意一下wc>maximumpoolsize,出现这种可能是在运行中调用setmaximumpoolsize,还有wc>1,在等待队列非空时,至少保留一个worker。

标记3是从等待队列取任务的逻辑,根据timed分为等待keepalivetime或者阻塞直到有任务。

最后来看结束worker需要执行的操作:

private void processworkerexit(worker w, boolean completedabruptly) {
//1
if (completedabruptly) // if abrupt, then workercount wasn't adjusted
decrementworkercount();
//2
final reentrantlock mainlock = this.mainlock;
mainlock.lock();
try {
completedtaskcount += w.completedtasks;
workers.remove(w);
} finally {
mainlock.unlock();
}
//3
tryterminate();
int c = ctl.get();
//4
if (runstatelessthan(c, stop)) {
if (!completedabruptly) {
int min = allowcorethreadtimeout ? 0 : corepoolsize;
if (min == 0 && ! workqueue.isempty())
min = 1;
if (workercountof(c) >= min)
return; // replacement not needed
}
addworker(null, false);
}
}

正常情况下,在gettask里就会将workercount减一。标记1处用变量completedabruptly判断worker是否异常退出,如果是,需要补充对workercount的减一。

标记2将worker处理任务的数量累加到总数,并且在集合workers中去除。

标记3尝试终止线程池,后续会研究。

标记4处理线程池还是running或shutdown状态时,如果worker是异常结束,那么会直接addworker。如果allowcorethreadtimeout=true,并且等待队列有任务,至少保留一个worker;如果allowcorethreadtimeout=false,workercount不少于corepoolsize。

总结一下worker:线程池启动后,worker在池内创建,包装了提交的runnable任务并执行,执行完就等待下一个任务,不再需要时就结束。

线程池的关闭

线程池的关闭不是一关了事,worker在池里处于不同状态,必须安排好worker的”后事”,才能真正释放线程池。threadpoolexecutor提供两种方法关闭线程池:

  • shutdown:不能再提交任务,已经提交的任务可继续运行;
  • shutdownnow:不能再提交任务,已经提交但未执行的任务不能运行,在运行的任务可继续运行,但会被中断,返回已经提交但未执行的任务。
public void shutdown() {
final reentrantlock mainlock = this.mainlock;
mainlock.lock();
try {
checkshutdownaccess(); //1 安全策略机制
advancerunstate(shutdown); //2
interruptidleworkers(); //3
onshutdown(); //4 空方法,子类实现
} finally {
mainlock.unlock();
}
tryterminate(); //5
}

shutdown将线程池切换到shutdown状态,并调用interruptidleworkers请求中断所有空闲的worker,最后调用tryterminate尝试结束线程池。

public list<runnable> shutdownnow() {
list<runnable> tasks;
final reentrantlock mainlock = this.mainlock;
mainlock.lock();
try {
checkshutdownaccess();
advancerunstate(stop);
interruptworkers();
tasks = drainqueue(); //1
} finally {
mainlock.unlock();
}
tryterminate();
return tasks;
}

shutdownnow和shutdown类似,将线程池切换为stop状态,中断目标是所有worker。drainqueue会将等待队列里未执行的任务返回。

interruptidleworkers和interruptworkers实现原理都是遍历workers集合,中断条件符合的worker。

上面的代码多次出现调用tryterminate,这是一个尝试将线程池切换到terminated状态的方法。

final void tryterminate() {
for (;;) {
int c = ctl.get();
//1
if (isrunning(c) ||
runstateatleast(c, tidying) ||
(runstateof(c) == shutdown && ! workqueue.isempty()))
return;
//2
if (workercountof(c) != 0) { // eligible to terminate
interruptidleworkers(only_one);
return;
}
//3
final reentrantlock mainlock = this.mainlock;
mainlock.lock();
try {
if (ctl.compareandset(c, ctlof(tidying, 0))) {
try {
terminated();
} finally {
ctl.set(ctlof(terminated, 0));
termination.signalall();
}
return;
}
} finally {
mainlock.unlock();
}
// else retry on failed cas
}
}

标记1检查线程池状态,下面几种情况,后续操作都没有必要,直接return。

  • running(还在运行,不能停)
  • tidying或terminated(已经没有在运行的worker)
  • shutdown并且等待队列非空(执行完才能停)

标记2在worker非空的情况下又调用了interruptidleworkers,你可能疑惑在shutdown时已经调用过了,为什么又调用,而且每次只中断一个空闲worker?

你需要知道,shutdown时worker可能在执行中,执行完阻塞在队列的take,不知道要结束,所有要补充调用interruptidleworkers。每次只中断一个是因为processworkerexit时,还会执行tryterminate,自动中断下一个空闲的worker。

标记3是最终的状态切换。线程池会先进入tidying状态,再进入terminated状态,中间提供了terminated这个空方法供子类实现。

调用关闭线程池方法后,需要等待线程池切换到terminated状态。awaittermination检查限定时间内线程池是否进入terminated状态,代码如下:

public boolean awaittermination(long timeout, timeunit unit)
throws interruptedexception {
long nanos = unit.tonanos(timeout);
final reentrantlock mainlock = this.mainlock;
mainlock.lock();
try {
for (;;) {
if (runstateatleast(ctl.get(), terminated))
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitnanos(nanos);
}
} finally {
mainlock.unlock();
}
}

后言

以上过了一遍线程池主要的逻辑,总体来看线程池的设计是很清晰的。如有错误或不足,欢迎指出,也欢迎留言交流。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。