我们都知道线程池的用法,一般就是先new一个threadpoolexecutor对象,再调用execute(runnable runnable)传入我们的runnable,剩下的交给线程池处理就行了,于是这次我就从threadpoolexecutor的execute方法看起:
public void execute(runnable command) { if (command == null) throw new nullpointerexception(); /* * proceed in 3 steps: * * 1. if fewer than corepoolsize threads are running, try to * start a new thread with the given command as its first * task. the call to addworker atomically checks runstate and * workercount, and so prevents false alarms that would add * threads when it shouldn't, by returning false. * * 2. if a task can be successfully queued, then we still need * to double-check whether we should have added a thread * (because existing ones died since last checking) or that * the pool shut down since entry into this method. so we * recheck state and if necessary roll back the enqueuing if * stopped, or start a new thread if there are none. * * 3. if we cannot queue task, then we try to add a new * thread. if it fails, we know we are shut down or saturated * and so reject the task. */ int c = ctl.get(); //1.如果workercountof(c)即正在运行的线程数小于核心线程数,就执行addwork if (workercountof(c) < corepoolsize) { if (addworker(command, true)) return; c = ctl.get(); } //2.如果线程池还在运行状态并且把任务添加到任务队列成功 if (isrunning(c) && workqueue.offer(command)) { int recheck = ctl.get(); //3.如果线程池不在运行状态并且从任务队列移除任务成功,执行线程池饱和策略(默认直接抛出异常) if (! isrunning(recheck) && remove(command)) reject(command); //4.否则如果此时运行线程数==0,就直接调用addwork方法 else if (workercountof(recheck) == 0) addworker(null, false); } //5.如果2条件不成立,继续判断如果addwork返回false,执行线程池饱和策略 else if (!addworker(command, false)) reject(command); }
/** * @param firsttask the task the new thread should run first (or * null if none). workers are created with an initial first task * (in method execute()) to bypass queuing when there are fewer * than corepoolsize threads (in which case we always start one), * or when the queue is full (in which case we must bypass queue). * initially idle threads are usually created via * prestartcorethread or to replace other dying workers. * * @param core if true use corepoolsize as bound, else * maximumpoolsize. (a boolean indicator is used here rather than a * value to ensure reads of fresh values after checking other pool * state). * @return true if successful */ private boolean addworker(runnable firsttask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runstateof(c); // check if queue empty only if necessary. if (rs >= shutdown && ! (rs == shutdown && firsttask == null && ! workqueue.isempty())) return false; for (;;) { int wc = workercountof(c); //1.如果正在运行的线程数大于corepoolsize 或 maximumpoolsize(core代表以核心线程数还是最大线程数为边界),return false,表示addworker失败 if (wc >= capacity || wc >= (core ? corepoolsize : maximumpoolsize)) return false; //2.否则将运行线程数+1,并跳出这个for循环 if (compareandincrementworkercount(c)) break retry; c = ctl.get(); // re-read ctl if (runstateof(c) != rs) continue retry; // else cas failed due to workercount change; retry inner loop } } boolean workerstarted = false; boolean workeradded = false; worker w = null; try { //3.创建一个worker对象,传入我们的runnable w = new worker(firsttask); final thread t = w.thread; if (t != null) { final reentrantlock mainlock = this.mainlock; mainlock.lock(); try { // recheck while holding lock. // back out on threadfactory failure or if // shut down before lock acquired. int rs = runstateof(ctl.get()); if (rs < shutdown || (rs == shutdown && firsttask == null)) { if (t.isalive()) // precheck that t is startable throw new illegalthreadstateexception(); workers.add(w); int s = workers.size(); if (s > largestpoolsize) largestpoolsize = s; workeradded = true; } } finally { mainlock.unlock(); } if (workeradded) { //4.开始启动线程 t.start(); workerstarted = true; } } } finally { if (! workerstarted) addworkerfailed(w); } return workerstarted; }
worker(runnable firsttask) { setstate(-1); // inhibit interrupts until runworker this.firsttask = firsttask; this.thread = getthreadfactory().newthread(this); } /** delegates main run loop to outer runworker. */ public void run() { runworker(this); } final void runworker(worker w) { thread wt = thread.currentthread(); runnable task = w.firsttask; w.firsttask = null; w.unlock(); // allow interrupts boolean completedabruptly = true; try { //1.当firsttask不为空或gettask不为空时一直循环 while (task != null || (task = gettask()) != null) { w.lock(); // if pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. this // requires a recheck in second case to deal with // shutdownnow race while clearing interrupt if ((runstateatleast(ctl.get(), stop) || (thread.interrupted() && runstateatleast(ctl.get(), stop))) && !wt.isinterrupted()) wt.interrupt(); try { beforeexecute(wt, task); throwable thrown = null; try { //2.执行任务; } catch (runtimeexception x) { thrown = x; throw x; } catch (error x) { thrown = x; throw x; } catch (throwable x) { thrown = x; throw new error(x); } finally { afterexecute(task, thrown); } } finally { task = null; w.completedtasks++; w.unlock(); } } completedabruptly = false; } finally { processworkerexit(w, completedabruptly); } }
private runnable gettask() { boolean timedout = false; // did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runstateof(c); // check if queue empty only if necessary. if (rs >= shutdown && (rs >= stop || workqueue.isempty())) { decrementworkercount(); return null; } int wc = workercountof(c); // are workers subject to culling? //1.会不会淘汰空闲线程 boolean timed = allowcorethreadtimeout || wc > corepoolsize; //2.return null意味着回收一个worker即淘汰一个线程 if ((wc > maximumpoolsize || (timed && timedout)) && (wc > 1 || workqueue.isempty())) { if (compareanddecrementworkercount(c)) return null; continue; } try { //3.等待指定时间 runnable r = timed ? workqueue.poll(keepalivetime, timeunit.nanoseconds) : workqueue.take(); if (r != null) return r; timedout = true; } catch (interruptedexception retry) { timedout = false; } } }
可以看1、2注释,allowcorethreadtimeout代表存活一定时间是否对核心线程有效(默认为false),先看它为ture的情况,此时不管是核心线程还是非核心线程在3处都会等待一定时间(就是我们传入的线程保活时间),等待时间内如果从任务队列取到任务,则返回执行,否则timeout为true,继续走到2,由于(timed && timedout)和workqueue.isempty()均为true,返回null,代表回收一个线程;如果allowcorethreadtimeout为false,代表不回收核心线程,此时如果在3处没有取到任务,继续执行到2处,只有当wc > corepoolsize或wc > maximumpoolsize时才会执行return null,否则一直循环,相当于该线程一直处于运行状态,直到从任务队列拿到新的任务