Java线程池原理解读
引言
引用自《阿里巴巴java开发手册》
【强制】线程资源必须通过线程池提供,不允许在应用中自行显式创建线程。
说明:使用线程池的好处是减少在创建和销毁线程上所消耗的时间以及系统资源的开销,解决资源不足的问题。如果不使用线程池,有可能造成系统创建大量同类线程而导致消耗完内存或者“过度切换”的问题。
之前在阅读《阿里巴巴java开发手册》时发现,其中有一条对于线程资源的值用限制,要求使用线程池来创建和维护,那么什么是线程池呢,为什么是线程池?原理是什么?怎么使用它?有什么讲究呢?带着这一系列的问题,我们开始来探究一下,希望这篇文章对我们有所收获。
本文源码来自jdk 1.8 。
简介
线程池,故名思意,就是一个存放线程的池子,学术一点的说法,就是一组存放线程资源的集合。为什么有线程池这一概念地产生呢?想想以前我们都是需要线程的时候,直接自己手动来创建一个,然后执行完任务我们就不管了,线程就是我们执行异步任务的一个工具或者说载体,我们并没有太多关注于这个线程自身生命周期对于系统或环境的影响,而只把重心放在了多线程任务执行完成的结果输出,然后目的达到了,但是真正忽略了线程资源的维护和监控等问题。随着大型系统大量多线程资源的使用,对多线程疏于重视、维护和管理而对资源占用和拉低性能的影响逐渐扩大,才引起了人们的思考。多线程的创建和销毁在多线程的生命周期中占有很大比重,这一部分其实很占用资源和性能,如果使用线程来执行简单任务,而因为线程本身的维护成本已经超出任务执行的效益,这是得不偿失的,于是就产生了线程池。通过使用线程池,将线程的生命周期管控起来,同时能够方便地获取到线程、复用线程,避免频繁地创建和销毁线程带来额外性能开销,这大概就是线程池引入的背景和初衷吧。
所以现在看来,合理的利用线程池能够给系统带来几大好处:
1、减低资源消耗。通过重复利用已创建好的线程来降低线程创建和销毁造成的消耗;
2、提高响应速度。当任务到达时,任务可以不需要等待线程创建就能立马执行;
3、提高线程可管理性。线程池时稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
但是,如果想合理的掌控线程池的使用,那么多线程池的原理和特性,都是必须要了解清楚的。
解读
api继承结构图
jdk为我们提供一个叫做excutor的框架来使用线程池,它是线程池的基础,我们可以看下线程池相关的java diagram:
我们从上面的图一个一个来看,分析下各个接口和类的功能和所承担的角色。
executor接口:
它是线程池的基础,提供了唯一的一个方法execute用来执行线程任务。
executorservice接口:
它继承了executor接口,提供了线程池生命周期管理的几乎所有方法,包括诸如shutdown、awaittermination、submit、invokeall、invokeany等。
abstractexecutorservice类:
一个提供了线程池生命周期默认实现的抽象类,并且自行新增了如newtaskfor、doinvokeany等方法。
threadpoolexecutor类:
这是线程池的核心类,也是我们常用来创建和管理线程池的类,我们使用executors调用newfixedthreadpool、newsinglethreadexecutor和newcachedthreadpool等方法创建的线程池,都是threadpoolexecutor类型的。
scheduledexecutorservice接口:
赋予了线程池具备延迟和定期执行任务的能力,它提供了一些方法接口,使得任务能够按照给定的方式来延期或者周期性的执行任务。
scheduledthreadpoolexecutor类:
继承自threadpoolexecutor类,同时实现了scheduledexecutorservice接口,具备了线程池所有通用能力,同时增加了延时执行和周期性执行任务的能力。
除了上面说到的这些,jdk1.7中还新增了一个线程池forkjoinpool,它与threadpoolexecutor一样继承于abstractexecutorservice。与其他类型的executorservice相比,它的不同之处在于采用了工作窃取算法(work-stealing,可以从源码和注释中得到更多详细介绍):所有线程池中的线程会尝试找到并执行已被提交到池中的或由其他线程创建的任务。这样的算法使得很少有线程处于空闲状态,非常的高效,这样的方式常用于如大多数由任务产生大量子任务的情况,以及像从外部客户端大量提交小任务到池中的情况。
以上是对于线程池api继承体系的简单梳理和介绍,接下来我们深入源码去进行分析。
线程池的几种内部状态
线程池使用了一个integer类型变量来记录线程池任务数量和线程池状态信息,很巧妙。
(展开以查看代码)
1 /** 2 * the main pool control state, ctl, is an atomic integer packing 3 * two conceptual fields 4 * workercount, indicating the effective number of threads 5 * runstate, indicating whether running, shutting down etc 6 * 7 * in order to pack them into one int, we limit workercount to 8 * (2^29)-1 (about 500 million) threads rather than (2^31)-1 (2 9 * billion) otherwise representable. if this is ever an issue in 10 * the future, the variable can be changed to be an atomiclong, 11 * and the shift/mask constants below adjusted. but until the need 12 * arises, this code is a bit faster and simpler using an int. 13 * 14 * the workercount is the number of workers that have been 15 * permitted to start and not permitted to stop. the value may be 16 * transiently different from the actual number of live threads, 17 * for example when a threadfactory fails to create a thread when 18 * asked, and when exiting threads are still performing 19 * bookkeeping before terminating. the user-visible pool size is 20 * reported as the current size of the workers set. 21 * 22 * the runstate provides the main lifecycle control, taking on values: 23 * 24 * running: accept new tasks and process queued tasks 25 * shutdown: don't accept new tasks, but process queued tasks 26 * stop: don't accept new tasks, don't process queued tasks, 27 * and interrupt in-progress tasks 28 * tidying: all tasks have terminated, workercount is zero, 29 * the thread transitioning to state tidying 30 * will run the terminated() hook method 31 * terminated: terminated() has completed 32 * 33 * the numerical order among these values matters, to allow 34 * ordered comparisons. the runstate monotonically increases over 35 * time, but need not hit each state. the transitions are: 36 * 37 * running -> shutdown 38 * on invocation of shutdown(), perhaps implicitly in finalize() 39 * (running or shutdown) -> stop 40 * on invocation of shutdownnow() 41 * shutdown -> tidying 42 * when both queue and pool are empty 43 * stop -> tidying 44 * when pool is empty 45 * tidying -> terminated 46 * when the terminated() hook method has completed 47 * 48 * threads waiting in awaittermination() will return when the 49 * state reaches terminated. 50 * 51 * detecting the transition from shutdown to tidying is less 52 * straightforward than you'd like because the queue may become 53 * empty after non-empty and vice versa during shutdown state, but 54 * we can only terminate if, after seeing that it is empty, we see 55 * that workercount is 0 (which sometimes entails a recheck -- see 56 * below). 57 */ 58 private final atomicinteger ctl = new atomicinteger(ctlof(running, 0)); 59 private static final int count_bits = integer.size - 3; 60 private static final int capacity = (1 << count_bits) - 1; 61 62 // runstate is stored in the high-order bits 63 private static final int running = -1 << count_bits; 64 private static final int shutdown = 0 << count_bits; 65 private static final int stop = 1 << count_bits; 66 private static final int tidying = 2 << count_bits; 67 private static final int terminated = 3 << count_bits; 68 69 // packing and unpacking ctl 70 private static int runstateof(int c) { return c & ~capacity; } 71 private static int workercountof(int c) { return c & capacity; } 72 private static int ctlof(int rs, int wc) { return rs | wc; }
看这个变量ctl,被定义为了atomicinteger,使用高3位来表示线程池状态,低29位来表示线程池中的任务数量。
线程池状态
running:线程池能够接受新任务,以及对新添加的任务进行处理。
shutdown:线程池不可以接受新任务,但是可以对已添加的任务进行处理。
stop:线程池不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。
tidying:当所有的任务已终止,ctl记录的"任务数量"为0,线程池会变为tidying状态。当线程池变为tidying状态时,会执行钩子函数terminated()。terminated()在threadpoolexecutor类中是空的,若用户想在线程池变为tidying时,进行相应的处理;可以通过重载terminated()函数来实现。
terminated:线程池彻底终止的状态。
根据代码设计,我们用图标来展示一下
各线程池状态的切换图示
原理分析
原理分析,我们将结合源码的和注释的方式来分析。
核心参数
通过上面的描述我们知道,线程池的核心实现即threadpoolexecutor类就是本次我们重点关注和学习的对象。从对它的初始化过程我们看到,它完整的构造方法向我们暴露了这几个核心的参数:
1 public threadpoolexecutor(int corepoolsize, 2 int maximumpoolsize, 3 long keepalivetime, 4 timeunit unit, 5 blockingqueue<runnable> workqueue, 6 threadfactory threadfactory, 7 rejectedexecutionhandler handler) { 8 if (corepoolsize < 0 || 9 maximumpoolsize <= 0 || 10 maximumpoolsize < corepoolsize || 11 keepalivetime < 0) 12 throw new illegalargumentexception(); 13 if (workqueue == null || threadfactory == null || handler == null) 14 throw new nullpointerexception(); 15 this.corepoolsize = corepoolsize; 16 this.maximumpoolsize = maximumpoolsize; 17 this.workqueue = workqueue; 18 this.keepalivetime = unit.tonanos(keepalivetime); 19 this.threadfactory = threadfactory; 20 this.handler = handler; 21 }
通过上面构造函数可以看出,threadpoolexecutor类具有7个参数,由于篇幅受限我没有把构造函数的注释文档贴上来,我现在逐个翻译并简要说明一下:
corepoolsize:核心线程数,当线程数小于该值时,线程池会优先创建新线程来执行任务,如果调用了线程池的prestartallcorethreads方法,线程池会提前创建并启动所有基本线程,除非设置了allowcorethreadtimeout,否则核心线程将持续保留在线程池中即时没有新的任务提交过来。
maximumpoolsize:最大线程数,即线程池所能允许创建的最大线程数量。
keepalivetime:空闲线程存活时间,当线程数量大于核心线程数时,这是多余空闲线程在终止之前等待新任务的最长时间。
unit:keepalivetime数值的时间单位。
workqueue:任务队列,用于缓存未执行的任务,队列一直会持有任务直到有线程开始执行它。
threadfactory:线程工厂,可以通过工厂创建更具识别性质的线程,如线程名字等。
handler:拒绝策略,当线程和队列都处于饱和时就使用拒绝策略来处理新任务。
线程池中线程的使用和创建规则
在java线程池的实现逻辑中,线程池所能创建的线程数量受限于corepoolsize和maximumpoolsize两个参数值,线程的创建时机则和corepoolsize和workqueue两个参数相关,当线程数量和队列长度都已达到饱和时,则介入拒绝策略来处理新的任务了,下面把大概的流程说明一下。
1、当来了新任务,如果线程池中空闲线程数量小于corepoolsize,则直接拿线程池中新的线程来处理任务;
2、如果线程池正在运行的线程数量大于等于corepoolsize,而此时workqueue队列未满,则将此任务缓存到队列中;
3、如果线程池正在运行的线程数量大于等于corepoolsize,且workqueue队列已满,但现在的线程数量还小于maximumpoolsize,则创建新的线程来执行任务。
4、如果线程数量已经大于maximumpoolsize且workqueue队列也已经满了,则使用拒绝策略来处理该任务,默认的拒绝策略就是抛出异常(abortpolicy)。
我们简化下上面的文字,用简单表格来展示:
我们再用流程图来对线程池中提交任务的这一逻辑增加感性认识:
下面,我们通过代码,来重点性的解读一下这一流程:
1、线程的创建和复用
线程池中线程的创建是通过线程工厂threadfactory来实现的,线程池的默认实现是使用executors.defaultthreadfactory()来返回的工厂类,我们可以通过构造函数指定线程工厂,这里不做深入了解。
顺便提一句,线程池的创建其实还有个关键方法prestartallcorethreads(),它的作用就是在线程池刚初始化的时候就激活核心线程数大小的线程放置到线程池中,等待任务着任务来执行,但是jdk默认的启动策略中并没有使用它,我按照这个方法查询了一下,在tomcat包中实现的threadpoolexecutor中在构造的时候,都调用了这个方法来初始化核心线程数量。
线程复用是线程池作用的关键所在,避免线程重复创建和销毁,重复使用空闲的未销毁的线程。所以这就要求一个线程在执行完一个任务之后不能直接退出,需要重新去队列任务中获取新的任务来执行,如果任务队列中没有任务,且keepalivetime没有被设置,那么这个工作线程将一直阻塞下去指导有新的任务可执行,这样就达到了线程复用的目的。
(展开以查看代码)
1 private final class worker 2 extends abstractqueuedsynchronizer 3 implements runnable 4 { 5 // 创建任务调用内部类worker 6 worker(runnable firsttask) { 7 setstate(-1); // inhibit interrupts until runworker 8 this.firsttask = firsttask; 9 // 通过线程工厂来创建线程 10 this.thread = getthreadfactory().newthread(this); 11 } 12 13 // 实现了runnable接口 14 public void run() { 15 runworker(this); 16 } 17 } 18 19 // 执行worker任务 20 final void runworker(worker w) { 21 thread wt = thread.currentthread(); 22 runnable task = w.firsttask; 23 w.firsttask = null; 24 w.unlock(); // allow interrupts 25 boolean completedabruptly = true; 26 try { 27 // 循环从队列中获取任务 28 while (task != null || (task = gettask()) != null) { 29 w.lock(); 30 // if pool is stopping, ensure thread is interrupted; 31 // if not, ensure thread is not interrupted. this 32 // requires a recheck in second case to deal with 33 // shutdownnow race while clearing interrupt 34 if ((runstateatleast(ctl.get(), stop) || 35 (thread.interrupted() && 36 runstateatleast(ctl.get(), stop))) && 37 !wt.isinterrupted()) 38 wt.interrupt(); 39 try { 40 beforeexecute(wt, task); 41 throwable thrown = null; 42 try { 43 // 执行线程任务 44 task.run(); 45 } catch (runtimeexception x) { 46 thrown = x; throw x; 47 } catch (error x) { 48 thrown = x; throw x; 49 } catch (throwable x) { 50 thrown = x; throw new error(x); 51 } finally { 52 afterexecute(task, thrown); 53 } 54 } finally { 55 task = null; 56 w.completedtasks++; 57 w.unlock(); 58 } 59 } 60 completedabruptly = false; 61 } finally { 62 processworkerexit(w, completedabruptly); 63 } 64 }
2、提交线程任务
提交任务调用线程池的submit方法,该方法在abstractexecutorservice中。
1 public future<?> submit(runnable task) { 2 if (task == null) throw new nullpointerexception(); 3 // 创建任务 4 runnablefuture<void> ftask = newtaskfor(task, null); 5 // 执行任务 6 execute(ftask); 7 return ftask; 8 }
其中的execute接口在executor接口中定义,具体的实现在threadpoolexecutor中得以体现。
(展开以查看代码)
1 public void execute(runnable command) { 2 if (command == null) 3 throw new nullpointerexception(); 4 /* 5 * proceed in 3 steps: 6 * 7 * 1. if fewer than corepoolsize threads are running, try to 8 * start a new thread with the given command as its first 9 * task. the call to addworker atomically checks runstate and 10 * workercount, and so prevents false alarms that would add 11 * threads when it shouldn't, by returning false. 12 * 13 * 2. if a task can be successfully queued, then we still need 14 * to double-check whether we should have added a thread 15 * (because existing ones died since last checking) or that 16 * the pool shut down since entry into this method. so we 17 * recheck state and if necessary roll back the enqueuing if 18 * stopped, or start a new thread if there are none. 19 * 20 * 3. if we cannot queue task, then we try to add a new 21 * thread. if it fails, we know we are shut down or saturated 22 * and so reject the task. 23 */ 24 // ctl记录了线程数量和线程状态 25 int c = ctl.get(); 26 // 如果工作线程数小于核心线程数,创建新线程执行 27 // 即时其他线程是空闲的 28 if (workercountof(c) < corepoolsize) { 29 if (addworker(command, true)) 30 return; 31 c = ctl.get(); 32 } 33 // 缓存任务到队列中,这里进行了double check 34 // 如果线程池中运行的线程数量>=corepoolsize, 35 // 且线程池处于running状态,且把提交的任务成功放入阻塞队列中, 36 // 就再次检查线程池的状态。 37 // 1.如果线程池不是running状态,且成功从阻塞队列中删除任务, 38 // 则该任务由当前 rejectedexecutionhandler 处理。 39 // 2.否则如果线程池中运行的线程数量为0,则通过 40 // addworker(null, false)尝试新建一个线程, 41 // 新建线程对应的任务为null。 42 if (isrunning(c) && workqueue.offer(command)) { 43 int recheck = ctl.get(); 44 // 任务无效则拒绝 45 if (! isrunning(recheck) && remove(command)) 46 reject(command); 47 else if (workercountof(recheck) == 0) 48 addworker(null, false); 49 } 50 // 添加新的工作线程,并在addworker方法中的两个for循环来判断 51 // 如果以上两个条件不成立,既没能将任务成功放入阻塞队列中, 52 // 且addwoker新建线程失败,则该任务由当前 53 // rejectedexecutionhandler 处理。 54 else if (!addworker(command, false)) 55 // 采用拒绝策略 56 reject(command); 57 }
addworker方法用于新增任务,第二个boolean参数表示线程数是否控制在核心线程数之内。
(展开以查看代码)
1 private boolean addworker(runnable firsttask, boolean core) { 2 retry: 3 for (;;) { 4 int c = ctl.get(); 5 int rs = runstateof(c); 6 7 // check if queue empty only if necessary. 8 if (rs >= shutdown && 9 ! (rs == shutdown && 10 firsttask == null && 11 ! workqueue.isempty())) 12 return false; 13 14 for (;;) { 15 int wc = workercountof(c); 16 if (wc >= capacity || 17 wc >= (core ? corepoolsize : maximumpoolsize)) 18 return false; 19 if (compareandincrementworkercount(c)) 20 break retry; 21 c = ctl.get(); // re-read ctl 22 if (runstateof(c) != rs) 23 continue retry; 24 // else cas failed due to workercount change; retry inner loop 25 } 26 } 27 28 boolean workerstarted = false; 29 boolean workeradded = false; 30 worker w = null; 31 try { 32 // 创建工作线程 33 w = new worker(firsttask); 34 final thread t = w.thread; 35 if (t != null) { 36 final reentrantlock mainlock = this.mainlock; 37 mainlock.lock(); 38 try { 39 // recheck while holding lock. 40 // back out on threadfactory failure or if 41 // shut down before lock acquired. 42 int rs = runstateof(ctl.get()); 43 44 if (rs < shutdown || 45 (rs == shutdown && firsttask == null)) { 46 if (t.isalive()) // precheck that t is startable 47 throw new illegalthreadstateexception(); 48 // 将线程放置于hashset中,持有mainlock才可访问 49 // workers中记录了池中真正任务线程数量 50 workers.add(w); 51 int s = workers.size(); 52 if (s > largestpoolsize) 53 largestpoolsize = s; 54 workeradded = true; 55 } 56 } finally { 57 mainlock.unlock(); 58 } 59 if (workeradded) { 60 t.start(); 61 workerstarted = true; 62 } 63 } 64 } finally { 65 if (! workerstarted) 66 addworkerfailed(w); 67 } 68 return workerstarted; 69 }
3、关闭线程池
threadpoolexecutor提供了shutdown()和shutdownnow()两个方法来关闭线程。
shutdown:
将线程状态设置为shutdown,同时中断线程,按过去执行已提交任务的顺序,发起一个有序的关闭命令,而且不再接收新的任务,最后尝试将线程池状态设置为terminated。
shutdownnow:
将线程状态设置为stop,中断所有的任务且不再接收新任务,尝试停止所有正在执行的任务、暂停等待处理的任务,并返回等待执行的任务列表。中断线程使用thread.interrupt方法,未响应中断命令的任务是无法被中断的。
jdk提供的常用的线程池
一般情况下我们都不直接用threadpoolexecutor类来创建线程池,而是通过executors工具类去构建,通过executors工具类我们可以构造5种不同的线程池。
newfixedthreadpool(int nthreads):
创建固定线程数的线程池,corepoolsize和maximumpoolsize是相等的,默认情况下,线程池中的空闲线程不会被回收的;
newcachedthreadpool:
创建线程数量不定的线程池,线程数量随任务量变动,一旦来了新的任务,如果线程池中没有空闲线程则立马创建新的线程来执行任务,空闲线程存活时间60秒,过后就被回收了,可见这个线程池弹性很高;
newsinglethreadexecutor:
创建线程数量为1的线程池,等价于newfixedthreadpool(1)所构造的线程池;
newscheduledthreadpool(int corepoolsize):
创建核心线程数为corepoolsize,可执行定时任务的线程池;
newsinglethreadscheduledexecutor:
等价于newscheduledthreadpool(1)。
阻塞队列
构造函数中的队列允许我们自定义,队列的意义在于缓存无法得到线程执行的任务,当线程数量大于corepoolsize而当前workqueue还没有满时,就需要将任务放置到队列中。jdk提供了几种类型的队列容器,每种类型都具各自特点,可以根据实际场景和需要自行配置到线程池中。
arrayblockingqueue:
有界队列,基于数组结构,按照队列fifo原则对元素排序;
linkedblockingqueue:
*队列,基于链表结构,按照队列fifo原则对元素排序,executors.newfixedthreadpool()使用了这个队列;
synchronousqueue:
同步队列,该队列不存储元素,每个插入操作必须等待另一个线程调用移除操作,否则插入操作会一直被阻塞,executors.newcachedthreadpool()使用了这个队列;
priorityblockingqueue:
优先级队列,具有优先级的无限阻塞队列。
拒绝策略
拒绝策略(rejectedexecutionhandler)也称饱和策略,当线程数量和队列都达到饱和时,就采用饱和策略来处理新提交过来的任务,默认情况下采用的策略是抛出异常(abortpolicy),表示无法处理直接抛出异常,其实jdk提供了四种策略,也很好记,拒绝策略无非就是抛异常、执行或者丢弃任务,其中丢弃任务就分为丢弃自己或者丢弃队列中最老的任务,下面简要说明一下:
abortpolicy:丢弃新任务,并抛出 rejectedexecutionexception
discardpolicy:不做任何操作,直接丢弃新任务
discardoldestpolicy:丢弃队列队首(最老)的元素,并执行新任务
callerrunspolicy:由当前调用线程来执行新任务
使用技巧
使用了线程池技术未必能够给工作带来利好,在没能正确理解线程池特性以及了解自身业务场景下而配置的线程池,可能会成为系统性能或者业务的瓶颈甚至是漏洞,所以在我们使用线程池时,除了对线程池本身特性了如指掌,还需要对自身业务属性进行一番分析,以便配置出合理的高效的线程池以供项目使用,下面我们从这几个方面来分析:
- 任务的性质:cpu密集型任务,io密集型任务和混合型任务。
- 任务的优先级:高,中和低。
- 任务的执行时间:长,中和短。
- 任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。cpu密集型任务配置尽可能少的线程数量,如配置ncpu+1个线程的线程池,以减少线程切换带来的性能开销。io密集型任务则由于需要等待io操作,线程并不是一直在执行任务,则配置尽可能多的线程,如2*ncpu。混合型的任务,如果可以拆分,则将其拆分成一个cpu密集型任务和一个io密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐率要高于串行执行的吞吐率,如果这两个任务执行时间相差太大,则没必要进行分解。我们可以通过runtime.getruntime().availableprocessors()方法获得当前设备的cpu个数。
优先级不同的任务可以使用优先级队列priorityblockingqueue来处理。它可以让优先级高的任务先得到执行,需要注意的是如果一直有优先级高的任务提交到队列里,那么优先级低的任务可能永远不能执行。
当然,以上这些配置方式都是经验值,实际当中还需要分析自己的项目场景经过多次测试方可得出最适合自己项目的线程池配置。
自问自答
面试的时候,可能会遇到面试官问下面的这样几个问题,看看你现在能不能回答,算是一种学习的自我检查吧。
1、线程池原理,参数如何设置?
2、线程池有哪些参数,阻塞队列用的是什么队列,为什么?
3、线程池原理,为什么要创建线程池,创建线程池的方式?
4、创建线程池有哪几个核心参数,如何合理配置线程池大小?
参考资料
1、https://juejin.im/post/5aedb6b651882522835e5e45
2、https://cloud.tencent.com/developer/article/1109643
3、http://ifeve.com/java-threadpool/
上一篇: 前端基础之CSS
下一篇: Vue+Django项目部署