Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析
目录
引出线程池
线程是并发编程的基础,前面的文章里,我们的实例基本都是基于线程开发作为实例,并且都是使用的时候就创建一个线程。这种方式比较简单,但是存在一个问题,那就是线程的数量问题。
假设有一个系统比较复杂,需要的线程数很多,如果都是采用这种方式来创建线程的话,那么就会极大的消耗系统资源。首先是因为线程本身的创建和销毁需要时间,如果每个小任务都创建一个线程,那么就会大大降低系统的效率。其次是线程本身也是占用内存空间的,大量的线程运行会抢占内存资源,处理不当很可能会内存溢出,这显然不是我们想看到的。
那么有什么办法解决呢?有一个好的思路就是对线程进行复用,因为所有的线程并不都是同一时间一起运行的,有些线程在某个时刻可能是空闲状态,如果这部分空闲线程能有效利用起来,那么就能让线程的运行被充分的利用,这样就不需要创建那么多的线程了。我们可以把特定数量的线程放在一个容器里,需要使用线程时,从容器里拿出空闲线程使用,线程工作完后不急着关闭,而是退回到线程池等待使用。这样的容器一般被称为线程池。用线程池来管理线程是非常有效的方法,用一张图片可以简单的展示出线程池的管理流程:
executor框架
java中也有一套框架来控制管理线程,那就是executor框架。executor框架是jdk1.5之后才引入的,位于java.util.cocurrent 包下,可以通过该框架来控制线程的启动、执行和关闭,从而简化并发编程的操作,这是它的核心成员类图:
executor:最上层的接口,定义了一个基本方法execute,接受一个runnable参数,用来替代通常创建或启动线程的方法。
executorservice:继承自executor接口,提供了处理多线程的方法。
scheduledexecutorservice:定时调度接口,继承自executorservice。
abstractexecutorservice:执行框架的抽象类。
threadpoolexecutor:线程池中最核心的一个类,提供了线程池操作的基本方法。
executors:线程池工厂类,可用于创建一系列有特定功能的线程池。
threadpoolexecutor详解
以上executor框架中的基本成员,其中最核心的的成员无疑就是threadpoolexecutor,想了解java中线程池的运行机制,就必须先了解这个类,而最好的了解方式无疑就是看源码。
构造函数
打开threadpoolexecutor的源码,发现类中提供了四个构造方法
public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue) { this(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, executors.defaultthreadfactory(), defaulthandler); } public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory) { this(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, threadfactory, defaulthandler); } public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, rejectedexecutionhandler handler) { this(corepoolsize, maximumpoolsize, keepalivetime, unit, workqueue, executors.defaultthreadfactory(), handler); } public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) { if (corepoolsize < 0 || maximumpoolsize <= 0 || maximumpoolsize < corepoolsize || keepalivetime < 0) throw new illegalargumentexception(); if (workqueue == null || threadfactory == null || handler == null) throw new nullpointerexception(); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; this.workqueue = workqueue; this.keepalivetime = unit.tonanos(keepalivetime); this.threadfactory = threadfactory; this.handler = handler; }
可以看出,threadpoolexecutor的构造函数中的参数还是比较多的,并且最核心的是第四个构造函数,其中完成了底层的初始化工作。
下面解释一下构造函数参数的含义:
- corepoolsize:线程池的基本大小。当提交一个任务到线程池后,线程池会创建一个线程执行任务,重复这种操作,直到线程池中的数目达到corepoolsize后不再创建新线程,而是把任务放到缓存队列中。
- maximumpoolsize:线程池允许创建的最大线程数。
- workqueue:阻塞队列,用于存储等待执行的任务,并且只能存储调用
execute
方法提交的任务。常用的有三种队列,synchronousqueue,linkedblockingdeque,arrayblockingqueue。 - keepalivetime:线程池中线程的最大空闲时间,这种情况一般是线程数目大于任务的数量导致。
- unit:keepalivetime的时间单位,timeunit是一个枚举类型,位于java.util.concurrent包下。
threadfactory:线程工厂,用于创建线程。
handler:拒绝策略,当任务太多来不及处理时所采用的处理策略。
重要的变量
看完了构造函数,我们来看下threadpoolexecutor类中几个重要的成员变量:
private final atomicinteger ctl = new atomicinteger(ctlof(running, 0)); private static final int count_bits = integer.size - 3; private static final int capacity = (1 << count_bits) - 1; // runstate is stored in the high-order bits private static final int running = -1 << count_bits; private static final int shutdown = 0 << count_bits; private static final int stop = 1 << count_bits; private static final int tidying = 2 << count_bits; private static final int terminated = 3 << count_bits; // packing and unpacking ctl private static int runstateof(int c) { return c & ~capacity; } private static int workercountof(int c) { return c & capacity; } private static int ctlof(int rs, int wc) { return rs | wc; }
ctl:控制线程运行状态的一个字段。同时,根据下面的几个方法runstateof
,workercountof
,ctlof
可以看出,该字段还包含了两部分的信息:线程池的运行状态 (runstate) 和线程池内有效线程的数量 (workercount),并且使用的是integar类型,高3位保存runstate,低29位保存workercount。
count_bits:值为29的常量,在字段capacity
被引用计算。
capacity:表示有效线程数量(workercount)的上限,大小为 (1<<29) - 1。
下面5个变量表示的是线程的运行状态,分别是:
- running :接受新提交的任务,并且能处理阻塞队列中的任务;
- shutdown:不接受新的任务,但会执行队列中的任务。
- stop:不接受新任务,也不处理队列中的任务,同时中断正在处理任务的线程。
- tidying:如果所有的任务都已终止了,workercount (有效线程数) 为0,线程池进入该状态后会调用 terminated() 方法进入terminated 状态。
- terminated:terminated( ) 方法执行完毕。
用一个状态转换图表示大概如下 (图片来源于):
构造函数和基本参数都了解后,接下来就是对类中重要方法的研究了。
线程池执行流程
execute方法
threadpoolexecutor类的核心调度方法是execute(),通过调用这个方法可以向线程池提交一个任务,交由线程池去执行。而threadpoolexecutor的工作逻辑也可以藉由这个方法来一步步理清。这是方法的源码:
public void execute(runnable command) { if (command == null) throw new nullpointerexception(); //获取ctl的值,前面说了,该值记录着runstate和workercount int c = ctl.get(); /* * 调用workercountof得到当前活动的线程数; * 当前活动线程数小于corepoolsize,新建一个线程放入线程池中; * addworker(): 把任务添加到该线程中。 */ if (workercountof(c) < corepoolsize) { if (addworker(command, true)) return; //如果上面的添加线程操作失败,重新获取ctl值 c = ctl.get(); } //如果当前线程池是运行状态,并且往工作队列中添加该任务 if (isrunning(c) && workqueue.offer(command)) { int recheck = ctl.get(); /* * 如果当前线程不是运行状态,把任务从队列中移除 * 调用reject(内部调用handler)拒绝接受任务 */ if (! isrunning(recheck) && remove(command)) reject(command); //获取线程池中的有效线程数,如果为0,则执行addworker创建一个新线程 else if (workercountof(recheck) == 0) addworker(null, false); } /* * 如果执行到这里,有两种情况: * 1. 线程池已经不是running状态; * 2. 线程池是running状态,但workercount >= corepoolsize并且workqueue已满。 * 这时,再次调用addworker方法,但第二个参数传入为false,将线程池的有限线程数量的上限设置为maximumpoolsize; * 如果失败则拒绝该任务 */ else if (!addworker(command, false)) reject(command); }
简单概括一下代码的逻辑,大概是这样:
1、判断当前运行中的线程数是否小于corepoolsize,是的话则调用addworker创建线程执行任务。
2、不满足1的条件,就把任务放入工作队列workqueue中。
3、如果任务成功加入workqueue,判断线程池是否是运行状态,不是的话先把任务移出工作队列,并调用reject方法,使用拒绝策略拒绝该任务。线程如果是非运行中,调用addworker创建一个新线程。
4、如果放入workqueue失败 (队列已满),则调用addworker创建线程执行任务,如果这时创建线程失败 (addworker传进去的第二个参数值是false,说明这种情况是当前线程数不小于maximumpoolsize),就会调用reject(内部调用handler)拒绝接受任务。
整个执行流程用一张图片表示大致如下:
以上就是execute方法的大概逻辑,接下来看看addworker的方法实现。
addworker方法
源码如下:
private boolean addworker(runnable firsttask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runstateof(c); /**线程池状态不为shutdown时 * 判断队列或者任务是否为空,是的话返回false */. if (rs >= shutdown && ! (rs == shutdown && firsttask == null && ! workqueue.isempty())) return false; for (;;) { int wc = workercountof(c); /* 这里可以看出core参数决定着活动线程数的大小比较对象 * core为true表示与 corepoolsize大小进行比较 * core为false表示与 maximumpoolsize大小进行比较 * 当前活动线程数大于比较对象就返回false */ if (wc >= capacity || wc >= (core ? corepoolsize : maximumpoolsize)) return false; // 尝试增加workercount,如果成功,则跳出第一个for循环 if (compareandincrementworkercount(c)) break retry; // 如果增加workercount失败,则重新获取ctl的值 c = ctl.get(); // re-read ctl // 如果当前的运行状态不等于rs,说明状态已被改变,返回第一个for循环继续执行 if (runstateof(c) != rs) continue retry; // else cas failed due to workercount change; retry inner loop } } boolean workerstarted = false; boolean workeradded = false; worker w = null; try { //创建一个worker对象w w = new worker(firsttask); //实例化w的线程t final thread t = w.thread; if (t != null) { final reentrantlock mainlock = this.mainlock; mainlock.lock(); try { // recheck while holding lock. // back out on threadfactory failure or if // shut down before lock acquired. int rs = runstateof(ctl.get()); if (rs < shutdown || (rs == shutdown && firsttask == null)) { if (t.isalive()) // precheck that t is startable throw new illegalthreadstateexception(); // workers是一个hashset,保存着任务的worker对象 workers.add(w); int s = workers.size(); if (s > largestpoolsize) largestpoolsize = s; workeradded = true; } } finally { mainlock.unlock(); } if (workeradded) { //启动线程 t.start(); workerstarted = true; } } } finally { if (! workerstarted) addworkerfailed(w); } return workerstarted; }
从代码中可以看出,addworker方法的主要工作是在线程池中创建一个新的线程并执行,其中firsttask参数指定的是新线程需要执行的第一个任务,core参数决定于活动线程数的比较对象是corepoolsize还是maximumpoolsize。根据传进来的参数首先对线程池和队列的状态进行判断,满足条件就新建一个worker对象,并实例化该对象的线程,最后启动线程。
worker类
根据addworker源码中的逻辑,我们可以发现,线程池中的每一个线程其实都是对应的worker对象在维护的,所以我们有必要对worker类一探究竟,先看一下类的源码:
private final class worker extends abstractqueuedsynchronizer implements runnable { /** * this class will never be serialized, but we provide a * serialversionuid to suppress a javac warning. */ private static final long serialversionuid = 6138294804551838833l; /** thread this worker is running in. null if factory fails. */ final thread thread; /** initial task to run. possibly null. */ runnable firsttask; /** per-thread task counter */ volatile long completedtasks; /** * creates with given first task and thread from threadfactory. * @param firsttask the first task (null if none) */ worker(runnable firsttask) { setstate(-1); // inhibit interrupts until runworker this.firsttask = firsttask; this.thread = getthreadfactory().newthread(this); } /** delegates main run loop to outer runworker */ public void run() { runworker(this); } // lock methods // // the value 0 represents the unlocked state. // the value 1 represents the locked state. protected boolean isheldexclusively() { return getstate() != 0; } protected boolean tryacquire(int unused) { if (compareandsetstate(0, 1)) { setexclusiveownerthread(thread.currentthread()); return true; } return false; } protected boolean tryrelease(int unused) { setexclusiveownerthread(null); setstate(0); return true; } public void lock() { acquire(1); } public boolean trylock() { return tryacquire(1); } public void unlock() { release(1); } public boolean islocked() { return isheldexclusively(); } void interruptifstarted() { thread t; if (getstate() >= 0 && (t = thread) != null && !t.isinterrupted()) { try { t.interrupt(); } catch (securityexception ignore) { } } } }
从worker类的构造函数可以看出,当实例化一个worker对象时,worker对象会把传进来的runnable参数firsttask
赋值给自己的同名属性,并且用线程工厂也就是当前的threadfactory来新建一个线程。
同时,因为worker实现了runnable接口,所以当worker类中的线程启动时,调用的其实是run()方法。run方法中调用的是runworker
方法,我们来看下它的具体实现:
final void runworker(worker w) { thread wt = thread.currentthread(); //获取第一个任务 runnable task = w.firsttask; w.firsttask = null; //允许中断 w.unlock(); // allow interrupts //是否因为异常退出循环的标志,processworkerexit方法会对该参数做判断 boolean completedabruptly = true; try { //判断task是否为null,是的话通过gettask()从队列中获取任务 while (task != null || (task = gettask()) != null) { w.lock(); // if pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. this // requires a recheck in second case to deal with // shutdownnow race while clearing interrupt /* 这里的判断主要逻辑是这样: * 如果线程池正在停止,那么就确保当前线程是中断状态; * 如果不是的话,就要保证不是中断状态 */ if ((runstateatleast(ctl.get(), stop) || (thread.interrupted() && runstateatleast(ctl.get(), stop))) && !wt.isinterrupted()) wt.interrupt(); try { //用于记录任务执行前需要做哪些事,属于threadpoolexecutor类中的方法, //是空的,需要子类具体实现 beforeexecute(wt, task); throwable thrown = null; try { //执行任务 task.run(); } catch (runtimeexception x) { thrown = x; throw x; } catch (error x) { thrown = x; throw x; } catch (throwable x) { thrown = x; throw new error(x); } finally { afterexecute(task, thrown); } } finally { task = null; w.completedtasks++; w.unlock(); } } completedabruptly = false; } finally { processworkerexit(w, completedabruptly); } }
总结一下runworker方法的运行逻辑:
1、通过while循环不断地通过gettask()方法从队列中获取任务;
2、如果线程池正在停止状态,确保当前的线程是中断状态,否则确保当前线程不中断;
3、调用task的run()方法执行任务,执行完毕后需要置为null;
4、循环调用gettask()取不到任务了,跳出循环,执行processworkerexit()方法。
过完runworker()的运行流程,我们来看下gettask()是怎么实现的。
gettask方法
gettask()方法的作用是从队列中获取任务,下面是该方法的源码:
private runnable gettask() { //记录上次从队列获取任务是否超时 boolean timedout = false; // did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runstateof(c); // check if queue empty only if necessary. if (rs >= shutdown && (rs >= stop || workqueue.isempty())) { //将workercount减1 decrementworkercount(); return null; } int wc = workercountof(c); // are workers subject to culling? /* timed变量用于判断线程的操作是否需要进行超时判断 * allowcorethreadtimeout不管它,默认是false * wc > corepoolsize,当前线程是如果大于核心线程数corepoolsize */ boolean timed = allowcorethreadtimeout || wc > corepoolsize; if ((wc > maximumpoolsize || (timed && timedout)) && (wc > 1 || workqueue.isempty())) { if (compareanddecrementworkercount(c)) return null; continue; } try { /* 根据timed变量判断,如果为true,调用workqueue的poll方法获取任务, * 如果在keepalivetime时间内没有获取到任务,则返回null; * timed为false的话,就调用workqueue的take方法阻塞队列, * 直到队列中有任务可取。 */ runnable r = timed ? workqueue.poll(keepalivetime, timeunit.nanoseconds) : workqueue.take(); if (r != null) return r; //r为null,说明time为true,超时了,把timedout也设置为true timedout = true; } catch (interruptedexception retry) { //发生异常,把timedout也设置为false,重新跑循环 timedout = false; } } }
gettask的代码看上去比较简单,但其实内有乾坤,我们来重点分析一下两个if判断的逻辑:
1、当进入gettask方法后,先判断当前线程池状态,如果线程池状态rs >= shutdown,再进行以下判断:
1)rs 的状态是否大于stop;2)队列是否为空;
满足以上条件其中之一,就将workercount减1并返回null,也就是表示队列中不再有任务。因为线程池的状态值是shutdown以上时,队列中不再允许添加新任务,所以上面两个条件满足一个都说明队列中的任务都取完了。
2、进入第二个if判断,这里的逻辑有点绕,但作用比较重要,是为了控制线程池的有效线程数量,我们来具体解析下代码:
wc > maximumpoolsize
:判断当前线程数是否大于maximumpoolsize,这种情况一般很少发生,除非是maximumpoolsize的大小在该程序执行的同时被进行设置,比如调用threadpoolexecutor中的setmaximumpoolsize
方法。
timed && timedout
:如果为true,表示当前的操作需要进行超时判断,并且上次从队列获取任务已经超时。
wc > 1 || workqueue.isempty()
:如果工作线程大于1,或者阻塞队列是空的。
compareanddecrementworkercount
:比较并将线程池中的workercount减1
在上文中,我们解析execute方法的逻辑时了解到,如果当前线程池的线程数量超过了corepoolsize且小于maximumpoolsize,并且workqueue已满时,仍然可以增加工作线程。
但调用gettask()取任务的过程中,如果超时没有获取到任务,也就是timedout为true的情况,说明workqueue已经为空了,也就说明了当前线程池中不需要那么多线程来执行任务了,可以把多于corepoolsize数量的线程销毁掉,也就是不断的让任务被取出,让线程数量保持在corepoolsize即可,直到gettask方法返回null。
而当gettask方法返回null后,runworker方法中就会因为取不到任务而执行processworkerexit()方法。
processworkerexit方法
processworkerexit方法的作用主要是对worker对象的移除,下面是方法的源码:
private void processworkerexit(worker w, boolean completedabruptly) { //是异常退出的话,执行程序将workercount数量减1 if (completedabruptly) // if abrupt, then workercount wasn't adjusted decrementworkercount(); final reentrantlock mainlock = this.mainlock; mainlock.lock(); try { completedtaskcount += w.completedtasks; // 从workers的集合中移除worker对象,也就表示着从线程池中移除了一个工作线程 workers.remove(w); } finally { mainlock.unlock(); } tryterminate(); int c = ctl.get(); if (runstatelessthan(c, stop)) { if (!completedabruptly) { int min = allowcorethreadtimeout ? 0 : corepoolsize; if (min == 0 && ! workqueue.isempty()) min = 1; if (workercountof(c) >= min) return; // replacement not needed } addworker(null, false); } }
至此,从executor方法开始的整个运行过程就完毕了,总结一下该流程:
执行executor --> 新建worker对象,并实例化线程 --> 调用runworker方法,通过gettask()获取任务,并执行run方法 --> gettask()方法中不断向队列取任务,并将workercount数量减1,直至返回null --> 调用processworkerexit清除worker对象。
用一张流程图表示如下所示 (图片来源于):
任务队列workqueue
前面我们多次提到了workqueue,这是一个任务队列,用来存放等待执行的任务,它是blockingqueue
synchronousqueue:直接提交的队列。这个队列没有容量,当接收到任务的时候,会直接提交给线程处理,而不保留它。如果没有空闲的线程,就新建一个线程来处理这个任务!如果线程数量达到最大值,就会执行拒绝策略。所以,使用这个类型队列的时候,一般都是将maximumpoolsize一般指定成integer.max_value,避免容易被拒绝。
arrayblockingqueue:有界的任务队列。需要给定一个参数来限制队列的长度,接收到任务的时候,如果没有达到corepoolsize的值,则新建线程 (核心线程) 执行任务,如果达到了,则将任务放入等待队列。如果队列已满,则在总线程数不到maximumpoolsize的前提下新建线程执行任务,若大于maximumpoolsize,则执行拒绝策略。
linkedblockingqueue:*的任务队列。该队列没有任务数量的限制,所以任务可以一直入队,知道耗尽系统资源。当接收任务,如果当前线程数小于corepoolsize,则新建线程处理任务;如果当前线程数等于corepoolsize,则进入队列等待。
任务拒绝策略
当线程池的任务队列已满并且线程数目达到maximumpoolsize时,对于新加的任务一般会采取拒绝策略,通常有以下四种策略:
- abortpolicy:直接抛出异常,这是默认策略;
- callerrunspolicy:用调用者所在的线程来执行任务;
- discardoldestpolicy:丢弃阻塞队列中靠最前的任务,并执行当前任务;
- discardpolicy:直接丢弃任务;
线程池的关闭
threadpoolexecutor提供了两个方法,用于线程池的关闭,分别是shutdown()和shutdownnow():
public void shutdown() { final reentrantlock mainlock = this.mainlock; mainlock.lock(); try { checkshutdownaccess(); advancerunstate(shutdown); interruptidleworkers(); onshutdown(); // hook for scheduledthreadpoolexecutor } finally { mainlock.unlock(); } tryterminate(); } public list<runnable> shutdownnow() { list<runnable> tasks; final reentrantlock mainlock = this.mainlock; mainlock.lock(); try { checkshutdownaccess(); advancerunstate(stop); interruptworkers(); tasks = drainqueue(); } finally { mainlock.unlock(); } tryterminate(); return tasks; }
代码逻辑就不一一进行解析了,总结一下两个方法的特点就是:
- shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务
- shutdownnow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务
threadpoolexecutor创建线程池实例
threadpoolexecutor的运行机制讲完了,接下来展示一下如何用threadpoolexecutor创建线程池实例,具体代码如下:
public static void main(string[] args) { executorservice service = new threadpoolexecutor(5, 10, 300, timeunit.milliseconds, new arrayblockingqueue<runnable>(5)); //用lambda表达式编写方法体中的逻辑 runnable run = () -> { try { thread.sleep(1000); system.out.println(thread.currentthread().getname() + "正在执行"); } catch (interruptedexception e) { e.printstacktrace(); } }; for (int i = 0; i < 10; i++) { service.execute(run); } //这里一定要做关闭 service.shutdown(); }
上面的代码中,我们创建的threadpoolexecutor线程池的核心线程数为5个,所以,当调用线程池执行任务时,同时运行的线程最多也是5个,执行main方法,输出结果如下:
pool-1-thread-3正在执行 pool-1-thread-1正在执行 pool-1-thread-4正在执行 pool-1-thread-5正在执行 pool-1-thread-3正在执行 pool-1-thread-2正在执行 pool-1-thread-1正在执行 pool-1-thread-4正在执行 pool-1-thread-5正在执行
看到出来,线程池确实只有5个线程在工作,也就是真正的实现了线程的复用,说明我们的threadpoolexecutor实例是有效的。
参考:
《实战java:高并发程序设计》