Android开发中线程池源码解析
线程池(英语:thread pool):一种线程使用模式。线程过多会带来调度开销,进而影响缓存局部性和整体性能。而线程池维护着多个线程,等待着监督管理者分配可并发执行的任务。这避免了在处理短时间任务时创建与销毁线程的代价。线程池不仅能够保证内核的充分利用,还能防止过分调度。可用线程数量应该取决于可用的并发处理器、处理器内核、内存、网络sockets等的数量。 例如,线程数一般取cpu数量+2比较合适,线程数过多会导致额外的线程切换开销。----摘自*
我们在android或者java开发中,日常所使用的就是threadpoolexecutor了,我们先来看下如何使用一个线程池来代替多线程开发。
使用线程池
// 创建一个核心线程数为5,最大线程数为10,空闲线程存活时间为60s的线程池对象 val threadpoolexecutor = threadpoolexecutor( 5, 10, 60, timeunit.minutes, arrayblockingqueue<runnable>(100), rejectedexecutionhandler { _, _ -> println("reject submit thread to thread pool") } ) // 测试 for (i in 1..10) { threadpoolexecutor.execute { println("execute thread is:${thread.currentthread().name}") } } // 结果 // execute thread is:pool-1-thread-1 // execute thread is:pool-1-thread-1 // execute thread is:pool-1-thread-1 // execute thread is:pool-1-thread-1 // execute thread is:pool-1-thread-5 // execute thread is:pool-1-thread-5 // execute thread is:pool-1-thread-4 // execute thread is:pool-1-thread-3 // execute thread is:pool-1-thread-2 // execute thread is:pool-1-thread-1
从结果就可以看出来,执行时间操作,但是只创建了5个线程,另外5次都是复用线程的。这样就达到了复用存在的线程、减少对象的创建和销毁的额外开销;并且可以控制最大线程数,也就是控制了最大并发数。
知道如何使用一个线程池还不够,我们需要看看threadpoolexecutor是如何创建、复用这些线程的。下面我们看看创建threadpoolexecutor对象的几个参数:
构造方法
/** * 创建一个threadpoolexecutor对象 * * @param corepoolsize 核心线程数,这些线程会一直在线程池中,除非设置了 allowcorethreadtimeout * @param maximumpoolsize 最大线程数,运行线程创建的最大值 * @param keepalivetime 当线程数>核心线程数的时候,这个值就是空闲且非核心线程存活的时间 * @param unit keepalivetime的单位 * @param workqueue 保存task的队列,直到执行execute()方法执行 * @param threadfactory threadfactory是一个接口,里面只有thread newthread(runnable r)方法,用来创建线程, * 默认采用executors.defaultthreadfactory() * @param handler 拒绝处理任务时的策略,如果线程池满了且所有线程都不处于空闲状态, * 通过rejectedexecutionhandler接口的rejectedexecution(runnable r, threadpoolexecutor executor)来处理传进来的runnable * 系统提供了四种:callerrunspolicy(), abortpolicy(), discardpolicy(), discardoldestpolicy() * 默认采用new abortpolicy() */ public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler){ if (corepoolsize < 0 || maximumpoolsize <= 0 || maximumpoolsize < corepoolsize || keepalivetime < 0) throw new illegalargumentexception(); if (workqueue == null || threadfactory == null || handler == null) throw new nullpointerexception(); this.acc = system.getsecuritymanager() == null ? null : accesscontroller.getcontext(); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; this.workqueue = workqueue; this.keepalivetime = unit.tonanos(keepalivetime); this.threadfactory = threadfactory; this.handler = handler; }
我在方法头注释中我都一一解释了几个参数的作用,还有几点需要注意的就是:
- 核心线程数不能小于0;
- 最大线程数不能小于0;
- 最大线程数不能小于核心线程数;
- 空闲线程的存活时间不能小于0;
通过上面的解释我们很明白的知道前面几个参数的作用,但是最后两个参数我们并不能通过表面的解释通晓它,既然不能通过表象看懂他俩,那就看看默认的实现是如何做的,这样在接下来的源码分析中很有帮助。
threadfactory:线程工厂
threadfactory 是一个接口,里面只由唯一的 thread newthread(runnable r); 方法,此方法是用来创建线程的,从接口中我们得到的就只有这么多,下面我们看看 executors 默认的 defaultthreadfactory 类:
// 静态内部类 static class defaultthreadfactory implements threadfactory { // 线程池的标识,从1开始没创建一个线程池+1 private static final atomicinteger poolnumber = new atomicinteger(1); // 线程组 private final threadgroup group; // 线程名中的结尾标识,从1开始每创建一个线程+1 private final atomicinteger threadnumber = new atomicinteger(1); // 线程名 private final string nameprefix; defaultthreadfactory() { securitymanager s = system.getsecuritymanager(); group = (s != null) ? s.getthreadgroup() : thread.currentthread().getthreadgroup(); nameprefix = "pool-" + poolnumber.getandincrement() + "-thread-"; } public thread newthread(runnable r) { thread t = new thread(group, r, nameprefix + threadnumber.getandincrement(), 0); if (t.isdaemon()) t.setdaemon(false); if (t.getpriority() != thread.norm_priority) t.setpriority(thread.norm_priority); return t; } }
rejectedexecutionhandler:拒绝处理任务的策略
rejectedexecutionhandler 也是一个接口,并且也只提供了唯一的 void rejectedexecution(runnable r, threadpoolexecutor executor); 方法。我们可以自定义策略,也可以用上面提到的封装好的四种策略,先看一下四种策略分别怎么拒绝任务的:
callerrunspolicy
public static class callerrunspolicy implements rejectedexecutionhandler { /** * creates a {@code callerrunspolicy}. */ public callerrunspolicy() { } /** * 如果线程池还没关闭,那么就再次执行这个runnable */ public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { r.run(); } } }
abortpolicy
public static class abortpolicy implements rejectedexecutionhandler { /** * creates an {@code abortpolicy}. */ public abortpolicy() { } /** * 这个策略就是抛出异常,不做其他处理 */ public void rejectedexecution(runnable r, threadpoolexecutor e) { throw new rejectedexecutionexception("task " + r.tostring() + " rejected from " + e.tostring()); } }
discardpolicy
public static class discardpolicy implements rejectedexecutionhandler { /** * creates a {@code discardpolicy}. */ public discardpolicy() { } /** * 什么也不做,也就是抛弃了这个runnable */ public void rejectedexecution(runnable r, threadpoolexecutor e) { } }
discardoldestpolicy
public static class discardoldestpolicy implements rejectedexecutionhandler { /** * creates a {@code discardoldestpolicy} for the given executor. */ public discardoldestpolicy() { } /** * 1. 线程池未关闭 * 2. 获取队列中的下一个runnable * 3. 获取到了,但是不对它进行处理,也就是抛弃它 * 4. 执行我们传过来的这个runnable */ public void rejectedexecution(runnable r, threadpoolexecutor e) { if (!e.isshutdown()) { e.getqueue().poll(); e.execute(r); } } }
重要的参数
除了上述构造方法中的几个参数外,线程池还有几个比较核心的参数,如下:
public class threadpoolexecutor extends abstractexecutorservice { // ctl 的低29位表示线程池中的线程数,高3位表示当前线程状态 private final atomicinteger ctl = new atomicinteger(ctlof(running, 0)); // 29 private static final int count_bits = integer.size - 3; // (2^29) -1 private static final int capacity = (1 << count_bits) - 1; // 运行状态:接受新任务并处理排队的任务 private static final int running = -1 << count_bits; // 关闭状态:不接受新任务,但处理排队的任务 private static final int shutdown = 0 << count_bits; // 停止状态:不接受新任务,不处理排队的任务,中断正在进行的任务 private static final int stop = 1 << count_bits; // 整理状态:整理状态,所有任务已终止,workercount为零,线程将运行terminate()方法 private static final int tidying = 2 << count_bits; // 终止状态:terminate()方法执行完成 private static final int terminated = 3 << count_bits; // 表示线程是否允许或停止 private static int runstateof(int c) { return c & ~capacity; } // 线程的有效数量 private static int workercountof(int c) { return c & capacity; } private static int ctlof(int rs, int wc) { return rs | wc; } ......后面的源码暂时省略 }
execute:执行
public void execute(runnable command) { if (command == null) throw new nullpointerexception(); int c = ctl.get(); // 如果运行中的线程数小于核心线程数,执行addworker(command, true)创建新的核心thread执行任务 if (workercountof(c) < corepoolsize) { if (addworker(command, true)) return; c = ctl.get(); } // 1. 已经满足:运行中的线程数大于核心线程数,但是小于最大线程数 // 2. 需要满足:线程池在运行状态 // 3. 需要满足:添加到工作队列中成功 if (isrunning(c) && workqueue.offer(command)) { int recheck = ctl.get(); // 如果线程不在运行状态,就从工作队列中移除command // 并且执行拒绝策略 if (!isrunning(recheck) && remove(command)) reject(command); // 线程池处于运行状态,但是没有线程,则addworker(null, false) // 至于这里为什么要传入一个null,因为在最外层的if条件中我们已经将runnable添加到工作队列中了 // 而且在runworker()源码中也可以得到答案,如果传入的runnable为空,就会去工作队列中取task。 else if (workercountof(recheck) == 0) addworker(null, false); } // 执行addworker()创建新的非核心线程thread执行任务 // addworker() 失败,执行拒绝策略 else if (!addworker(command, false)) reject(command); }
从上面源码中可以看出,execute()一个新的任务,主要有以下这几种情况:
1、核心线程未满,直接新建核心线程并执行任务;
2、核心线程满了,工作队列未满,将任务添加到工作队列中;
3、核心线程和工作队列都满,但是最大线程数未达到,新建线程并执行任务;
4、上面条件都不满足,那么就执行拒绝策略。
更形象的可以看下方流程图:
添加任务的流程图
addworker(runnable , boolean):添加worker
private boolean addworker(runnable firsttask, boolean core) { // 标记外循环,比如在内循环中break retry就直接跳出外循环 retry: for (; ; ) { int c = ctl.get(); int rs = runstateof(c); // 直接返回false有以下3种情况: // 1. 线程池状态为stop、tidying、terminated // 2. 线程池状态不是running状态,并且firsttask不为空 // 3. 线程池状态不是running状态,并且工作队列为空 if (rs >= shutdown && !(rs == shutdown && firsttask == null && !workqueue.isempty())) return false; for (; ; ) { int wc = workercountof(c); // 如果添加的是核心线程,但是运行的线程数大于等于核心线程数,那么就不添加了,直接返回 // 如果添加的是非核心线程,但是运行的线程数大于等于最大线程数,那么也不添加,直接返回 if (wc >= capacity || wc >= (core ? corepoolsize : maximumpoolsize)) return false; // 增加workercount的值 +1 if (compareandincrementworkercount(c)) // 跳出外循环 break retry; c = ctl.get(); // 重新检查线程池状态 if (runstateof(c) != rs) continue retry; // 重新检查的状态和之前不合,再次从外循环进入 } } boolean workerstarted = false; boolean workeradded = false; worker w = null; try { w = new worker(firsttask); final thread t = w.thread; if (t != null) { // 线程池重入锁 final reentrantlock mainlock = this.mainlock; // 获得锁 mainlock.lock(); try { // recheck while holding lock. // back out on threadfactory failure or if // shut down before lock acquired. int rs = runstateof(ctl.get()); // 线程池在运行状态或者是线程池关闭同时runnable也为空 if (rs < shutdown || (rs == shutdown && firsttask == null)) { if (t.isalive()) // precheck that t is startable throw new illegalthreadstateexception(); // 想worker中添加新的worker workers.add(w); int s = workers.size(); if (s > largestpoolsize) largestpoolsize = s; workeradded = true; } } finally { // 释放锁 mainlock.unlock(); } // 如果添加成功,启动线程 if (workeradded) { t.start(); workerstarted = true; } } } finally { if (!workerstarted) addworkerfailed(w); } return workerstarted; }
addworker() 主要就是在满足种种条件(上述源码中解释了)后,新建一个worker对象,并添加到hashset<worker> workers中去,最后调用新建worker对象的thread变量的start()方法。
worker类
worker是一个继承了aqs并实现了runnable的内部类,我们重点看看它的run()方法,因为上面addworker()中,t.start()触发的就是它的run()方法:
private final class worker extends abstractqueuedsynchronizer implements runnable { /** * this class will never be serialized, but we provide a * serialversionuid to suppress a javac warning. */ private static final long serialversionuid = 6138294804551838833l; /** * thread this worker is running in. null if factory fails. */ final thread thread; /** * initial task to run. possibly null. */ runnable firsttask; /** * per-thread task counter */ volatile long completedtasks; /** * creates with given first task and thread from threadfactory. * * @param firsttask the first task (null if none) */ worker(runnable firsttask) { setstate(-1); // inhibit interrupts until runworker this.firsttask = firsttask; // 这边是把runnable传给了thread,也就是说thread.run()就是执行了下面的run()方法 this.thread = getthreadfactory().newthread(this); } /** * delegates main run loop to outer runworker */ public void run() { runworker(this); } }
run()方法实际调用了runworker(worker)方法
runworker(worker)方法:
final void runworker(worker w) { thread wt = thread.currentthread(); runnable task = w.firsttask; w.firsttask = null; w.unlock(); // 释放锁,允许中断 boolean completedabruptly = true; try { // 1. worker中的task不为空 // 2. 如果worker的task为空,那么取workerqueue的task while (task != null || (task = gettask()) != null) { w.lock(); // if pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. this // requires a recheck in second case to deal with // shutdownnow race while clearing interrupt if ((runstateatleast(ctl.get(), stop) || (thread.interrupted() && runstateatleast(ctl.get(), stop))) && !wt.isinterrupted()) wt.interrupt(); try { // 这是一个空方法,可由子类实现 beforeexecute(wt, task); throwable thrown = null; try { // 执行task task.run(); } .... 省略 // 这是一个空方法,可由子类实现 finally { afterexecute(task, thrown); } } finally { task = null; w.completedtasks++; w.unlock(); } } completedabruptly = false; } finally { processworkerexit(w, completedabruptly); } }
gettask():
```java private runnable gettask() { // 进入死循环 for (; ; ) { try { // 为true的条件: // allowcorethreadtimeout=true: 核心线程需根据keepalivetime超时等待 // 核心线程数已满 boolean timed = allowcorethreadtimeout || wc > corepoolsize; // 如果timed为true,执行blockqueue.poll(),这个操作在取不到task的时候会等待keepalivetime,然后返回null // 如果timed为false,执行blockqueue.take(),这个操作在队列为空的时候一直阻塞 runnable r = timed ? workqueue.poll(keepalivetime, timeunit.nanoseconds) : workqueue.take(); if (r != null) return r; } } } ```
线程池的源码按照上述的几个方法(execute(runnable) -> addworker(runnable,core) -> worker -> runworker(worker) -> gettask())的顺序来分析,你就可以很清晰的将运作过程了解清楚,同事构造方法和几个重要的参数一定要懂,不然对于后面的源码分析很受阻碍,相信大家通过这篇文章可以加深对线程池的理解。
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。
下一篇: 5G-RNTI