java 中ThreadPoolExecutor原理分析
java 中threadpoolexecutor原理分析
线程池简介
java线程池是开发中常用的工具,当我们有异步、并行的任务要处理时,经常会用到线程池,或者在实现一个服务器时,也需要使用线程池来接收连接处理请求。
线程池使用
jdk中提供的线程池实现位于java.util.concurrent.threadpoolexecutor。在使用时,通常使用executorservice接口,它提供了submit,invokeall,shutdown等通用的方法。
在线程池配置方面,executors类中提供了一些静态方法能够提供一些常用场景的线程池,如newfixedthreadpool,newcachedthreadpool,newsinglethreadexecutor等,这些方法最终都是调用到了threadpoolexecutor的构造函数。
threadpoolexecutor的包含所有参数的构造函数是
/** * @param corepoolsize the number of threads to keep in the pool, even * if they are idle, unless {@code allowcorethreadtimeout} is set * @param maximumpoolsize the maximum number of threads to allow in the * pool * @param keepalivetime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepalivetime} argument * @param workqueue the queue to use for holding tasks before they are * executed. this queue will hold only the {@code runnable} * tasks submitted by the {@code execute} method. * @param threadfactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached public threadpoolexecutor(int corepoolsize, int maximumpoolsize, long keepalivetime, timeunit unit, blockingqueue<runnable> workqueue, threadfactory threadfactory, rejectedexecutionhandler handler) { if (corepoolsize < 0 || maximumpoolsize <= 0 || maximumpoolsize < corepoolsize || keepalivetime < 0) throw new illegalargumentexception(); if (workqueue == null || threadfactory == null || handler == null) throw new nullpointerexception(); this.corepoolsize = corepoolsize; this.maximumpoolsize = maximumpoolsize; this.workqueue = workqueue; this.keepalivetime = unit.tonanos(keepalivetime); this.threadfactory = threadfactory; this.handler = handler; }
- corepoolsize设置线程池的核心线程数,当添加新任务时,如果线程池中的线程数小于corepoolsize,则不管当前是否有线程闲置,都会创建一个新的线程来执行任务。
- maximunpoolsize是线程池中允许的最大的线程数
- workqueue用于存放排队的任务
- keepalivetime是大于corepoolsize的线程闲置的超时时间
- handler用于在任务逸出、线程池关闭时的任务处理 ,线程池的线程增长策略为,当前线程数小于corepoolsize时,新增线程,当线程数=corepoolsize且corepoolsize时,只有在workqueue不能存放新的任务时创建新线程,超出的线程在闲置keepalivetime后销毁。
实现(基于jdk1.8)
threadpoolexecutor中保存的状态有
当前线程池状态, 包括running,shutdown,stop,tidying,terminated。
当前有效的运行线程的数量。
将这两个状态放到一个int变量中,前三位作为线程池状态,后29位作为线程数量。
例如0b11100000000000000000000000000001, 表示running, 一个线程。
通过hashset来存储工作者集合,访问该hashset前必须先获取保护状态的mainlock:reentrantlock
submit、execute
execute的执行方式为,首先检查当前worker数量,如果小于corepoolsize,则尝试add一个core worker。线程池在维护线程数量以及状态检查上做了大量检测。
public void execute(runnable command) { int c = ctl.get(); // 如果当期数量小于corepoolsize if (workercountof(c) < corepoolsize) { // 尝试增加worker if (addworker(command, true)) return; c = ctl.get(); } // 如果线程池正在运行并且成功添加到工作队列中 if (isrunning(c) && workqueue.offer(command)) { // 再次检查状态,如果已经关闭则执行拒绝处理 int recheck = ctl.get(); if (! isrunning(recheck) && remove(command)) reject(command); // 如果工作线程都down了 else if (workercountof(recheck) == 0) addworker(null, false); } else if (!addworker(command, false)) reject(command); }
addworker方法实现
private boolean addworker(runnable firsttask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runstateof(c); // check if queue empty only if necessary. if (rs >= shutdown && ! (rs == shutdown && firsttask == null && ! workqueue.isempty())) return false; for (;;) { int wc = workercountof(c); if (wc >= capacity || wc >= (core ? corepoolsize : maximumpoolsize)) return false; if (compareandincrementworkercount(c)) break retry; c = ctl.get(); // re-read ctl if (runstateof(c) != rs) continue retry; // else cas failed due to workercount change; retry inner loop } } boolean workerstarted = false; boolean workeradded = false; worker w = null; try { w = new worker(firsttask); final thread t = w.thread; if (t != null) { final reentrantlock mainlock = this.mainlock; mainlock.lock(); try { // recheck while holding lock. // back out on threadfactory failure or if // shut down before lock acquired. int rs = runstateof(ctl.get()); if (rs < shutdown || (rs == shutdown && firsttask == null)) { if (t.isalive()) // precheck that t is startable throw new illegalthreadstateexception(); workers.add(w); int s = workers.size(); if (s > largestpoolsize) largestpoolsize = s; workeradded = true; } } finally { mainlock.unlock(); } if (workeradded) { // 如果添加成功,则启动该线程,执行worker的run方法,worker的run方法执行外部的runworker(worker) t.start(); workerstarted = true; } } } finally { if (! workerstarted) addworkerfailed(w); } return workerstarted; }
worker类继承了abstractqueuedsynchronizer获得了同步等待这样的功能。
private final class worker extends abstractqueuedsynchronizer implements runnable { /** * this class will never be serialized, but we provide a * serialversionuid to suppress a javac warning. */ private static final long serialversionuid = 6138294804551838833l; /** thread this worker is running in. null if factory fails. */ final thread thread; /** initial task to run. possibly null. */ runnable firsttask; /** per-thread task counter */ volatile long completedtasks; /** * creates with given first task and thread from threadfactory. * @param firsttask the first task (null if none) */ worker(runnable firsttask) { setstate(-1); // inhibit interrupts until runworker this.firsttask = firsttask; this.thread = getthreadfactory().newthread(this); } /** delegates main run loop to outer runworker */ public void run() { runworker(this); } // lock methods // // the value 0 represents the unlocked state. // the value 1 represents the locked state. protected boolean isheldexclusively() { return getstate() != 0; } protected boolean tryacquire(int unused) { if (compareandsetstate(0, 1)) { setexclusiveownerthread(thread.currentthread()); return true; } return false; } protected boolean tryrelease(int unused) { setexclusiveownerthread(null); setstate(0); return true; } public void lock() { acquire(1); } public boolean trylock() { return tryacquire(1); } public void unlock() { release(1); } public boolean islocked() { return isheldexclusively(); } void interruptifstarted() { thread t; if (getstate() >= 0 && (t = thread) != null && !t.isinterrupted()) { try { t.interrupt(); } catch (securityexception ignore) { } } }
runworker(worker)是worker的轮询执行逻辑,不断地从工作队列中获取任务并执行它们。worker每次执行任务前需要进行lock,防止在执行任务时被interrupt。
final void runworker(worker w) { thread wt = thread.currentthread(); runnable task = w.firsttask; w.firsttask = null; w.unlock(); // allow interrupts boolean completedabruptly = true; try { while (task != null || (task = gettask()) != null) { w.lock(); // if pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. this // requires a recheck in second case to deal with // shutdownnow race while clearing interrupt if ((runstateatleast(ctl.get(), stop) || (thread.interrupted() && runstateatleast(ctl.get(), stop))) && !wt.isinterrupted()) wt.interrupt(); try { beforeexecute(wt, task); throwable thrown = null; try { task.run(); } catch (runtimeexception x) { thrown = x; throw x; } catch (error x) { thrown = x; throw x; } catch (throwable x) { thrown = x; throw new error(x); } finally { afterexecute(task, thrown); } } finally { task = null; w.completedtasks++; w.unlock(); } } completedabruptly = false; } finally { processworkerexit(w, completedabruptly); } }
threadpoolexecutor的submit方法中将callable包装成futuretask后交给execute方法。
futuretask
futuretask继承于runnable和future,futuretask定义的几个状态为
new, 尚未执行
completing, 正在执行
normal, 正常执行完成得到结果
exceptional, 执行抛出异常
cancelled, 执行被取消
interrupting,执行正在被中断
interrupted, 已经中断。
其中关键的get方法
public v get() throws interruptedexception, executionexception { int s = state; if (s <= completing) s = awaitdone(false, 0l); return report(s); }
先获取当前状态,如果还未执行完成并且正常,则进入等待结果流程。在awaitdone不断循环获取当前状态,如果没有结果,则将自己通过cas的方式添加到等待链表的头部,如果设置了超时,则locksupport.parknanos到指定的时间。
static final class waitnode { volatile thread thread; volatile waitnode next; waitnode() { thread = thread.currentthread(); } } private int awaitdone(boolean timed, long nanos) throws interruptedexception { final long deadline = timed ? system.nanotime() + nanos : 0l; waitnode q = null; boolean queued = false; for (;;) { if (thread.interrupted()) { removewaiter(q); throw new interruptedexception(); } int s = state; if (s > completing) { if (q != null) q.thread = null; return s; } else if (s == completing) // cannot time out yet thread.yield(); else if (q == null) q = new waitnode(); else if (!queued) queued = unsafe.compareandswapobject(this, waitersoffset, q.next = waiters, q); else if (timed) { nanos = deadline - system.nanotime(); if (nanos <= 0l) { removewaiter(q); return state; } locksupport.parknanos(this, nanos); } else locksupport.park(this); } }
futuretask的run方法是执行任务并设置结果的位置,首先判断当前状态是否为new并且将当前线程设置为执行线程,然后调用callable的call获取结果后设置结果修改futuretask状态。
public void run() { if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) return; try { callable<v> c = callable; if (c != null && state == new) { v result; boolean ran; try { result = c.call(); ran = true; } catch (throwable ex) { result = null; ran = false; setexception(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s); } }
感谢阅读,希望能帮助到大家,谢谢大家对本站的支持!
下一篇: SpringBoot任务调度器的实现代码