Java并发系列[10]----ThreadPoolExecutor源码分析
在日常的开发调试中,我们经常会直接new一个Thread对象来执行某个任务。这种方式在任务数较少的情况下比较简单实用,但是在并发量较大的场景中却有着致命的缺陷。例如在访问量巨大的网站中,如果每个请求都开启一个线程来处理的话,即使是再强大的服务器也支撑不住。一台电脑的CPU资源是有限的,在CPU较为空闲的情况下,新增线程可以提高CPU的利用率,达到提升性能的效果。但是在CPU满载运行的情况下,再继续增加线程不仅不能提升性能,反而因为线程的竞争加大而导致性能下降,甚至导致服务器宕机。因此,在这种情况下我们可以利用线程池来使线程数保持在合理的范围内,使得CPU资源被充分的利用,且避免因过载而导致宕机的危险。在Executors中为我们提供了多种静态工厂方法来创建各种特性的线程池,其中大多数是返回ThreadPoolExecutor对象。因此本篇我们从ThreadPoolExecutor类着手,深入探究线程池的实现机制。
1. 线程池状态和线程数的表示
1 //高3位表示线程池状态, 后29位表示线程个数 2 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); 3 private static final int COUNT_BITS = Integer.SIZE - 3; 4 private static final int CAPACITY = (1 << COUNT_BITS) - 1; 5 6 //运行状态 例:11100000000000000000000000000000 7 private static final int RUNNING = -1 << COUNT_BITS; 8 9 //关闭状态 例:00000000000000000000000000000000 10 private static final int SHUTDOWN = 0 << COUNT_BITS; 11 12 //停止状态 例:00100000000000000000000000000000 13 private static final int STOP = 1 << COUNT_BITS; 14 15 //整理状态 例:01000000000000000000000000000000 16 private static final int TIDYING = 2 << COUNT_BITS; 17 18 //终止状态 例:01100000000000000000000000000000 19 private static final int TERMINATED = 3 << COUNT_BITS; 20 21 private static int runStateOf(int c) { return c & ~CAPACITY; } 22 private static int workerCountOf(int c) { return c & CAPACITY; } 23 private static int ctlOf(int rs, int wc) { return rs | wc; }
在继续接下来的探究之前,我们先来搞清楚ThreadPoolExecutor是怎样存放状态信息和线程数信息的。ThreadPoolExecutor利用原子变量ctl来同时存储运行状态和线程数的信息,其中高3位表示线程池的运行状态(runState),后面的29位表示线程池中的线程数(workerCount)。上面代码中,runStateOf方法是从ctl取出状态信息,workerCountOf方法是从ctl取出线程数信息,ctlOf方法是将状态信息和线程数信息糅合进ctl中。具体的计算过程如下图所示。
2. 线程池各个状态的具体含义
就像人的生老病死一样,线程池也有自己的生命周期,从创建到终止,线程池在每个阶段所做的事情是不一样的。新建一个线程池时它的状态为Running,这时它不断的从外部接收并处理任务,当处理不过来时它会把任务放到任务队列中;之后我们可能会调用shutdown()来终止线程池,这时线程池的状态从Running转为Shutdown,它开始拒绝接收从外部传过来的任务,但是会继续处理完任务队列中的任务;我们也可能调用shutdownNow()来立刻停止线程池,这时线程池的状态从Running转为Stop,然后它会快速排空任务队列中的任务并转到Tidying状态,处于该状态的线程池需要执行terminated()来做相关的扫尾工作,执行完terminated()之后线程池就转为Terminated状态,表示线程池已终止。这些状态的转换图如下所示。
3. 关键成员变量的介绍
1 //任务队列 2 private final BlockingQueue<Runnable> workQueue; 3 4 //工作者集合 5 private final HashSet<Worker> workers = new HashSet<Worker>(); 6 7 //线程达到的最大值 8 private int largestPoolSize; 9 10 //已完成任务总数 11 private long completedTaskCount; 12 13 //线程工厂 14 private volatile ThreadFactory threadFactory; 15 16 //拒绝策略 17 private volatile RejectedExecutionHandler handler; 18 19 //闲置线程存活时间 20 private volatile long keepAliveTime; 21 22 //是否允许核心线程超时 23 private volatile boolean allowCoreThreadTimeOut; 24 25 //核心线程数量 26 private volatile int corePoolSize; 27 28 //最大线程数量 29 private volatile int maximumPoolSize; 30 31 //默认拒绝策略 32 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
在深入探究线程池的实现机制之前,我们有必要了解一下各个成员变量的作用。上面列出了主要的成员变量,除了一些用于统计的变量,例如largestPoolSize和completedTaskCount,其中大部分变量的值都是可以在构造时进行设置的。下面我们就看一下它的核心构造器。
1 //核心构造器 2 public ThreadPoolExecutor(int corePoolSize, 3 int maximumPoolSize, 4 long keepAliveTime, 5 TimeUnit unit, 6 BlockingQueue<Runnable> workQueue, 7 ThreadFactory threadFactory, 8 RejectedExecutionHandler handler) { 9 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) { 10 throw new IllegalArgumentException(); 11 } 12 if (workQueue == null || threadFactory == null || handler == null) { 13 throw new NullPointerException(); 14 } 15 this.corePoolSize = corePoolSize; //设置核心线程数量 16 this.maximumPoolSize = maximumPoolSize; //设置最大线程数量 17 this.workQueue = workQueue; //设置任务队列 18 this.keepAliveTime = unit.toNanos(keepAliveTime); //设置存活时间 19 this.threadFactory = threadFactory; //设置线程工厂 20 this.handler = handler; //设置拒绝策略 21 }
ThreadPoolExecutor有多个构造器,所有的构造器都会调用上面的核心构造器。通过核心构造器我们可以为线程池设置不同的参数,由此线程池也能表现出不同的特性。因此彻底搞懂这几个参数的含义能使我们更好的使用线程池,下面我们就来详细看一下这几个参数的含义。
corePoolSize:
核心线程数最大值,默认情况下新建线程池时并不创建线程,后续每接收一个任务就新建一个核心线程来处理,直到核心线程数达到corePoolSize。这时后面到来的任务都会被放到任务队列中等待。
maximumPoolSize:
总线程数最大值,当任务队列被放满了之后,将会新建非核心线程来处理后面到来的任务。当总的线程数达到maximumPoolSize后,将不再继续创建线程,而是对后面的任务执行拒绝策略。
workQueue:
任务队列,当核心线程数达到corePoolSize后,后面到来的任务都会被放到任务队列中,该任务队列是阻塞队列,工作线程可以通过定时或者阻塞方式从任务队列中获取任务。
keepAliveTime:
闲置线程存活时间,该参数默认情况下只在线程数大于corePoolSize时起作用,闲置线程在任务队列上等待keepAliveTime时间后将会被终止,直到线程数减至corePoolSize。也可以通过设置allowCoreThreadTimeOut变量为true来使得keepAliveTime在任何时候都起作用,这时线程数最后会减至0。
4. execute方法的执行过程
1 //核心执行方法 2 public void execute(Runnable command) { 3 if (command == null) throw new NullPointerException(); 4 int c = ctl.get(); 5 //线程数若小于corePoolSize则新建核心工作者 6 if (workerCountOf(c) < corePoolSize) { 7 if (addWorker(command, true)) return; 8 c = ctl.get(); 9 } 10 //否则将任务放到任务队列 11 if (isRunning(c) && workQueue.offer(command)) { 12 int recheck = ctl.get(); 13 //若不是running状态则将该任务从队列中移除 14 if (!isRunning(recheck) && remove(command)) { 15 //成功移除后再执行拒绝策略 16 reject(command); 17 //若线程数为0则新增一个非核心线程 18 }else if (workerCountOf(recheck) == 0) { 19 addWorker(null, false); 20 } 21 //若队列已满则新增非核心工作者 22 }else if (!addWorker(command, false)) { 23 //若新建非核心线程失败则执行拒绝策略 24 reject(command); 25 } 26 }
execute方法是线程池接收任务的入口方法,当创建好一个线程池之后,我们会调用execute方法并传入一个Runnable交给线程池去执行。从上面代码中可以看到execute方法首先会去判断当前线程数是否小于corePoolSize,如果小于则调用addWorker方法新建一个核心线程去处理该任务,否则调用workQueue的offer方法将该任务放入到任务队列中。通过offer方法添加并不会阻塞线程,如果添加成功会返回true,若队列已满则返回false。在成功将任务放入到任务队列后,还会再次检查线程池是否是Running状态,如果不是则将刚刚添加的任务从队列中移除,然后再执行拒绝策略。如果从队列中移除任务失败,则再检查一下线程数是否为0(有可能刚好全部线程都被终止了),是的话就新建一个非核心线程去处理。如果任务队列已经满了,此时offer方法会返回false,接下来会再次调用addWorker方法新增一个非核心线程来处理该任务。如果期间创建线程失败,则最后会执行拒绝策略。
5. 工作线程的实现
1 //工作者类 2 private final class Worker extends AbstractQueuedSynchronizer implements Runnable { 3 //关联线程 4 final Thread thread; 5 //初始任务 6 Runnable firstTask; 7 //完成任务数 8 volatile long completedTasks; 9 10 //构造器 11 Worker(Runnable firstTask) { 12 //抑制中断直到runWorker 13 setState(-1); 14 //设置初始任务 15 this.firstTask = firstTask; 16 //设置关联线程 17 this.thread = getThreadFactory().newThread(this); 18 } 19 20 public void run() { 21 runWorker(this); 22 } 23 24 //判断是否占有锁, 0代表未占用, 1代表已占用 25 protected boolean isHeldExclusively() { 26 return getState() != 0; 27 } 28 29 //尝试获取锁 30 protected boolean tryAcquire(int unused) { 31 if (compareAndSetState(0, 1)) { 32 setExclusiveOwnerThread(Thread.currentThread()); 33 return true; 34 } 35 return false; 36 } 37 38 //尝试释放锁 39 protected boolean tryRelease(int unused) { 40 setExclusiveOwnerThread(null); 41 setState(0); 42 return true; 43 } 44 45 public void lock() { acquire(1); } 46 public boolean tryLock() { return tryAcquire(1); } 47 public void unlock() { release(1); } 48 public boolean isLocked() { return isHeldExclusively(); } 49 50 //中断关联线程 51 void interruptIfStarted() { 52 Thread t; 53 //将活动线程和闲置线程都中断 54 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 55 try { 56 t.interrupt(); 57 } catch (SecurityException ignore) { 58 //ignore 59 } 60 } 61 } 62 }
ThreadPoolExecutor内部实现了一个Worker类,用它来表示工作线程。每个Worker对象都持有一个关联线程和分配给它的初始任务。Worker类继承自AQS并实现了自己的加锁解锁方法,说明每个Worker对象也是一个锁对象。同时Worker类还实现了Runnable接口,因此每个Worker对象都是可以运行的。Worker类有一个唯一的构造器,需要传入一个初始任务给它,在构造器里面首先将同步状态设置为-1,这个操作主要是抑制中断直到runWorker方法运行,为啥要这样做?我们继续看下去,可以看到在设置完初始任务之后,马上就开始设置关联线程,关联线程是通过线程工厂的newThread方法来生成的,这时将Worker对象本身当作任务传给关联线程。因此在启动关联线程时(调用start方法),会运行Worker对象自身的run方法。而run方法里面紧接着调用runWorker方法,也就是说只有在runWorker方法运行时才表明关联线程已启动,这时去中断关联线程才有意义,因此前面要通过设置同步状态为-1来抑制中断。那么为啥将同步状态设置为-1就可以抑制中断?每个Worker对象都是通过调用interruptIfStarted方法来中断关联线程的,在interruptIfStarted方法内部会判断只有同步状态>=0时才会中断关联线程。因此将同步状态设置为-1能起到抑制中断的作用。
6. 工作线程的创建
1 //添加工作线程 2 private boolean addWorker(Runnable firstTask, boolean core) { 3 retry: 4 for (;;) { 5 int c = ctl.get(); 6 int rs = runStateOf(c); 7 //只有以下两种情况会继续添加线程 8 //1.状态为running 9 //2.状态为shutdown,首要任务为空,但任务队列中还有任务 10 if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) { 11 return false; 12 } 13 for (;;) { 14 int wc = workerCountOf(c); 15 //以下三种情况不继续添加线程: 16 //1.线程数大于线程池总容量 17 //2.当前线程为核心线程,且核心线程数达到corePoolSize 18 //3.当前线程非核心线程,且总线程达到maximumPoolSize 19 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) { 20 return false; 21 } 22 //否则继续添加线程, 先将线程数加一 23 if (compareAndIncrementWorkerCount(c)) { 24 //执行成功则跳过外循环 25 break retry; 26 } 27 //CAS操作失败再次检查线程池状态 28 c = ctl.get(); 29 //如果线程池状态改变则继续执行外循环 30 if (runStateOf(c) != rs) { 31 continue retry; 32 } 33 //否则表明CAS操作失败是workerCount改变, 继续执行内循环 34 } 35 } 36 boolean workerStarted = false; 37 boolean workerAdded = false; 38 Worker w = null; 39 try { 40 final ReentrantLock mainLock = this.mainLock; 41 w = new Worker(firstTask); 42 final Thread t = w.thread; 43 if (t != null) { 44 mainLock.lock(); 45 try { 46 int c = ctl.get(); 47 int rs = runStateOf(c); 48 if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { 49 //如果线程已经开启则抛出异常 50 if (t.isAlive()) throw new IllegalThreadStateException(); 51 //将工作者添加到集合中 52 workers.add(w); 53 int s = workers.size(); 54 //记录线程达到的最大值 55 if (s > largestPoolSize) { 56 largestPoolSize = s; 57 } 58 workerAdded = true; 59 } 60 } finally { 61 mainLock.unlock(); 62 } 63 //将工作者添加到集合后则启动线程 64 if (workerAdded) { 65 t.start(); 66 workerStarted = true; 67 } 68 } 69 } finally { 70 //如果线程启动失败则回滚操作 71 if (!workerStarted) { 72 addWorkerFailed(w); 73 } 74 } 75 return workerStarted; 76 }
上面我们知道在execute方法里面会调用addWorker方法来添加工作线程。通过代码可以看到进入addWorker方法里面会有两层自旋循环,在外层循环中获取线程池当前的状态,如果线程池状态不符合就直接return,在内层循环中获取线程数,如果线程数超过限定值也直接return。只有经过这两重判断之后才会使用CAS方式来将线程数加1。成功将线程数加1之后就跳出外层循环去执行后面的逻辑,否则就根据不同条件来进行自旋,如果是线程池状态改变就执行外层循环,如果是线程数改变就执行内层循环。当线程数成功加1之后,后面就是去新建一个Worker对象,并在构造时传入初始任务给它。然后将这个Worker对象添加到工作者集合当中,添加成功后就调用start方法来启动关联线程。
7. 工作线程的执行
1 //运行工作者 2 final void runWorker(Worker w) { 3 //获取当前工作线程 4 Thread wt = Thread.currentThread(); 5 //获取工作者的初始任务 6 Runnable task = w.firstTask; 7 //将工作者的初始任务置空 8 w.firstTask = null; 9 //将同步状态从-1设为0 10 w.unlock(); 11 boolean completedAbruptly = true; 12 try { 13 //初始任务不为空则执行初始任务, 否则从队列获取任务 14 while (task != null || (task = getTask()) != null) { 15 //确保获取到任务后才加锁 16 w.lock(); 17 //若状态大于等于stop, 保证当前线程被中断 18 //若状态小于stop, 保证当前线程未被中断 19 //在清理中断状态时可能有其他线程在修改, 所以会再检查一次 20 if ((runStateAtLeast(ctl.get(), STOP) || 21 (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) { 22 wt.interrupt(); 23 } 24 try { 25 //任务执行前做些事情 26 beforeExecute(wt, task); 27 Throwable thrown = null; 28 try { 29 //执行当前任务 30 task.run(); 31 } catch (RuntimeException x) { 32 thrown = x; throw x; 33 } catch (Error x) { 34 thrown = x; throw x; 35 } catch (Throwable x) { 36 thrown = x; throw new Error(x); 37 } finally { 38 //任务执行后做一些事情 39 afterExecute(task, thrown); 40 } 41 } finally { 42 //将执行完的任务置空 43 task = null; 44 //将完成的任务数加一 45 w.completedTasks++; 46 w.unlock(); 47 } 48 } 49 //设置该线程为正常完成任务 50 completedAbruptly = false; 51 } finally { 52 //执行完所有任务后将线程删除 53 processWorkerExit(w, completedAbruptly); 54 } 55 }
上面我们知道,将Worker对象添加到workers集合之后就会去调用关联线程的start方法,由于传给关联线程的Runnable就是Worker对象本身,因此会调用Worker对象实现的run方法,最后会调用到runWorker方法。我们看到上面代码,进入到runWorker方法里面首先获取了Worker对象的初始任务,然后调用unlock方法将同步状态加1,由于在构造Worker对象时将同步状态置为-1了,所以这里同步状态变回0,因此在这之后才可以调用interruptIfStarted方法来中断关联线程。如果初始任务不为空就先去执行初始任务,否则就调用getTask方法去任务队列中获取任务,可以看到这里是一个while循环,也就是说工作线程在执行完自己的任务之后会不断的从任务队列中获取任务,直到getTask方法返回null,然后工作线程退出while循环最后执行processWorkerExit方法来移除自己。如果需要在所有任务执行之前或之后做些处理,可以分别实现beforeExecute方法和afterExecute方法。
8. 任务的获取
1 //从任务队列中获取任务 2 private Runnable getTask() { 3 //上一次获取任务是否超时 4 boolean timedOut = false; 5 retry: 6 //在for循环里自旋 7 for (;;) { 8 int c = ctl.get(); 9 int rs = runStateOf(c); 10 //以下两种情况会将工作者数减为0并返回null,并直接使该线程终止: 11 //1.状态为shutdown并且任务队列为空 12 //2.状态为stop, tidying或terminated 13 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 14 decrementWorkerCount(); 15 return null; 16 } 17 18 boolean timed; 19 //判断是否要剔除当前线程 20 for (;;) { 21 int wc = workerCountOf(c); 22 //以下两种情况会在限定时间获取任务: 23 //1.允许核心线程超时 24 //2.线程数大于corePoolSize 25 timed = allowCoreThreadTimeOut || wc > corePoolSize; 26 //以下两种情况不执行剔除操作: 27 //1.上次任务获取未超时 28 //2.上次任务获取超时, 但没要求在限定时间获取 29 if (wc <= maximumPoolSize && !(timedOut && timed)) { 30 break; 31 } 32 //若上次任务获取超时, 且规定在限定时间获取, 则将线程数减一 33 if (compareAndDecrementWorkerCount(c)) { 34 //CAS操作成功后直接返回null 35 return null; 36 } 37 //CAS操作失败后再次检查状态 38 c = ctl.get(); 39 //若状态改变就从外层循环重试 40 if (runStateOf(c) != rs) { 41 continue retry; 42 } 43 //否则表明是workerCount改变, 继续在内循环重试 44 } 45 46 try { 47 //若timed为true, 则在规定时间内返回 48 //若timed为false, 则阻塞直到获取成功 49 //注意:闲置线程会一直在这阻塞 50 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); 51 //获取任务不为空则返回该任务 52 if (r != null) { 53 return r; 54 } 55 //否则将超时标志设为true 56 timedOut = true; 57 } catch (InterruptedException retry) { 58 timedOut = false; 59 } 60 } 61 }
工作线程在while循环里不断的通过getTask方法来从任务队列中获取任务,我们看一下getTask方法是怎样获取任务的。进入第一个for循环之后有一个if判断,从这里我们可以看到,如果线程池状态为shutdown,会继续消费任务队列里面的任务;如果线程池状态为stop,则停止消费任务队列里剩余的任务。进入第二个for循环后会给timed变量赋值,由于allowCoreThreadTimeOut变量默认是false,所以timed的值取决于线程数是否大于corePoolSize,小于为false,大于则为true。从任务队列里面获取任务的操作在try块里面,如果timed为true,则调用poll方法进行定时获取;如果timed为flase,则调用take方法进行阻塞获取。也就是说默认情况下,如果线程数小于corePoolSize,则调用take方法进行阻塞获取,即使任务队列为空,工作线程也会一直等待;如果线程数大于corePoolSize,则调用poll方法进行定时获取,在keepAliveTime时间内获取不到任务则会返回null,对应的工作线程也会被移除,但线程数会保持在corePoolSize上。当然如果设置allowCoreThreadTimeOut为true,则会一直通过调用poll方法来从任务队列中获取任务,如果任务队列长时间为空,则工作线程会减少至0。
9. 工作线程的退出
1 //删除工作线程 2 private void processWorkerExit(Worker w, boolean completedAbruptly) { 3 //若非正常完成则将线程数减为0 4 if (completedAbruptly) { 5 decrementWorkerCount(); 6 } 7 final ReentrantLock mainLock = this.mainLock; 8 mainLock.lock(); 9 try { 10 //统计完成的任务总数 11 completedTaskCount += w.completedTasks; 12 //在这将工作线程移除 13 workers.remove(w); 14 } finally { 15 mainLock.unlock(); 16 } 17 //尝试终止线程池 18 tryTerminate(); 19 //再次检查线程池状态 20 int c = ctl.get(); 21 //若状态为running或shutdown, 则将线程数恢复到最小值 22 if (runStateLessThan(c, STOP)) { 23 //线程正常完成任务被移除 24 if (!completedAbruptly) { 25 //允许核心线程超时最小值为0, 否则最小值为核心线程数 26 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; 27 //如果任务队列还有任务, 则保证至少有一个线程 28 if (min == 0 && !workQueue.isEmpty()) { 29 min = 1; 30 } 31 //若线程数大于最小值则不新增了 32 if (workerCountOf(c) >= min) { 33 return; 34 } 35 } 36 //新增工作线程 37 addWorker(null, false); 38 } 39 }
工作线程如果从getTask方法中获得null,则会退出while循环并随后执行processWorkerExit方法,该方法会在这个工作线程终止之前执行一些操作,我们看到它会去统计该工作者完成的任务数,然后将其从workers集合中删除,每删除一个工作者之后都会去调用tryTerminate方法尝试终止线程池,但并不一定会真的终止线程池。从tryTerminate方法返回后再次去检查一遍线程池的状态,如果线程池状态为running或者shutdown,并且线程数小于最小值,则恢复一个工作者。这个最小值是怎样计算出来的呢?我们来看看。如果allowCoreThreadTimeOut为true则最小值为0,否则最小值为corePoolSize。但还有一个例外情况,就是虽然允许核心线程超时了,但是如果任务队列不为空的话,那么必须保证有一个线程存在,因此这时最小值设为1。后面就是判断如果工作线程数大于最小值就不新增线程了,否则就新增一个非核心线程。从这个方法可以看到,每个线程退出时都会去判断要不要再恢复一个线程,因此线程池中的线程总数也是动态增减的。
10. 线程池的终止
1 //平缓关闭线程池 2 public void shutdown() { 3 final ReentrantLock mainLock = this.mainLock; 4 mainLock.lock(); 5 try { 6 //检查是否有关闭的权限 7 checkShutdownAccess(); 8 //将线程池状态设为shutdown 9 advanceRunState(SHUTDOWN); 10 //中断闲置的线程 11 interruptIdleWorkers(); 12 //对外提供的钩子 13 onShutdown(); 14 } finally { 15 mainLock.unlock(); 16 } 17 //尝试终止线程池 18 tryTerminate(); 19 } 20 21 //立刻关闭线程池 22 public List<Runnable> shutdownNow() { 23 List<Runnable> tasks; 24 final ReentrantLock mainLock = this.mainLock; 25 mainLock.lock(); 26 try { 27 //检查是否有关闭的权限 28 checkShutdownAccess(); 29 //将线程池状态设为stop 30 advanceRunState(STOP); 31 //中断所有工作线程 32 interruptWorkers(); 33 //排干任务队列 34 tasks = drainQueue(); 35 } finally { 36 mainLock.unlock(); 37 } 38 //尝试终止线程池 39 tryTerminate(); 40 return tasks; 41 }
可以通过两个方法来终止线程池,通过调用shutdown方法可以平缓的终止线程池,通过调用shutdownNow方法可以立即终止线程池。调用shutdown()方法后首先会将线程池状态设置为shutdown,这时线程池会拒绝接收外部传过来的任务,然后调用interruptIdleWorkers()方法中断闲置线程,剩余的线程会继续消费完任务队列里的任务之后才会终止。调用shutdownNow()方法会将线程池状态设置为stop,这是线程池也不再接收外界的任务,并且马上调用interruptWorkers()方法将所有工作线程都中断了,然后排干任务队列里面没有被处理的任务,最后返回未被处理的任务集合。调用shutdown()和shutdownNow()方法后还未真正终止线程池,这两个方法最后都会调用tryTerminate()方法来终止线程池。我们看看该方法的代码。
1 //尝试终止线程池 2 final void tryTerminate() { 3 for (;;) { 4 int c = ctl.get(); 5 //以下两种情况终止线程池,其他情况直接返回: 6 //1.状态为stop 7 //2.状态为shutdown且任务队列为空 8 if (isRunning(c) || runStateAtLeast(c, TIDYING) || 9 (runStateOf(c) == SHUTDOWN && !workQueue.isEmpty())) { 10 return; 11 } 12 //若线程不为空则中断一个闲置线程后直接返回 13 if (workerCountOf(c) != 0) { 14 interruptIdleWorkers(ONLY_ONE); 15 return; 16 } 17 final ReentrantLock mainLock = this.mainLock; 18 mainLock.lock(); 19 try { 20 //将状态设置为tidying 21 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { 22 try { 23 //线程池终止后做的事情 24 terminated(); 25 } finally { 26 //将状态设置为终止状态(TERMINATED) 27 ctl.set(ctlOf(TERMINATED, 0)); 28 //唤醒条件队列所有线程 29 termination.signalAll(); 30 } 31 return; 32 } 33 } finally { 34 mainLock.unlock(); 35 } 36 //若状态更改失败则再重试 37 } 38 }
tryTerminate()方法在其他很多地方也被调用过,比如processWorkerExit()和addWorkerFailed()。调用该方法来尝试终止线程池,在进入for循环后第一个if判断过滤了不符合条件的终止操作,只有状态为stop,或者状态为shutdown且任务队列为空这两种情况才能继续执行。第二个if语句判断工作者数量是否为0,不为0的话也直接返回。经过这两重判断之后才符合终止线程池的条件,于是先通过CAS操作将线程池状态设置为tidying状态,在tidying状态会调用用户自己实现的terminated()方法来做一些处理。到了这一步,不管terminated()方法是否成功执行最后都会将线程池状态设置为terminated,也就标志着线程池真正意义上的终止了。最后会唤醒所有等待线程池终止的线程,让它们继续执行。
11. 常用线程池参数配置
Executors中有许多静态工厂方法来创建线程池,在平时使用中我们都是通过Executors的静态工厂方法来创建线程池的。这其中有几个使用线程池的典型例子我们来看一下。
1 //固定线程数的线程池 2 //注:该线程池将corePoolSize和maximumPoolSize都设置为同一数值,线程池刚创建时线程数为0, 3 //之后每接收一个任务创建一个线程,直到线程数达到nThreads,此后线程数不再增长。如果其中有某个 4 //线程因为发生异常而终止,线程池将补充一个新的线程。 5 public static ExecutorService newFixedThreadPool(int nThreads) { 6 return new ThreadPoolExecutor(nThreads, nThreads, 7 0L, TimeUnit.MILLISECONDS, 8 new LinkedBlockingQueue<Runnable>()); 9 } 10 11 //单个线程的线程池 12 //注:该线程池将corePoolSize和maximumPoolSize都设置为1,因此线程池中永远只有一个线程, 13 //如果该线程因为不可预知的异常而被终止,线程池将会补充一个新的线程。 14 public static ExecutorService newSingleThreadExecutor() { 15 return new FinalizableDelegatedExecutorService 16 (new ThreadPoolExecutor(1, 1, 17 0L, TimeUnit.MILLISECONDS, 18 new LinkedBlockingQueue<Runnable>())); 19 } 20 21 //可缓存的线程池 22 //注:该线程池将corePoolSize设置为0,将maximumPoolSize设置为Integer.MAX_VALUE, 23 //空闲线程存活时间设置为60S。也就是说该线程池一开始线程数为0,随着任务数的增加线程数也相应 24 //增加,线程数的上限为Integer.MAX_VALUE。当任务数减少时线程数也随之减少,最后会减少至0。 25 public static ExecutorService newCachedThreadPool() { 26 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 27 60L, TimeUnit.SECONDS, 28 new SynchronousQueue<Runnable>()); 29 }
推荐阅读
-
并发编程(九)—— Java 并发队列 BlockingQueue 实现之 LinkedBlockingQueue 源码分析
-
Java并发Timer源码分析
-
Java并发编程之Condition源码分析(推荐)
-
Java 集合系列(四)—— ListIterator 源码分析
-
java并发之AtomicInteger源码分析
-
Java并发系列之Semaphore源码分析
-
Java并发系列之CyclicBarrier源码分析
-
Java并发系列之ConcurrentHashMap源码分析
-
Java并发系列之CountDownLatch源码分析
-
java高并发系列 - 第10天:线程安全和synchronized关键字