【JDK源码分析】线程池ThreadPoolExecutor原理解析
前言
一般情况下使用线程池都是通过Executors的工厂方法得到的,这些工厂方法又基本上是调用的ThreadPoolExecutor的构造器。也就是说常用到的线程池基本用到的是ThreadPoolExecutor。ThreadPoolExecutor的大概原理是先规定一个线程池的容量,然后给提交过来的任务创建执行线程,任务执行完毕后放在池子中等待新的任务提交过来,当然ThreadPoolExecutor的内部细节比这要复杂得多。下面就通过源码来理解它的原理。
源码
先从其属性及构造器开始看
ThreadPoolExecutor类属性及构造器
为了便于好理解,先将一些需要注意的地方列出:
1. 原子整型变量ctl是它一个比较重要的属性,它用来存储线程池的运行状态(运行状态、停止状态等)以及当前活动的线程数;它的前3位用来表示线程池的运行状态,后29位用来表示当前活动的线程数,
运行状态名称 | 二进制数值 |
---|---|
RUNNING |
111 0 0000 0000 0000 0000 0000 0000 0000 |
SHUTDOWN |
000 0 0000 0000 0000 0000 0000 0000 0000 |
STOP |
001 0 0000 0000 0000 0000 0000 0000 0000 |
TIDYING |
010 0 0000 0000 0000 0000 0000 0000 0000 |
TERMINATED |
011 0 0000 0000 0000 0000 0000 0000 0000 |
常量CAPACITY用于表示线程池最大线程数量不能超过该值,它的二进制数值为 0001 1111 1111 1111 1111 1111 1111 1111,了解了这些后获取当前线程池运行状态runStateOf、当前活动的线程workerCountOf方法、以及生成ctl的ctlOf方法就很好理解了。
2. 最大线程数maximumPoolSize,该值小于等于常量CAPACITY的;核心线程数corePoolSize,表示一个线程池在没有任务在队列中等待时最大活动的线程数,即当新任务提交时,如果活动的线程小于核心线程数corePoolSize,则会创建新的线程来执行任务即使在池中有空闲的线程,如果活动的线程大于核心线程数corePoolSize且小于最大线程数maximumPoolSize,则仅当任务队列满时才会创建新的线程来执行;如果活动的线程等于maximumPoolSize时,此时会执行拒绝策略来拒绝提交的任务。
3. 线程池的运行状态
运行状态名称 | 状态说明 |
---|---|
RUNNING | 接受新提交的任务并且处理任务队列中的任务 |
SHUTDOWN | 不接受新提交的任务,但是处理任务队列中的任务 |
STOP | 不接受新提交的任任务,不处理任务队列中的任务,同时中断正在运行中的任务 |
TIDYING | 所有任务被终止, 活动的线程数workCount为0,此状态下还会执行terminated钩子方法 |
TERMINATED | terminated钩子方法已执行 |
线程池的状态变化有如下几种:
运行状态变化 | 发生条件 |
---|---|
RUNNING -> SHUTDOWN | 调用shutdown方法 |
(RUNNING or SHUTDOWN) -> STOP | 调用shutdownNow方法 |
SHUTDOWN -> TIDYING | 任务队列和线程池都为空 |
STOP -> TIDYING | 线程池为空 |
TIDYING -> TERMINATED | terminated 钩子方法已执行 |
4. 拒绝策略RejectedExecutionHandler,该策略会在任务被拒绝时执行,JDK提供了4种实现,分别为AbortPolicy(默认的拒绝策略,会在拒绝任务时抛出运行时异常)、CallerRunsPolicy(直接在 execute 方法的调用线程中运行被拒绝的任务,线程池关闭时则直接丢弃任务)、DiscardOldestPolicy(放弃最久未被处理的,然后提交给线程池重试,线程池关闭时则直接丢弃任务)、DiscardPolicy(直接丢弃被拒绝的任务),当然也可以自己按需实现自定义的拒绝策略。
ThreadPoolExecutor属性
// 继承了AbstractExecutorService,AbstractExecutorService定义了基本的任务提交、执行等方法 public class ThreadPoolExecutor extends AbstractExecutorService { // 用于表示线程池的运行状态以及当前活动的线程 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 一个Integer是32bit, 3位用于表示运行状态,其余用于表示活动线程数 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程数最大值 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // 任务队列 private final BlockingQueue<Runnable> workQueue; // 显示锁,用于控制workers的访问等 private final ReentrantLock mainLock = new ReentrantLock(); // 存储所有的工作线程,只有在mainLock下才能访问 private final HashSet<Worker> workers = new HashSet<Worker>(); // 等待中断条件 private final Condition termination = mainLock.newCondition(); // 用于记录线程池中最大的活动线程数 private int largestPoolSize; // 任务完成数量 private long completedTaskCount; // 线程工厂 private volatile ThreadFactory threadFactory; // 拒绝策略 private volatile RejectedExecutionHandler handler; // 空闲线程等待工作时间 private volatile long keepAliveTime; // 为true时使用keepAliveTime作为超时等待,为false所有核心线程(包含空闲的)一直保持存活 private volatile boolean allowCoreThreadTimeOut; // 核心线程数 private volatile int corePoolSize; // 最大线程数 private volatile int maximumPoolSize; // 默认任务拒绝策略,对拒绝的任务抛出异常 private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); // 安全控制访问(主要用于shutdown和 shutdownNow方法) private static final RuntimePermission shutdownPerm = new RuntimePermission("modifyThread"); ... }
ThreadPoolExecutor构造器
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 控制参数合法 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); // 任务队列、线程工厂、拒绝策略不能为空 if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; // 使用时间单位unit将keepAliveTime转成纳秒 this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { // 使用默认的线程工厂、默认的拒绝策略 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { // 使用默认的拒绝策略 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { // 使用默认的线程工厂 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); // 运行的线程少于corePoolSize个 if (workerCountOf(c) < corePoolSize) { // 添加任务 if (addWorker(command, true)) return; c = ctl.get(); } // 此时表示线程池处于运行中,并且将任务添加到队列 if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); // 再次检查线程池状态 if (! isRunning(recheck) && remove(command)) // 如果线程池不处于运行状态,将任务从队列中移除 // 拒绝任务 reject(command); else if (workerCountOf(recheck) == 0) // 运行中的任务数为0,有以下2种情况: // 1.线程池处于运行中(加一个空任务是为了保证在队列里等待的任务可以被唤醒后执行) // 2.线程池不处于运行中,remove(command)失败 addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
内部类Worker
Worker类继承了AQS,实现了Runnable接口,Worker是线程池中用于执行任务的线程。
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 线程 final Thread thread; // 需要执行的任务 Runnable firstTask; // 当前线程完成的任务数 volatile long completedTasks; // worker构造器 Worker(Runnable firstTask) { // 将AQS的state设置为-1 setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } // 实现runnable了run接口 public void run() { // 执行任务 runWorker(this); } // 是否锁定 protected boolean isHeldExclusively() { return getState() != 0; } // 实现AQS的抽象方法,尝试获取锁,将state状态置为1 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 实现AQS的抽象方法,尝试释放锁,将state状态置为0 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } // 获取锁 public void lock() { acquire(1); } // 尝试获取锁 public boolean tryLock() { return tryAcquire(1); } // 释放锁 public void unlock() { release(1); } // 是否锁定 public boolean isLocked() { return isHeldExclusively(); } // 中断已启动线程 void interruptIfStarted() { Thread t; // 线程处于活动状态 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
主要方法
1. execute方法
先从执行任务方法开始,提交任务submit方法中也是调用的该方法进行任务的执行的。此方法会在活动线程数超过核心线程数corePoolSize时将任务放在任务队列中等待活动的线程空闲。
public void execute(Runnable command) { // 防止提交null任务 if (command == null) throw new NullPointerException(); // 获取ctl int c = ctl.get(); // 活动的线程小于核心线程数 if (workerCountOf(c) < corePoolSize) { // 添加新的线程执行任务,第2个参数为true表示活动的线程数不要超过核心线程数corePoolSize if (addWorker(command, true)) return; // 再次获取ctl c = ctl.get(); } // 执行到这里表示活动的线程数大于等于核心线程数 // isRunning先判断线程池是否处于运行状态,然后将任务放入任务队列等待 if (isRunning(c) && workQueue.offer(command)) { // 再次获取ctl int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) // 此时线程池状态不处于运行状态中,将任务从队列中移除 // 调用任务拒绝策略拒绝任务 reject(command); else if (workerCountOf(recheck) == 0) // 执行到线程处于运行状态或者不处于运行状态时任务不在任务队列中,并且线程池的活动线程数为0 // 添加新的线程来执行一个空的任务,用来唤醒等待中的任务被线程执行 addWorker(null, false); } // 执行到这里表示线程池状态不处于运行状态或者任务队列已满 else if (!addWorker(command, false)) reject(command); }
再来看看addWorker方法,此方法是用来根据需要创建新的线程执行任务,
// 参数core,为true时表示活动的线程数不能超过核心线程数corePoolSize,反之则不能超过maximumPoolSize private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { // 获取ctl int c = ctl.get(); // 当前线程池的状态 int rs = runStateOf(c); /* 此处的表达式可以转化成如下所示的表达式 rs >= SHUTDOWN && rs != SHUTDOWN || firstTask != null || workQueue.isEmpty() 此表达式为真的情况有如下几种: 1. 线程池运行状态大于SHUTDOWN(即为STOP、TIDYING、TERMINATED) 2. 线程池运行状态为SHUTDOWN且任务不为null 3. 线程池运行状态为SHUTDOWN且任务为null,任务队列为空 也就是之前提到的当状态为SHUTDOWN时,不再允许添加新的任务,但是会执行已在任务队列中的任务; 当状态为STOP、TIDYING、TERMINATED时表示不会再处理任务 */ if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; /* 执行到此处的情况为以下几种: 1. 线程池状态为RUNNING状态 2. 运行状态为SHUTDOWN, 任务为null,且任务队列不为空 */ // 自旋 for (;;) { // 获取当前活动的线程数 int wc = workerCountOf(c); // 当core为true时,只要当前线程数大于等于核心线程数corePoolSize,就不再往下执行 // 为core为false,只要当前线程数大于等于核心线程数maximumPoolSize,就不再往下执行 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 通过CAS原子操作给ctl增加1 if (compareAndIncrementWorkerCount(c)) //原子操作执行成功后直接跳出外部的循环 break retry; // 原子操作失败后,重新获取ctl c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) // 由于运行状态变化导致之前的CAS原子操作失败,回到外循环判断此时状态是否需要退出 continue retry; // 执行到这里表示是其它线程提交任务导致CAS原子操作失败,通过内部循环再次操作 // else CAS failed due to workerCount change; retry inner loop } } // 执行到这里表示已通过原子操作改变了ctl的值,递增了一次ctl的值 boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { // 构造新的worker对象 w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; // 同步 mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. // 获取当前线程池运行状态 int rs = runStateOf(ctl.get()); /* 下列表达式为真的情况: 1. 线程池状态为RUNNING 2. 线程池状态为SHUTDOWN且提交的任务为null */ if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 当前worker中的线程已被启动,则抛出宜昌 if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); // 将worker放入集合中 workers.add(w); // 获取当前worker的数量 int s = workers.size(); // 如果当前worker的数量超过largestPoolSize,则需要更新largestPoolSize的值 if (s > largestPoolSize) largestPoolSize = s; // 新的worker线程已创建 workerAdded = true; } } finally { // 释放同步 mainLock.unlock(); } // 新的worker线程已创建 if (workerAdded) { // 启动当前worker线程,会执行worker实现的run方法 t.start(); workerStarted = true; } } } finally { if (! workerStarted) // 任务启动失败, 回滚操作 addWorkerFailed(w); } return workerStarted; }
Woker的run方法会去执行runWorker方法,注意runWorker方法会先执行提交给Worker中的任务firstTask,如果firstTask为null则会去任务队列中取任务来执行
,且当任务执行后,会再去队列中取任务来执行。
// worker实现的run方法 public void run() { // 调用线程池的方法runWorker runWorker(this); } // 执行任务 final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; // 将worker中的任务置为null w.firstTask = null; // 释放worker的状态,允许被其它线程打断 w.unlock(); // allow interrupts boolean completedAbruptly = true; try { // 执行任务,直到队列中的任务被取完 while (task != null || (task = getTask()) != null) { // work加锁 w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt // 线程池为RUNNING、SHUTDOWN、STOP // 线程池为RUNNING、SHUTDOWN、STOP且当前线程已中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中断线程 wt.interrupt(); try { // 在task执行任务前执行的方法,该方法为空方法,需要时由子类来重写实现逻辑 beforeExecute(wt, task); Throwable thrown = null; try { // 执行task任务 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 在task执行任务后执行的方法,该方法为空方法,需要时由子类来重写实现逻辑 afterExecute(task, thrown); } } finally { // 任务执行完后将task置为null,用来执行任务队列中的任务 task = null; // 该worker线程执行的任务递增一次 w.completedTasks++; // 解锁 w.unlock(); } } // 所有任务执行完 completedAbruptly = false; } finally { // 当任务队列没有任务执行或者线程池被关闭shutdown或shutdownNow时调用 processWorkerExit(w, completedAbruptly); } } // 从队列中获取任务 private Runnable getTask() { boolean timedOut = false; // Did the last poll() time out? for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. // 线程池状态为STOP、TIDYING、TERMINATED或者线程为RUNNING、SHUTDOWN且任务队列为空 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 递减线程数 decrementWorkerCount(); return null; } int wc = workerCountOf(c); // 当线程使用keepAliveTime超时等待获取任务或者活跃线程数已超过核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; /*当下列情况出现时: 1. 活跃的线程数大于maximumPoolSize且活跃线程大于1或任务队列为空 2. 从任务队列获取任务超时且活跃线程大于1或者任务队列为空 */ if ((wc > maximumPoolSize || (timed && timedOut)) && (wc > 1 || workQueue.isEmpty())) { // 递减活跃线程数 if (compareAndDecrementWorkerCount(c)) return null; continue; } try { // timed 为真时 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); if (r != null) return r; // 表示r 为null timedOut = true; } catch (InterruptedException retry) { timedOut = false; } } }
再看processWorkerExit方法
// 处理 private void processWorkerExit(Worker w, boolean completedAbruptly) { // 线程被中断 if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted // 递减活动线程的数量 decrementWorkerCount(); final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 计算线程池完成的任务数 completedTaskCount += w.completedTasks; // 从worker集合中移除w workers.remove(w); } finally { mainLock.unlock(); } //尝试终止 tryTerminate(); int c = ctl.get(); // 当线程池状态为TIDYING、TERMINATED if (runStateLessThan(c, STOP)) { if (!completedAbruptly) { // completedAbruptly为false表示任务队列里的任务被执行完 // allowCoreThreadTimeOut为true则min为0,反之为corePoolSize int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // min为0,且任务队列不为空,表示还有任务需要执行 if (min == 0 && ! workQueue.isEmpty()) min = 1; // 若活动的线程大于min if (workerCountOf(c) >= min) return; // replacement not needed } /* 执行到这里表示 1. completedAbruptly为true,表示在任务执行时中断过,保证线程执行任务 2. 活动线程小于min,保证任务队列中的任务被执行或者让核心线程处于空闲等待状态 */ addWorker(null, false); } }
再来看addWorkerFailed,该方法用来将已创建的worker线程回滚到之前的状态
private void addWorkerFailed(Worker w) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { if (w != null) // worker不为null时,将其从workers集合中移除 workers.remove(w); // 递减活动线程数量 decrementWorkerCount(); // 尝试中断 tryTerminate(); } finally { mainLock.unlock(); } } // 通过递减ctl来递减活动线程数量 private void decrementWorkerCount() { do {} while (! compareAndDecrementWorkerCount(ctl.get())); } private boolean compareAndDecrementWorkerCount(int expect) { return ctl.compareAndSet(expect, expect - 1); } // 尝试终止,执行此方法后线程池的状态会先STOP变为TIDYING,再变为TERMINATED final void tryTerminate() { // 自旋 for (;;) { int c = ctl.get(); if (isRunning(c) || runStateAtLeast(c, TIDYING) || (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty())) // 此时的状态为RUNNING、SHUTDOWN、STOP、TIDYING或状态为SHUTDOWN且任务队列不为空 return; if (workerCountOf(c) != 0) { // Eligible to terminate // 活跃的线程大于0时,中断其中一个空闲worker的线程 interruptIdleWorkers(ONLY_ONE); return; } final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // CAS原子操作,将ctl状态置为TIDYING,且worker数量置为0 if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) { try { // 执行terminated,默认是空方法,子类可以重写实现逻辑 terminated(); } finally { // 将ctl状态置为TERMINATED ctl.set(ctlOf(TERMINATED, 0)); termination.signalAll(); } return; } } finally { mainLock.unlock(); } // else retry on failed CAS } }
2. shutdown方法
shutdown方法的作用是不再接收新任务,但是还是会处理任务队列中的任务,并且不会中断已在中执行中的任务线程,执行此方法后线程池的状态会被设置为SHUTDOWN
public void shutdown() { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查是否有每个worker的访问权限 checkShutdownAccess(); // 自旋设置线程池的状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断所有空闲的worker线程 interruptIdleWorkers(); // 空的方法,由子类实现重写实现逻辑,比如ScheduledThreadPoolExecutor就实现了 onShutdown(); // hook for ScheduledThreadPoolExecutor } finally { mainLock.unlock(); } // 尝试终止 tryTerminate(); }
再来看advanceRunState方法,此方法自旋保证设置成目标状态(目前状态为SHUTDOWN或STOP)
// 设置线程池的状态 private void advanceRunState(int targetState) { for (;;) { int c = ctl.get(); // 当线程的状态大于目标状态或者将当前状态设置成目标状态时跳出循环 if (runStateAtLeast(c, targetState) || ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c)))) break; } } interruptIdleWorkers用来中断所有空闲的线程 private void interruptIdleWorkers() { interruptIdleWorkers(false); } // 根据参数来判断是否中断单个进程或者中断所有进程 private void interruptIdleWorkers(boolean onlyOne) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 遍历所有活动线程 for (Worker w : workers) { Thread t = w.thread; // 线程t未中断 if (!t.isInterrupted() && w.tryLock()) { try { // 中断线程t t.interrupt(); } catch (SecurityException ignore) { } finally { w.unlock(); } } // 若为true中断一个线程后直接跳出 if (onlyOne) break; } } finally { mainLock.unlock(); } }
3. shutdownNow方法
shutdownNow方法的作用是不再接收新任务,并且会清除任务队列中的任务,会中断所有任务线程包括已在执行任务的线程,执行此方法后线程池的状态会被设置为STOP
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 检查是否有每个worker的访问权限 checkShutdownAccess(); // 自旋设置线程池的状态为STOP advanceRunState(STOP); // 中断所有线程 interruptWorkers(); // 清除任务队列 tasks = drainQueue(); } finally { mainLock.unlock(); } // 尝试终止 tryTerminate(); return tasks; }
drainQueue方法用来清除任务队列中的任务,返回值为队列中的任务
private List<Runnable> drainQueue() { BlockingQueue<Runnable> q = workQueue; ArrayList<Runnable> taskList = new ArrayList<Runnable>(); // 将任务队列中任务清除,然后将其添加到taskList待返回 q.drainTo(taskList); // 这里再次判断下任务队列中是否已为空,主要防止BlockingQueue的实现为延迟队列 // 此时会将其一个个的遍历清除 if (!q.isEmpty()) { for (Runnable r : q.toArray(new Runnable[0])) { if (q.remove(r)) taskList.add(r); } } return taskList; }
到此源码分析已完毕。
接下来看下Executors怎么利用工厂方法来创建常见的线程池
比如创建一个根据需要创建新线程的线程池,此线程池将corePoolSize设置为0,maximumPoolSize设置为Integer.MAX_VALUE,空闲线程等待时间为60秒,使用的是SynchronousQueue阻塞队列。 就是说每提交一个新任务都是先进入SynchronousQueue队列,此队列不会保存任务,它将任务直接提交给线程,然后如果有空闲线程在60秒内能获取到任务,则用该线程执行这个获取到的任务,没有空闲线程时会创建新的线程来执行任务。空闲线程在60秒后如果没有获取到任务时会被从线程池中清除掉。最大活动线程的数量设置的是Integer.MAX_VALUE,而实际是小于CAPACITY(即$ 2^{29}-1 $)。
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
创建一个固定线程数量的的线程池,此线程池是设置corePoolSize、maximumPoolSize一样为nThreads个,使用的*队列LinkedBlockingQueue(容量为Integer.MAX_VALUE)。 就是说每提交一个新任务都是创建新的线程,直到线程数量超过corePoolSize,然后提交的任务会进入*队列中等待,在等待的任务会在有空闲线程时才会被执行。线程一旦创建就不会被清除,会一直存在于线程池中以复用来执行任务,直到调用shutdown方法或者由于执行任务时出错中断。
public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
总结
通过以上分析,需要根据实际需要创建合适的线程池,如果对线程池的原理比较了解,最好直接使用线程池的构造方法来构造线程池,那样你才能更为准确的控制核心线程数,最大线程数,线程空闲多少时间被清除,以及合适的拒绝策略,让线程池更可控。
上一篇: 与牛有关
下一篇: 社群分享:涨粉的35个玩法和技巧
推荐阅读
-
并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)
-
Java并发之线程池ThreadPoolExecutor源码分析学习
-
【JDK源码分析】线程池ThreadPoolExecutor原理解析
-
【Java并发编程】21、线程池ThreadPoolExecutor源码解析
-
Java并发编程:Java线程池核心ThreadPoolExecutor的使用和原理分析
-
ThreadPoolExecutor 线程池源码分析-基于jdk8
-
源码分析:线程池ThreadPoolExecutor与Executors
-
【JDK源码分析】线程池ThreadPoolExecutor原理解析
-
ThreadPoolExecutor 线程池源码分析-基于jdk8
-
并发编程(十三)—— Java 线程池 实现原理与源码深度解析 之 Executors(三)