Java并发包中线程池ThreadPoolExecutor原理探究
一、线程池简介
线程池的使用主要是解决两个问题:①当执行大量异步任务的时候线程池能够提供更好的性能,在不使用线程池时候,每当需要执行异步任务的时候直接new一个线程来运行的话,线程的创建和销毁都是需要开销的。而线程池中的线程是可复用的,不需要每次执行异步任务的时候重新创建和销毁线程;②线程池提供一种资源限制和管理的手段,比如可以限制线程的个数,动态的新增线程等等。
在下面的分析中,我们可以看到,线程池使用一个integer的原子类型变量来记录线程池状态和线程池中的线程数量,通过线程池状态来控制任务的执行,每个工作线程worker线程可以处理多个任务。
二、threadpoolexecutor类
1、我们先简单看一下关于threadpoolexecutor的一些成员变量以及其所表示的含义
threadpoolexecutor继承了abstractexecutorservice,其中的成员变量ctl是一个integer类型的原子变量,用来记录线程池的状态和线程池中的线程的个数,类似于前面讲到的中使用一个变量保存两种信息。这里(integer看做32位)ctl高三位表示线程池的状态,后面的29位表示线程池中的线程个数。如下所示是threadpoolexecutor源码中的成员变量
1 //(高3位)表示线程池状态,(低29位)表示线程池中线程的个数; 2 // 默认状态是running,线程池中线程个数为0 3 private final atomicinteger ctl = new atomicinteger(ctlof(running, 0)); 4 5 //表示具体平台下integer的二进制位数-3后的剩余位数表示的数才是线程的个数; 6 //其中integer.size=32,-3之后的低29位表示的就是线程的个数了 7 private static final int count_bits = integer.size - 3; 8 9 //线程最大个数(低29位)00011111111111111111111111111111(1<<29-1) 10 private static final int capacity = (1 << count_bits) - 1; 11 12 //线程池状态(高3位表示线程池状态) 13 //111 00000000000000000000000000000 14 private static final int running = -1 << count_bits; 15 16 //000 00000000000000000000000000000 17 private static final int shutdown = 0 << count_bits; 18 19 //001 00000000000000000000000000000 20 private static final int stop = 1 << count_bits; 21 22 //010 00000000000000000000000000000 23 private static final int tidying = 2 << count_bits; 24 25 //011 00000000000000000000000000000 26 private static final int terminated = 3 << count_bits; 27 28 //获取高3位(运行状态)==> c & 11100000000000000000000000000000 29 private static int runstateof(int c) { return c & ~capacity; } 30 31 //获取低29位(线程个数)==> c & 00011111111111111111111111111111 32 private static int workercountof(int c) { return c & capacity; } 33 34 //计算原子变量ctl新值(运行状态和线程个数) 35 private static int ctlof(int rs, int wc) { return rs | wc; }
下面我们简单解释一下上面的线程状态的含义:
①running:接受新任务并处理阻塞队列中的任务
②shutdown:拒绝新任务但是处理阻塞队列中的任务
③stop:拒绝新任务并抛弃阻塞队列中的任务,同时会中断当前正在执行的任务
④tidying:所有任务执行完之后(包含阻塞队列中的任务)当前线程池中活跃的线程数量为0,将要调用terminated方法
⑥terminated:终止状态。terminated方法调用之后的状态
2、下面初步了解一下threadpoolexecutor的参数以及实现原理
①corepoolsize:线程池核心现车个数
②workqueue:用于保存等待任务执行的任务的阻塞队列(比如基于数组的有界阻塞队列arrayblockingqueue、基于链表的*阻塞队列linkedblockingqueue等等)
③maximumpoolsize:线程池最大线程数量
④threadfactory:创建线程的工厂
⑤rejectedexecutionhandler:拒绝策略,表示当队列已满并且线程数量达到线程池最大线程数量的时候对新提交的任务所采取的策略,主要有四种策略:abortpolicy(抛出异常)、callerrunspolicy(只用调用者所在线程来运行该任务)、discardoldestpolicy(丢掉阻塞队列中最近的一个任务来处理当前提交的任务)、discardpolicy(不做处理,直接丢弃掉)
⑥keepalivetime:存活时间,如果当前线程池中的数量比核心线程数量多,并且当前线程是闲置状态,该变量就是这些线程的最大生存时间
⑦timeunit:存活时间的时间单位。
根据上面的参数介绍,简单了解一下线程池的实现原理,以提交一个新任务为开始点,分析线程池的主要处理流程
3、关于一些线程池的使用类型
①newfixedthreadpool:创建一个核心线程个数和最大线程个数均为nthreads的线程池,并且阻塞队列长度为integer.max_value,keepalivetime=0说明只要线程个数比核心线程个数多并且当前空闲即回收。
public static executorservice newfixedthreadpool(int nthreads) { return new threadpoolexecutor(nthreads, nthreads, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>()); }
②newsinglethreadexecutor:创建一个核心线程个数和最大线程个数都为1 的线程池,并且阻塞队列长度为integer.max_value,keepalivetime=0说明只要线程个数比核心线程个数多并且当前线程空闲即回收该线程。
public static executorservice newsinglethreadexecutor() { return new finalizabledelegatedexecutorservice (new threadpoolexecutor(1, 1, 0l, timeunit.milliseconds, new linkedblockingqueue<runnable>())); }
③newcachedthreadpoolexecutor:创建一个按需创建线程的线程池,初始线程个数为0,最多线程个数为integer.max_value,并且阻塞队列为同步队列(最多只有一个元素),keepalivetime=60说明只要当前线程在60s内空闲则回收。这个类型的线程池的特点就是:加入同步队列的任务会被马上执行,同步队列中最多只有一个任务
public static executorservice newcachedthreadpool() { return new threadpoolexecutor(0, integer.max_value, 60l, timeunit.seconds, new synchronousqueue<runnable>()); }
4、threadpoolexecutor中的其他成员
其中的reentrantlock可参考前面写到的java中的锁——lock和synchronized,其中降到了reentrantlock的具体实现原理;
关于aqs部分可参考前面说到的java中的队列同步器aqs,也讲到了关于aqs的具体实现原理分析;
关于条件队列的相关知识可参考前面写的java中的线程协作之condition,里面说到了关于java中线程协作condition的实现原理;
//独占锁,用来控制新增工作线程worker操作的原子性 private final reentrantlock mainlock = new reentrantlock(); //工作线程集合,worker继承了aqs接口和runnable接口,是具体处理任务的线程对象 //worker实现aqs,并自己实现了简单不可重入独占锁,其中state=0表示当前锁未被获取状态,state=1表示锁被获取, //state=-1表示work创建时候的默认状态,创建时候设置state=-1是为了防止runworker方法运行前被中断 private final hashset<worker> workers = new hashset<worker>(); //termination是该锁对应的条件队列,在线程调用awaittermination时候用来存放阻塞的线程 private final condition termination = mainlock.newcondition();
三、execute(runnable command)方法实现
executor方法的作用是提交任务command到线程池执行,可以简单的按照下面的图进行理解,threadpoolexecutor的实现类似于一个生产者消费者模型,当用户添加任务到线程池中相当于生产者生产元素,workers工作线程则直接执行任务或者从任务队列中获取任务,相当于消费之消费元素。
1 public void execute(runnable command) { 2 //(1)首先检查任务是否为null,为null抛出异常,否则进行下面的步骤 3 if (command == null) 4 throw new nullpointerexception(); 5 //(2)ctl值中包含了当前线程池的状态和线程池中的线程数量 6 int c = ctl.get(); 7 //(3)workercountof方法是获取低29位,即获取当前线程池中的线程个数,如果小于corepoolsize,就开启新的线程运行 8 if (workercountof(c) < corepoolsize) { 9 if (addworker(command, true)) 10 return; 11 c = ctl.get(); 12 } 13 //(4)如果线程池处理running状态,就添加任务到阻塞队列中 14 if (isrunning(c) && workqueue.offer(command)) { 15 //(4-1)二次检查,获取ctl值 16 int recheck = ctl.get(); 17 //(4-2)如果当前线程池不是出于running状态,就从队列中删除任务,并执行拒绝策略 18 if (! isrunning(recheck) && remove(command)) 19 reject(command); 20 //(4-3)否则,如果线程池为空,就添加一个线程 21 else if (workercountof(recheck) == 0) 22 addworker(null, false); 23 } 24 //(5)如果队列满,则新增线程,如果新增线程失败,就执行拒绝策略 25 else if (!addworker(command, false)) 26 reject(command); 27 }
我们在看一下上面代码的执行流程,按照标记的数字进行分析:
①步骤(3)判断当前线程池中的线程个数是否小于corepoolsize,如果小于核心线程数,会向workers里面新增一个核心线程执行任务。
②如果当前线程池中的线程数量大于核心线程数,就执行(4)。(4)首先判断当前线程池是否处于running状态,如果处于该状态,就添加任务到任务队列中,这里需要判断线程池的状态是因为线程池可能已经处于非running状态,而在非running状态下是需要抛弃新任务的。
③如果想任务队列中添加任务成功,需要进行二次校验,因为在添加任务到任务队列后,可能线程池的状态发生了变化,所以这里需要进行二次校验,如果当前线程池已经不是running状态了,需要将任务从任务队列中移除,然后执行拒绝策略;如果二次校验通过,则执行4-3代码重新判断当前线程池是否为空,如果线程池为空没有线程,那么就需要新创建一个线程。
④如果上面的步骤(4)创建添加任务失败,说明队列已满,那么(5)会尝试再开启新的线程执行任务(类比上图中的thread3和thread4,即不是核心线程的那些线程),如果当前线程池中的线程个数已经大于最大线程数maximumpoolsize,表示不能开启新的线程。这就属于线程池满并且任务队列满,就需要执行拒绝策略了。
下面我们在看看addworker方法的实现
1 private boolean addworker(runnable firsttask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runstateof(c); 6 7 //(6)检查队列是否只在必要时候为空 8 if (rs >= shutdown && 9 ! (rs == shutdown && 10 firsttask == null && 11 ! workqueue.isempty())) 12 return false; 13 14 //(7)使用cas增加线程个数 15 for (;;) { 16 //根据ctl值获得当前线程池中的线程数量 17 int wc = workercountof(c); 18 //(7-1)如果线程数量超出限制,返回false 19 if (wc >= capacity || 20 wc >= (core ? corepoolsize : maximumpoolsize)) 21 return false; 22 //(7-2)cas增加线程数量,同时只有一个线程可以成功 23 if (compareandincrementworkercount(c)) 24 break retry; 25 c = ctl.get(); // 重新读取ctl值 26 //(7-3)cas失败了,需要查看当前线程池状态是否发生变化,如果发生变化需要跳转到外层循环尝试重新获取线程池状态,否则内层循环重新进行cas增加线程数量 27 if (runstateof(c) != rs) 28 continue retry; 29 } 30 } 31 32 //(8)执行到这里说明cas增加新线程个数成功了,我们需要开始创建新的工作线程worker 33 boolean workerstarted = false; 34 boolean workeradded = false; 35 worker w = null; 36 try { 37 //(8-1)创建新的worker 38 w = new worker(firsttask); 39 final thread t = w.thread; 40 if (t != null) { 41 final reentrantlock mainlock = this.mainlock; 42 //(8-2)加独占锁,保证workers的同步,可能线程池中的多个线程调用了线程池的execute方法 43 mainlock.lock(); 44 try { 45 // (8-3)重新检查线程池状态,以免在获取锁之前调用shutdown方法改变线程池状态 46 int rs = runstateof(ctl.get()); 47 48 if (rs < shutdown || 49 (rs == shutdown && firsttask == null)) { 50 if (t.isalive()) // precheck that t is startable 51 throw new illegalthreadstateexception(); 52 //(8-4)添加新任务 53 workers.add(w); 54 int s = workers.size(); 55 if (s > largestpoolsize) 56 largestpoolsize = s; 57 workeradded = true; 58 } 59 } finally { 60 mainlock.unlock(); 61 } 62 //(8-6)添加新任务成功之后,启动任务 63 if (workeradded) { 64 t.start(); 65 workerstarted = true; 66 } 67 } 68 } finally { 69 if (! workerstarted) 70 addworkerfailed(w); 71 } 72 return workerstarted; 73 }
简单再分析说明一下上面的代码,addworker方法主要分为两部分,第一部分是使用cas线程安全的添加线程数量,第二部分则是创建新的线程并且将任务并发安全的添加到新的workers之中,然后启动线程执行。
①代码(6)中检查队列是否只在必要时候为空,只有线程池状态符合条件才能够进行下面的步骤,从(6)中的判断条件来看,下面的集中情况addworker会直接返回false
( i )当前线程池状态为stop,tidying或者terminated ; (i i)当前线程池状态为shutdown并且已经有了第一个任务; (i i i)当前线程池状态为shutdown并且任务队列为空
②外层循环中判断条件通过之后,在内层循环中使用cas增加线程数,当cas成功就退出双重循环进行(8)步骤代码的执行,如果失败需要查看当前线程池的状态是否发生变化,如果发生变化需要进行外层循环重新判断线程池状态然后在进入内层循环重新进行cas增加线程数,如果线程池状态没有发生变化但是上一次cas失败就继续进行cas尝试。
③执行到(8)代码处,表明当前已经成功增加 了线程数,但是还没有线程执行任务。threadpoolexecutor中使用全局独占锁mainlock来控制将新增的工作线程worker线程安全的添加到工作者线程集合workers中。
④(8-2)获取了独占锁,但是在获取到锁之后,还需要进行重新检查线程池的状态,这是为了避免在获取全局独占锁之前其他线程调用了shutdown方法关闭了线程池。如果线程池已经关闭需要释放锁。否则将新增的线程添加到工作集合中,释放锁启动线程执行任务。
上面的addworker方法最后几行中,会判断添加工作线程是否成功,如果失败,会执行addworkerfailed方法,将任务从workers中移除,并且workercount做-1操作。
1 private void addworkerfailed(worker w) { 2 final reentrantlock mainlock = this.mainlock; 3 //获取锁 4 mainlock.lock(); 5 try { 6 //如果worker不为null 7 if (w != null) 8 //workers移除worker 9 workers.remove(w); 10 //通过cas操作,workercount-1 11 decrementworkercount(); 12 tryterminate(); 13 } finally { 14 //释放锁 15 mainlock.unlock(); 16 } 17 }
四、工作线程worker的执行
(1)工作线程worker类源码分析
上面查看addworker方法在cas更新线程数成功之后,下面就是创建新的worker线程执行任务,所以我们这里先查看worker类,下面是worker类的源码,我们可以看出,worker类继承了aqs并实现了runnable接口,所以他既是一个自定义的同步组件,也是一个执行任务的线程类。下面我们分析worker类的执行
1 private final class worker 2 extends abstractqueuedsynchronizer 3 implements runnable 4 { 5 6 /** 使用线程工厂创建的线程,执行任务 */ 7 final thread thread; 8 /** 初始化执行任务 */ 9 runnable firsttask; 10 /** 计数 */ 11 volatile long completedtasks; 12 13 /** 14 * 给出初始firsttask,线程创建工厂创建新的线程 15 */ 16 worker(runnable firsttask) { 17 setstate(-1); // 防止在调用runworker之前被中断 18 this.firsttask = firsttask; 19 this.thread = getthreadfactory().newthread(this); //使用threadfactory创建线程 20 } 21 22 /** run方法实际上执行的是runworker方法 */ 23 public void run() { 24 runworker(this); 25 } 26 27 // 关于同步状态(锁) 28 // 29 // 同步状态state=0表示锁未被获取 30 // 同步状态state=1表示锁被获取 31 32 protected boolean isheldexclusively() { 33 return getstate() != 0; 34 } 35 36 //下面都是重写aqs的方法,worker为自定义的同步组件 37 protected boolean tryacquire(int unused) { 38 if (compareandsetstate(0, 1)) { 39 setexclusiveownerthread(thread.currentthread()); 40 return true; 41 } 42 return false; 43 } 44 45 protected boolean tryrelease(int unused) { 46 setexclusiveownerthread(null); 47 setstate(0); 48 return true; 49 } 50 51 public void lock() { acquire(1); } 52 public boolean trylock() { return tryacquire(1); } 53 public void unlock() { release(1); } 54 public boolean islocked() { return isheldexclusively(); } 55 56 void interruptifstarted() { 57 thread t; 58 if (getstate() >= 0 && (t = thread) != null && !t.isinterrupted()) { 59 try { 60 t.interrupt(); 61 } catch (securityexception ignore) { 62 } 63 } 64 } 65 }
在构造函数中我们可以看出,首先将同步状态state置为-1,而worker这个同步组件的state有三个值,其中state=-1表示work创建时候的默认状态,创建时候设置state=-1是为了防止runworker方法运行前被中断 前面说到过这个结论,这里置为-1是为了避免当前worker在调用runworker方法之前被中断(当其他线程调用线程池的shutdownnow时候,如果worker的state>=0则会中断线程),设置为-1就不会被中断了。而worker实现runnable接口,那么需要重写run方法,在run方法中,我们可以看到,实际上执行的是runworker方法,在runworker方法中,会首先调用unlock方法,该方法会将state置为0,所以这个时候调用shutdownnow方法就会中断当前线程,而这个时候已经进入了runwork方法了,就不会在还没有执行runworker方法的时候就中断线程。
(2)runworker方法的源码分析
1 final void runworker(worker w) { 2 thread wt = thread.currentthread(); 3 runnable task = w.firsttask; 4 w.firsttask = null; 5 w.unlock(); // 这个时候调用unlock方法,将state置为0,就可以被中断了 6 boolean completedabruptly = true; 7 try { 8 //(10)如果当前任务为null,或者从任务队列中获取到的任务为null,就跳转到(11)处执行清理工作 9 while (task != null || (task = gettask()) != null) { 10 //task不为null,就需要线程执行任务,这个时候,需要获取工作线程内部持有的独占锁 11 w.lock(); 12 /**如果线程池已被停止(stop)(至少大于stop状态),要确保线程都被中断 13 * 如果状态不对,检查当前线程是否中断并清除中断状态,并且再次检查线程池状态是否大于stop 14 * 如果上述满足,检查该对象是否处于中断状态,不清除中断标记 15 */ 16 if ((runstateatleast(ctl.get(), stop) || 17 (thread.interrupted() && 18 runstateatleast(ctl.get(), stop))) && 19 !wt.isinterrupted()) 20 //中断该对象 21 wt.interrupt(); 22 try { 23 //执行任务之前要做的事情 24 beforeexecute(wt, task); 25 throwable thrown = null; 26 try { 27 task.run(); //执行任务 28 } catch (runtimeexception x) { 29 thrown = x; throw x; 30 } catch (error x) { 31 thrown = x; throw x; 32 } catch (throwable x) { 33 thrown = x; throw new error(x); 34 } finally { 35 //执行任务之后的方法 36 afterexecute(task, thrown); 37 } 38 } finally { 39 task = null; 40 //更新当前已完成任务数量 41 w.completedtasks++; 42 //释放锁 43 w.unlock(); 44 } 45 } 46 completedabruptly = false; 47 } finally { 48 //执行清理工作:处理并退出当前worker 49 processworkerexit(w, completedabruptly); 50 } 51 }
我们梳理一下runworker方法的执行流程
①首先先执行unlock方法,将worker的state置为0,这样工作线程就可以被中断了(后续的操作如果线程池关闭就需要线程被中断)
②首先判断判断当前的任务(当前工作线程中的task,或者从任务队列中取出的task)是否为null,如果不为null就往下执行,为null就执行processworkerexit方法。
③获取工作线程内部持有的独占锁(避免在执行任务期间,其他线程调用shutdown后正在执行的任务被中断,shutdown只会中断当前被阻塞挂起的没有执行任务的线程)
④然后执行beforeexecute()方法,该方法为扩展接口代码,表示在具体执行任务之前所做的一些事情,然后执行task.run()方法执行具体任务,执行完之后会调用afterexecute()方法,用以处理任务执行完毕之后的工作,也是一个扩展接口代码。
⑤更新当前线程池完成的任务数,并释放锁
(3)执行清理工作的方法processworkerexit
下面是方法processworkerexit的源码,在下面的代码中
①首先(1-1)处统计线程池完成的任务个数,并且在此之前获取全局锁,然后更新当前的全局计数器,然后从工作线程集合中移除当前工作线程,完成清理工作。
②代码(1-2)调用了tryterminate 方法,在该方法中,判断了当前线程池状态是shutdown并且队列不为空或者当前线程池状态为stop并且当前线程池中没有活动线程,则置线程池状态为terminated。如果设置称为了terminated状态,还需要调用全局条件变量termination的signalall方法唤醒所有因为调用线程池的awaittermination方法而被阻塞住的线程,使得线程池中的所有线程都停止,从而使得线程池为terminated状态。
③代码(1-3)处判断当前线程池中的线程个数是否小于核心线程数,如果是,需要新增一个线程保证有足够的线程可以执行任务队列中的任务或者提交的任务。
1 private void processworkerexit(worker w, boolean completedabruptly) { 2 /* 3 *completedabruptly:是由runworker传过来的参数,表示是否突然完成的意思 4 *当在就是在执行任务过程当中出现异常,就会突然完成,传true 5 * 6 *如果是突然完成,需要通过cas操作,更新workercount(-1操作) 7 *不是突然完成,则不需要-1,因为gettask方法当中已经-1(gettask方法中执行了decrementworkercount()方法) 8 */ 9 if (completedabruptly) 10 decrementworkercount(); 11 //(1-1)在统计完成任务个数之前加上全局锁,然后统计线程池中完成的任务个数并更新全局计数器,并从工作集中删除当前worker 12 final reentrantlock mainlock = this.mainlock; 13 mainlock.lock(); //获得全局锁 14 try { 15 completedtaskcount += w.completedtasks; //更新已完成的任务数量 16 workers.remove(w); //将完成该任务的线程worker从工作线程集合中移除 17 } finally { 18 mainlock.unlock(); //释放锁 19 } 20 /**(1-2) 21 * 这一个方法调用完成了下面的事情: 22 * 判断如果当前线程池状态是shutdown并且工作队列为空, 23 * 或者当前线程池状态是stop并且当前线程池里面没有活动线程, 24 * 则设置当前线程池状态为terminated,如果设置成了terminated状态, 25 * 还需要调用条件变量termination的signall方法激活所有因为调用线程池的awaittermination方法而被阻塞的线程 26 */ 27 tryterminate(); 28 29 //(1-3)如果当前线程池中线程数小于核心线程,则增加核心线程数 30 int c = ctl.get(); 31 //判断当前线程池的状态是否小于stop(running或者shutdown) 32 if (runstatelessthan(c, stop)) { 33 //如果任务忽然完成,执行后续的代码 34 if (!completedabruptly) { 35 //allowcorethreadtimeout表示是否允许核心线程超时,默认为false 36 //min这里当默认为allowcorethreadtimeout默认为false的时候,min置为coorpoolsize 37 int min = allowcorethreadtimeout ? 0 : corepoolsize; 38 //这里说明:如果允许核心线程超时,那么allowcorethreadtimeout可为true,那么min值为0,不需要维护核心线程了 39 //如果min为0并且任务队列不为空 40 if (min == 0 && ! workqueue.isempty()) 41 min = 1; //这里表示如果min为0,且队列不为空,那么至少需要一个核心线程存活来保证任务的执行 42 //如果工作线程数大于min,表示当前线程数满足,直接返回 43 if (workercountof(c) >= min) 44 return; // replacement not needed 45 } 46 addworker(null, false); 47 } 48 }
在tryterminate 方法中,我们简单说明了该方法的作用,下面是该方法的源码,可以看出源码实现上和上面所总结的功能是差不多的
1 final void tryterminate() { 2 for (;;) { 3 //获取线程池状态 4 int c = ctl.get(); 5 //如果线程池状态为running 6 //或者状态大于tidying 7 //或者状态==shutdown并未任务队列不为空 8 //直接返回,不能调用terminated方法 9 if (isrunning(c) || 10 runstateatleast(c, tidying) || 11 (runstateof(c) == shutdown && ! workqueue.isempty())) 12 return; 13 //如果线程池中工作线程数不为0,需要中断线程 14 if (workercountof(c) != 0) { // eligible to terminate 15 interruptidleworkers(only_one); 16 return; 17 } 18 //获得线程池的全局锁 19 final reentrantlock mainlock = this.mainlock; 20 mainlock.lock(); 21 try { 22 //通过cas操作,将线程池状态设置为tidying 23 if (ctl.compareandset(c, ctlof(tidying, 0))) { //private static int ctlof(int rs, int wc) { return rs | wc; } 24 try { 25 //调用terminated方法 26 terminated(); 27 } finally { 28 //最终将线程状态设置为terminated 29 ctl.set(ctlof(terminated, 0)); 30 //调用条件变量termination的signaall方法唤醒所有因为 31 //调用线程池的awaittermination方法而被阻塞的线程 32 //private final condition termination = mainlock.newcondition(); 33 termination.signalall(); 34 } 35 return; 36 } 37 } finally { 38 mainlock.unlock(); 39 } 40 // else retry on failed cas 41 } 42 }
五、补充(shutdown、shutdownnow、awaittermination方法)
(1)shutdown操作
我们在使用线程池的时候知道,调用shutdown方法之后线程池就不会再接受新的任务了,但是任务队列中的任务还是需要执行完的。调用该方法会立刻返回,并不是等到线程池的任务队列中的所有任务执行完毕在返回的。
1 public void shutdown() { 2 //获得线程池的全局锁 3 final reentrantlock mainlock = this.mainlock; 4 mainlock.lock(); 5 try { 6 //进行权限检查 7 checkshutdownaccess(); 8 9 //设置当前线程池的状态的shutdown,如果线程池状态已经是该状态就会直接返回,下面我们会分析这个方法的源码 10 advancerunstate(shutdown); 11 12 //设置中断 标志 13 interruptidleworkers(); 14 onshutdown(); // hook for scheduledthreadpoolexecutor 15 } finally { 16 mainlock.unlock(); 17 } 18 //尝试将状态变为terminated,上面已经分析过该方法的源码 19 tryterminate(); 20 }
该方法的源码比较简短,首先检查了安全管理器,是查看当前调用shutdown命令的线程是否有关闭线程的权限,如果有权限还需要看调用线程是否有中断工作线程的权限,如果没有权限将会抛出securityexception异常或者空指针异常。下面我们查看一下advancerunstate 方法的源码。
1 private void advancerunstate(int targetstate) { 2 for (;;) { 3 //下面的方法执行的就是: 4 //首先获取线程的ctl值,然后判断当前线程池的状态如果已经是shutdown,那么if条件第一个为真就直接返回 5 //如果不是shutdown状态,就需要cas的设置当前状态为shutdown 6 int c = ctl.get(); 7 if (runstateatleast(c, targetstate) || 8 //private static int ctlof(int rs, int wc) { return rs | wc; } 9 ctl.compareandset(c, ctlof(targetstate, workercountof(c)))) 10 break; 11 } 12 }
我们可以看出advancerunstate 方法实际上就是判断当前线程池的状态是否为shutdwon,如果是那么就返回,否则就需要设置当前状态为shutdown。
我们再来看看shutdown方法中调用线程中断的方法interruptidleworkers源码
1 private void interruptidleworkers() { 2 interruptidleworkers(false); 3 } 4 private void interruptidleworkers(boolean onlyone) { 5 final reentrantlock mainlock = this.mainlock; 6 mainlock.lock(); 7 try { 8 for (worker w : workers) { 9 thread t = w.thread; 10 //如果工作线程没有被中断,并且没有正在运行设置中断标志 11 if (!t.isinterrupted() && w.trylock()) { 12 try { 13 //需要中断当前线程 14 t.interrupt(); 15 } catch (securityexception ignore) { 16 } finally { 17 w.unlock(); 18 } 19 } 20 if (onlyone) 21 break; 22 } 23 } finally { 24 mainlock.unlock(); 25 } 26 }
上面的代码中,需要设置所有空闲线程的中断标志。首先获取线程池的全局锁,同时只有一个线程可以调用shutdown方法设置中断标志。然后尝试获取工作线程worker自己的锁,获取成功则可以设置中断标志(这是由于正在执行任务的线程需要获取自己的锁,并且不可重入,所以正在执行的任务没有被中断),这里要中断的那些线程是阻塞到gettask()方法并尝试从任务队列中获取任务的线程即空闲线程。
(2)shutdownnow操作
在使用线程池的时候,如果我们调用了shutdownnow方法,线程池不仅不会再接受新的任务,还会将任务队列中的任务丢弃,正在执行的任务也会被中断,然后立刻返回该方法,不会等待激活的任务完成,返回值为当前任务队列中被丢弃的任务列表
1 public list<runnable> shutdownnow() { 2 list<runnable> tasks; 3 final reentrantlock mainlock = this.mainlock; 4 mainlock.lock(); 5 try { 6 checkshutdownaccess(); //还是进行权限检查 7 advancerunstate(stop); //设置线程池状态台stop 8 interruptworkers(); //中断所有线程 9 tasks = drainqueue(); //将任务队列中的任务移动到task中 10 } finally { 11 mainlock.unlock(); 12 } 13 tryterminate(); 14 return tasks; //返回tasks 15 }
从上面的代码中,我们可以可以发现,shutdownnow方法也是首先需要检查调用该方法的线程的权限,之后不同于shutdown方法之处在于需要即刻设置当前线程池状态为stop,然后中断所有线程(空闲线程+正在执行任务的线程),移除任务队列中的任务
1 private void interruptworkers() { 2 final reentrantlock mainlock = this.mainlock; 3 mainlock.lock(); 4 try { 5 for (worker w : workers) //不需要判断当前线程是否在执行任务(即不需要调用w.trylock方法),中断所有线程 6 w.interruptifstarted(); 7 } finally { 8 mainlock.unlock(); 9 } 10 }
(3)awaittermination操作
当线程调用该方法之后,会阻塞调用者线程,直到线程池状态为terminated状态才会返回,或者等到超时时间到之后会返回,下面是该方法的源码。
1 //调用该方法之后,会阻塞调用者线程,直到线程池状态为terminated状态才会返回,或者等到超时时间到之后会返回 2 public boolean awaittermination(long timeout, timeunit unit) 3 throws interruptedexception { 4 long nanos = unit.tonanos(timeout); 5 final reentrantlock mainlock = this.mainlock; 6 mainlock.lock(); 7 try { 8 //阻塞当前线程,(获取了worker自己的锁),那么当前线程就不会再执行任务(因为获取不到锁) 9 for (;;) { 10 //当前线程池状态为terminated状态,会返回true 11 if (runstateatleast(ctl.get(), terminated)) 12 return true; 13 //超时时间到返回false 14 if (nanos <= 0) 15 return false; 16 nanos = termination.awaitnanos(nanos); 17 } 18 } finally { 19 mainlock.unlock(); 20 } 21 }
在上面的代码中,调用者线程需要首先获取线程worker 自己的独占锁,然后在循环判断当前线程池是否已经是terminated状态,如果是则直接返回,否则说明当前线程池中还有线程正在执行任务,这时候需要查看当前设置的超时时间是否小于0,小于0说明不需要等待就直接返回,如果大于0就需要调用条件变量termination的awaitnanos方法等待设置的时间,并在这段时间之内等待线程池的状态变为terminated。
我们在前面说到清理线程池的方法processworkerexit的时候,需要调用tryterminated方法,在该方法中会查看当前线程池状态是否为terminated,如果是该状态也会调用termination.signalall()方法唤醒所有线程池中因调用awaittermination而被阻塞住的线程。
如果是设置了超时时间,那么termination的awaitnanos方法也会返回,这时候需要重新检查线程池状态是否为terminated,如果是则返回,不是就继续阻塞自己。
推荐阅读
-
Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析
-
Java并发包中线程池ThreadPoolExecutor原理探究
-
Java线程池实现原理及其在多接口聚合业务中的实践
-
JAVA中ThreadPoolExecutor线程池的submit方法详解
-
JAVA 之ThreadPoolExecutor线程池原理及其execute方法实例详解
-
JAVA 之ThreadPoolExecutor线程池原理及其execute方法实例详解
-
Java中关于线程池使用和原理的详解
-
Java中关于线程池使用和原理的详解
-
Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析
-
JAVA中ThreadPoolExecutor线程池的submit方法详解