Java JUC包源码分析 - 线程池ThreadPoolExecutor
程序员文章站
2024-03-02 15:33:40
...
线程池的相关类结构
线程池就是存储了已创建指定个数的线程的集合,当需要用线程执行任务的时候,就可以从线程池中拿一个空闲的线程来执行任务。先看下源码,源码分析完了再看线程池的周边应用,以及几个思考。
第一部分,先看下线程池的结构,然后是如何创建线程池。
第二部分,线程池的状态
第三部分,提交任务到线程池的方法
第四部分,关闭线程池
第五部分,拒绝策略
第一部分:
public class ThreadPoolExecutor extends AbstractExecutorService {
// 成员变量
// 任务队列
private final BlockingQueue<Runnable> workQueue;
// 可重入锁
private final ReentrantLock mainLock = new ReentrantLock();
// 工作线程集合
private final HashSet<Worker> workers = new HashSet<Worker>();
// 阻塞条件
private final Condition termination = mainLock.newCondition();
// 记录着最大能达到的的线程池大小
private int largestPoolSize;
// 已经执行完的任务个数
private long completedTaskCount;
// 线程工厂
private volatile ThreadFactory threadFactory;
// 当线程池饱和或者关闭时的拒绝策略
private volatile RejectedExecutionHandler handler;
// 空闲线程在空闲时存活的时间
private volatile long keepAliveTime;
// 是否允许核心线程超时,默认false。false是当空闲时也保持活着状态,true是核心线程使用
// keepAliveTime时间来控制等待任务的超时时间
private volatile boolean allowCoreThreadTimeOut;
// 核心线程池的大小
private volatile int corePoolSize;
// 最大的线程池大小
// private volatile int maximumPoolSize;
// 默认的拒绝策略是中断,抛异常
private static final RejectedExecutionHandler defaultHandler = new AbortPolicy();
// 构造函数
/**
*
*corePoolSize:核心线程池大小,也就是说有这么多线程一直活着,直到关闭线程池
*maximumPoolSize:最大的线程数,也就是说除了常驻的线程外,当阻塞队列满的时候,还可以新建
*maximumPoolSize-corePoolSize个线程来处理任务
*keepAliveTime:就是那些可扩展出来的空闲线程当空闲时的存活时间
*unit:是上面空闲时间的单位
*workqueue:工作队列,当提交任务后,任务已经占满核心线程来,就会被添加到这个阻塞队列里,等待
*被线程执行
*threadfactory:创建线程的工厂,可以自己实现,也可以利用另一个构造函数使用默认的工厂
*handler:拒绝策略,当阻塞队列满时,这时候新添加任务进来后,对这个任务的拒绝策略,有4种
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
// 这个构造函数使用了默认的线程工厂和默认的拒绝策略,其他两个构造函数是只默认一个参数
// 默认的线程工厂就是创建的线程是非守护线程,优先级为NORM_PRIORITY,详情看后面介绍
// 默认的拒绝策略就是当阻塞队列满了,来了新任务,则抛出异常
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// 其他两个不看了...
}
Executors.defaultThreadFactory():
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
ThreadPoolExecutor的内部类:
static class DefaultThreadFactory implements ThreadFactory {
private static final AtomicInteger poolNumber = new AtomicInteger(1);
private final ThreadGroup group;
private final AtomicInteger threadNumber = new AtomicInteger(1);
private final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
// 新建线程时,设置线程为非守护线程,优先级为NORM_PRIORITY
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
// 默认的拒绝策略:
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
/**
* Always throws RejectedExecutionException.
*
* @param r the runnable task requested to be executed
* @param e the executor attempting to execute this task
* @throws RejectedExecutionException always
*/
// 直接抛异常
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
// 再看下用Executors创建线程池的方法(不建议用这种方式创建线程池),这个类的设计风格,突然想起
// effective Java里面的静态工厂方法创建对象:
public class Executors {
// 创建固定大小的线程池,空闲线程被立即回收,任务阻塞队列是*队列,
// 最大可以是Integer.MAX_VALUE(问题在于这,可以一直提交任务进去,占满内存)
// 默认的线程工厂和拒绝策略
// 还有个重载方法,可以传线程工厂进去
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
// 创建只有一个线程的线程池,问题与上面的fix一样
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
// 创建一个可以无限增加线程数的线程池(问题也很大,想想线程数可以一直创建下去)
// 空闲线程超过60秒就回收
// 任务队列是一个栈的形式(非公平),不缓存任务,任务提交后直接创建线程允许
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
// 创建一个可延迟执行的线程池,核心线程池为1,线程池最大可达Integer.MAX_VALUE
// 可延迟的工作队列
// 创建委托类管理已创建的可延迟的线程池
// 延迟相关的内容,下文还会讲到
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
}
// 线程池的子类
public class ScheduledThreadPoolExecutor
extends ThreadPoolExecutor
implements ScheduledExecutorService {
// 延迟执行的线程池,最大线程数固定Integer.MAX_VALUE,任务队列固定为DelayedWorkQueue
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// 提交延迟任务,不需要返回结果.提交后delay时间后才会执行
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
// 装饰任务,返回第二个参数:任务
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
// 提交延迟任务,需要返回结果。提交后delay时间后才会执行
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
/**
* @throws RejectedExecutionException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
* @throws IllegalArgumentException {@inheritDoc}
*/
// 提交后delay时间后才会执行,然后每间隔period时间执行一次,直到线程池关闭
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
// 延时执行任务
private void delayedExecute(RunnableScheduledFuture<?> task) {
// 如果线程池关闭了,就拒绝
if (isShutdown())
reject(task);
else {
// 获取线程池的任务队列,并把任务添加进去
super.getQueue().add(task);
// 再次判断线程池是否关闭了,关闭了并且当前运行状态不能运行,就把任务从队列移除
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task))
task.cancel(false);
else
// 新建线程,准备启动任务
ensurePrestart();
}
}
void ensurePrestart() {
// 如果工作线程数比核心线程数少就新建线程放到核心线程池中
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
// 如果工作线程数为0,则扩展一个任务为null的线程出来(纯粹新建一个线程),因为最大线程数
// 是Integer.MAX_VALUE,所以直接建新线程
else if (wc == 0)
addWorker(null, false);
}
}
第二部分,线程池的状态:
// ctl记录里两个信息:一个是线程池的状态(高3位),一个是线程池的线程数量(低29位)
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
// 高3位值是111,此状态能够接受新任务,并且对已添加的任务处理
private static final int RUNNING = -1 << COUNT_BITS;
// 高3位值是000,此状态不能接受新任务,但能处理已添加的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
// 高3位值是001,此状态不能接受新任务,不能处理已添加的任务,并且会中断正在执行的任务
private static final int STOP = 1 << COUNT_BITS;
// 高3位值是010,当所有任务终止时,线程池会变成TIDYING状态,当线程池变为TIDYING状态时,会执
// 行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程
// 池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。
private static final int TIDYING = 2 << COUNT_BITS;
// 高3位值是011,线程池彻底终止就变成TERMINNATED状态
private static final int TERMINATED = 3 << COUNT_BITS;
// 附上英文的状态解释:
* RUNNING: Accept new tasks and process queued tasks
* SHUTDOWN: Don't accept new tasks, but process queued tasks
* STOP: Don't accept new tasks, don't process queued tasks,
* and interrupt in-progress tasks
* TIDYING: All tasks have terminated, workerCount is zero,
* the thread transitioning to state TIDYING
* will run the terminated() hook method
* TERMINATED: terminated() has completed
// Packing and unpacking ctl
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }
看一下线程池状态变化的过程:
第三部分,提交任务到线程池:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/*
* Proceed in 3 steps:
*
* 1. If fewer than corePoolSize threads are running, try to
* start a new thread with the given command as its first
* task. The call to addWorker atomically checks runState and
* workerCount, and so prevents false alarms that would add
* threads when it shouldn't, by returning false.
*
* 2. If a task can be successfully queued, then we still need
* to double-check whether we should have added a thread
* (because existing ones died since last checking) or that
* the pool shut down since entry into this method. So we
* recheck state and if necessary roll back the enqueuing if
* stopped, or start a new thread if there are none.
*
* 3. If we cannot queue task, then we try to add a new
* thread. If it fails, we know we are shut down or saturated
* and so reject the task.
*/
int c = ctl.get();
// 计算工作线程的数量,如果工作线程数小于核心线程数,则新建一个线程来执行这个任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如果工作线程数达到了核心线程数,先判断线程池状态是否在运行,然后把任务放入任务阻塞队列
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 重复检查线程池状态,如果不是Running,就从任务队列移除上面添加的任务,并执行拒绝
if (! isRunning(recheck) && remove(command))
reject(command);
// 如果工作线程数为0,则尝试新建一个任务为null的线程
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
// 通过addWorker(command, false)新建一个线程,并将任务(command)添加到该线程中;
// 然后,启动该线程从而执行任务。
// 如果addWorker(command, false)执行失败,则通过reject()执行相应的拒绝策略的内容。
else if (!addWorker(command, false))
reject(command);
}
// 添加任务到线程上执行
private boolean addWorker(Runnable firstTask, boolean core) {
// 这段主要是判断线程池的状态,确定能够添加任何到线程那执行
retry:
for (;;) {
// 获取线程池的状态和数量的int值
int c = ctl.get();
// 获取线程池的状态
int rs = runStateOf(c);
// Check if queue empty only if necessary.
// 有效性检查:线程池关闭了,任务为null,工作队列为空
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 死循环+cas保证线程安全
for (;;) {
// 获取工作线程个数
int wc = workerCountOf(c);
// 如果工作线程数达到最大容量或者是核心线程数大小就返回false
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
// 工作线程数+1,失败就跳出循环重试
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
// 如果和之前的线程池状态不一致了,就继续从retry重新开始
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
// 新建一个工作线程,与任务绑定,等待run
w = new Worker(firstTask);
final Thread t = w.thread;
// 判断线程
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
// 加锁
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
// 获取锁之后,再重新检查线程状态
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
// 如果发现新建的线程处于运行状态了,就抛出异常,因为还没有启动线程
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
// 把新建的线程放入线程池的集合里
workers.add(w);
// 记录放入线程池集合数量的最大值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
// 如果添加成功,就启动线程
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
// 如果线程没有start起来,就把那个线程从集合移除,线程数量ctl - 1
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
// 工作线程类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
/**
* This class will never be serialized, but we provide a
* serialVersionUID to suppress a javac warning.
*/
private static final long serialVersionUID = 6138294804551838833L;
/** Thread this worker is running in. Null if factory fails. */
// 工厂失败时为null
final Thread thread;
/** Initial task to run. Possibly null. */
// 需要运行的任务,可以为null
Runnable firstTask;
/** Per-thread task counter */
// 每个线程完成的任务数
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
// 新建Worker
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
}
// submit是AbstractExecutorService的方法
// submit方法也是调用execut方法
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
// 有返回值的submit
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
// 传入Callable,也有返回值
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
第四部分,关闭线程池:
// 关闭连接池,不会接受新的任务,但是会把已经提交的任务执行完毕
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 安全检查
checkShutdownAccess();
// 把线程池状态置SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断空闲线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
tryTerminate();
}
// 中断空闲线程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 遍历线程集合,如果不处于中断状态就中断
for (Worker w : workers) {
Thread t = w.thread;
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
// 尝试把状态最后变成Terminated
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 判断中断的准入条件
// 正在运行的不行,处于TIDYING不行,处于SHUTDOWN 但任务队列不为空的不行
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 如果工作线程不为0,就中断空闲线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
// 当线程终止了就调用这个方法
terminated();
} finally {
// 最后把ctl置0
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
// 马上关闭线程池,把状态变STOP,终止正在执行的线程,返回等待执行的任务集合
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
advanceRunState(STOP);
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
tryTerminate();
return tasks;
}
// 把任务队列里面的任务通过drainTo()放到一个list里面,如果是延时的队列,则可能失败,需要
// 一个一个的删除
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
第五部分,拒绝策略:
线程池产生拒绝的场景一般有两个:一个是线程池异常关闭了,另一个是添加到线程池的任务数量已经超过阻塞队里最大值了
四种拒绝策略:
AbortPolicy -- 当任务添加到线程池中被拒绝时,它将抛出 RejectedExecutionException 异常。 默认的拒绝策略!
CallerRunsPolicy -- 当任务添加到线程池中被拒绝时,会在线程池当前正在运行的Thread线程池中处理被拒绝的任务。马上执行。
DiscardOldestPolicy -- 当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务,然后将被拒绝的任务添加到等待队列中。
DiscardPolicy -- 当任务添加到线程池中被拒绝时,线程池将丢弃被拒绝的任务。
使用方式:
// 构造函数指定
static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(3, 5, 2000, TimeUnit.MILLISECONDS, queue, new ThreadPoolExecutor.AbortPolicy());
// 线程池的set方法
threadPool.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());