日常记录——多线程与高并发—ThreadPoolExecutor源码解析
程序员文章站
2022-06-30 19:14:36
package java.util.concurrent;import java.security.AccessControlContext;import java.security.AccessController;import java.security.PrivilegedAction;import java.util.concurrent.locks.AbstractQueuedSynchronizer;import java.util.concurrent.locks.Conditio...
package java.util.concurrent;
import java.security.AccessControlContext;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.*;
public class ThreadPoolExecutor extends AbstractExecutorService {
//高3位来表示线程池的运行状态, 用低29位来表示线程池内有效线程数量。 初始状态为RUNNING
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
//记录线程数量的占ctl的位数
private static final int COUNT_BITS = Integer.SIZE - 3;
//线程池最大可以容纳的线程数量
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// 表示运行状态
private static final int RUNNING = -1 << COUNT_BITS;
//不接受新的任务,但是可以处理任务队列里的任务
private static final int SHUTDOWN = 0 << COUNT_BITS;
//不接受新的任务,不处理阻塞队列里的任务,中断正在处理的任务。
private static final int STOP = 1 << COUNT_BITS;
//表示过渡状态。此时所有的任务都执行完毕,当前线程池已经没有有效的线程。
private static final int TIDYING = 2 << COUNT_BITS;
//表示终止状态
private static final int TERMINATED = 3 << COUNT_BITS;
//获取线程池状态
private static int runStateOf(int c) { return c & ~CAPACITY; }
//获取线程数量
private static int workerCountOf(int c) { return c & CAPACITY; }
//基于线程池状态和线程数量生成一个ctl值。
private static int ctlOf(int rs, int wc) { return rs | wc; }
//线程池状态小于s
private static boolean runStateLessThan(int c, int s) {
return c < s;
}
//线程池状态大于等于s
private static boolean runStateAtLeast(int c, int s) {
return c >= s;
}
//线程池状态是否在运行
private static boolean isRunning(int c) {
return c < SHUTDOWN;
}
//尝试CAS递增ctl的工作线程数量。
private boolean compareAndIncrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect + 1);
}
//尝试CAS递减ctl的工作线程数量。
private boolean compareAndDecrementWorkerCount(int expect) {
return ctl.compareAndSet(expect, expect - 1);
}
//减少ctl的工作线程。仅在线程突然终止时调用此方法。一直减到 失败为止
private void decrementWorkerCount() {
do {} while (! compareAndDecrementWorkerCount(ctl.get()));
}
//任务队列
private final BlockingQueue<Runnable> workQueue;
//资源锁
private final ReentrantLock mainLock = new ReentrantLock();
//工作队列
private final HashSet<Worker> workers = new HashSet<Worker>();
//为了支持阻塞
private final Condition termination = mainLock.newCondition();
//跟踪线程池最大值
private int largestPoolSize;
//完成任务数量
private long completedTaskCount;
//线程工厂
private volatile ThreadFactory threadFactory;
//拒绝策略
private volatile RejectedExecutionHandler handler;
//空闲线程存活时间
private volatile long keepAliveTime;
//核心线程是否遵循存活时间标志
private volatile boolean allowCoreThreadTimeOut;
//核心线程数量
private volatile int corePoolSize;
//最大线程数
private volatile int maximumPoolSize;
//默认拒绝策略,直接抛出异常
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
//执行终结器时要使用的上下文
private final AccessControlContext acc;
//工作线程封装内部类
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
private static final long serialVersionUID = 6138294804551838833L;
//工作线程
final Thread thread;
//初始任务
Runnable firstTask;
//完成任务数量
volatile long completedTasks;
//构造方法
Worker(Runnable firstTask) {
setState(-1); // 设置状态为未启动
this.firstTask = firstTask;//初始化任务为当前任务
this.thread = getThreadFactory().newThread(this);//创建线程
}
//执行任务
public void run() {
runWorker(this);
}
// 返回锁状态
protected boolean isHeldExclusively() {
return getState() != 0;
}
//尝试获取锁
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
//尝试释放锁
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
//获取锁
public void lock() { acquire(1); }
//尝试获取锁
public boolean tryLock() { return tryAcquire(1); }
//释放锁
public void unlock() { release(1); }
//尝试释放锁
public boolean isLocked() { return isHeldExclusively(); }
//如果线程运行中 并未标记中断 进行中断
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
//如果要更新的状态小于线程池当前状态 更新线程池状态
private void advanceRunState(int targetState) {
for (;;) {
int c = ctl.get();
if (runStateAtLeast(c, targetState) ||
ctl.compareAndSet(c, ctlOf(targetState, workerCountOf(c))))
break;
}
}
//尝试终止线程池
final void tryTerminate() {
for (;;) {
// 获取线程池状态
int c = ctl.get();
// 判断是否在运行中,如果是直接返回
if (isRunning(c) ||
//状态为SHUTDOWN或STOP 直接返回 队列有任务执行 或者 任务未清理完
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
//判断工作线程是否为0 不为0 中断一个工作线程 返回
if (workerCountOf(c) != 0) {
interruptIdleWorkers(ONLY_ONE);
return;
}
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//尝试设置状态为TIDYING
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
//空方法
terminated();
} finally {
//设置状态为TERMINATED
ctl.set(ctlOf(TERMINATED, 0));
//唤醒调用了awaitTermination(long timeout, TimeUnit unit)的线程 awaitTermination中调用了
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
}
}
//通过安全管理器检查是否有关闭线程权限。
private void checkShutdownAccess() {
SecurityManager security = System.getSecurityManager();
if (security != null) {
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
security.checkAccess(w.thread);
} finally {
mainLock.unlock();
}
}
}
////中断全部工作线程
private void interruptWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfStarted();
} finally {
mainLock.unlock();
}
}
//参数为onlyOne,如果传入true,表示只中断一个线程 否则中断所有空闲线程
private void interruptIdleWorkers(boolean onlyOne) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers) {
Thread t = w.thread;
////如果线程未被中断,且获取work的锁成功(说明空闲),则中断线程
if (!t.isInterrupted() && w.tryLock()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
} finally {
w.unlock();
}
}
if (onlyOne)
break;
}
} finally {
mainLock.unlock();
}
}
//中断所有空闲线程
private void interruptIdleWorkers() {
interruptIdleWorkers(false);
}
//中断工作线程辅助标记。
private static final boolean ONLY_ONE = true;
//ScheduledThreadPoolExecutor 使用的策略
final void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
void onShutdown() {
}
//检查线程池状态是否为RUNNING或SHUTDOWN
final boolean isRunningOrShutdown(boolean shutdownOK) {
int rs = runStateOf(ctl.get());
return rs == RUNNING || (rs == SHUTDOWN && shutdownOK);
}
//清空任务队列返回未执行任务
private List<Runnable> drainQueue() {
BlockingQueue<Runnable> q = workQueue;
ArrayList<Runnable> taskList = new ArrayList<Runnable>();
q.drainTo(taskList);
if (!q.isEmpty()) {
for (Runnable r : q.toArray(new Runnable[0])) {
if (q.remove(r))
taskList.add(r);
}
}
return taskList;
}
//增加新的工作线程
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 就是SHUTDOWN 及以上状态 不接受新任务 则不新建工作线程
// RUNNING 如果队列为空或者firstTask为空 不需要新建工作线程
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
//则返回false,添加失败
return false;
for (;;) {
//获取线程数量
int wc = workerCountOf(c);
////判断是否超过线程数量的限制
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize))
return false;
//未超过限制则尝试把线程数加1,成功跳出循环
if (compareAndIncrementWorkerCount(c))
break retry;
c = ctl.get(); // Re-read ctl
//如果期间线程池状态改变 重新进入循环
if (runStateOf(c) != rs)
continue retry;
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
//创建新的工作线程类对象
w = new Worker(firstTask);
//获取线程对象
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;、
//加锁 workers是hashSet类型 线程不安全
mainLock.lock();
try {
// 获取线程池状态
int rs = runStateOf(ctl.get());
//运行状态为RUNNING 或者 SHUTDOWN 并且 firstTask 为空
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
//检查线程是否已启动
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
//添加 进工作队列
workers.add(w);
//更新线程池最大值
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
//添加失败则 删除这个 工作对象
addWorkerFailed(w);
}
return workerStarted;
}
////添加失败 删除这个 工作对象
private void addWorkerFailed(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (w != null)
//删除
workers.remove(w);
//数量减一
decrementWorkerCount();
//尝试终止线程池 可能是因为线程池状态变化 导致添加失败 所以尝试终止
tryTerminate();
} finally {
mainLock.unlock();
}
}
//将工作线程从容器中剔除
private void processWorkerExit(Worker w, boolean completedAbruptly) {
// 判断是否是异常情况导致工作线程被回收
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
// 扣减工作线程总数
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// 将当前工作线程完成任务的总数加到completedTaskCount上
completedTaskCount += w.completedTasks;
workers.remove(w);
} finally {
mainLock.unlock();
}
// 尝试结束线程池
tryTerminate();
int c = ctl.get();
// 判断当前线程池状态是否为 RUNNING 或者 SHUTDOWN
if (runStateLessThan(c, STOP)) {
//非异常情况导致工作线程被回收
if (!completedAbruptly) {
//最小线程数
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
//如果大于核心数 返回
if (workerCountOf(c) >= min)
return; // replacement not needed
}
/ //否则创建 工作线程
addWorker(null, false);
}
}
//获取任务
private Runnable getTask() {
// 超时标志
boolean timedOut = false;
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 停止状态或者任务队列为空
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
//扣减工作线程总数
decrementWorkerCount();
return null;
}
//获取线程数量
int wc = workerCountOf(c);
// 超时或者超过核心线程数
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
//超过核心线程池数或者 超时 并且 线程数量大于1 或者 任务队列为空
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
//线程池工作线程数量减一
if (compareAndDecrementWorkerCount(c))
return null;
continue;
}
try {
//获取任务 返回
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
//执行任务
final void runWorker(Worker w) {
//当前工作线程
Thread wt = Thread.currentThread();
//初始化任务
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
//获取任务
while (task != null || (task = getTask()) != null) {
w.lock();
// 线程池是STOP,TIDYING,TERMINATED状态,中断当前工作线程
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
//空方法
beforeExecute(wt, task);
Throwable thrown = null;
try {
// 执行任务
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
//空方法
afterExecute(task, thrown);
}
} finally {
task = null;
//完成任务+1
w.completedTasks++;
w.unlock();
}
}
//未出异常 安全执行
completedAbruptly = false;
} finally {
//将工作线程从容器中剔除
processWorkerExit(w, completedAbruptly);
}
}
//构造方法 上篇文章写过
(https://blog.csdn.net/weixin_43001336/article/details/107326880)
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
//java安全管理器
this.acc = System.getSecurityManager() == null ?
null :
AccessController.getContext();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
//转换为纳秒
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
//提交任务
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
//获取线程池标志
int c = ctl.get();
//小于核心数 增加工作线程
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
//RUNNING状态 并且 入队成功
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
//不是RUNNING状态 删除任务 执行拒绝策略
if (! isRunning(recheck) && remove(command))
reject(command);
//工作队列为0 添加工作队列
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
//创建新线程失败 执行拒绝策略
else if (!addWorker(command, false))
reject(command);
}
//关闭线程池
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有关闭线程池权限
checkShutdownAccess();
//更新线程池状态为SHUTDOWN
advanceRunState(SHUTDOWN);
//中断所有空闲工作线程
interruptIdleWorkers();
//空方法
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
}
//立即 关闭线程池 返回未完成任务
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//检查是否有关闭线程池权限
checkShutdownAccess();
//更新线程状态为STOP
advanceRunState(STOP);
//中断所有工作线程
interruptWorkers();
//将未完成任务 赋值tasks 并将任务队列清空
tasks = drainQueue();
} finally {
mainLock.unlock();
}
//尝试终止线程池
tryTerminate();
return tasks;
}
//是否SHUTDOWN
public boolean isShutdown() {
return ! isRunning(ctl.get());
}
//是否可以TERMINATED
public boolean isTerminating() {
int c = ctl.get();
return ! isRunning(c) && runStateLessThan(c, TERMINATED);
}
//是否TERMINATED
public boolean isTerminated() {
return runStateAtLeast(ctl.get(), TERMINATED);
}
//阻塞 unit单位timeout长时间
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
//状态是否为TERMINATED
if (runStateAtLeast(ctl.get(), TERMINATED))
return true;
if (nanos <= 0)
return false;
//设置等待最大时间 返回真实等待时间
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
//GC执行方法
protected void finalize() {
SecurityManager sm = System.getSecurityManager();
if (sm == null || acc == null) {
shutdown();
} else {
PrivilegedAction<Void> pa = () -> { shutdown(); return null; };
AccessController.doPrivileged(pa, acc);
}
}
//线程工厂set方法
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
//线程工厂get方法
public ThreadFactory getThreadFactory() {
return threadFactory;
}
//拒绝策略set方法
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
//拒绝策略get方法
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
//核心线程数set方法
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
int delta = corePoolSize - this.corePoolSize;
this.corePoolSize = corePoolSize;
//线程数大于核心数 中断空闲线程
if (workerCountOf(ctl.get()) > corePoolSize)
interruptIdleWorkers();
else if (delta > 0) {
//增加工作线程 直到 到达新设定的核心数 或者 任务队列为空
int k = Math.min(delta, workQueue.size());
while (k-- > 0 && addWorker(null, true)) {
if (workQueue.isEmpty())
break;
}
}
}
public int getCorePoolSize() {
return corePoolSize;
}
//默认启动一个核心线程
public boolean prestartCoreThread() {
return workerCountOf(ctl.get()) < corePoolSize &&
addWorker(null, true);
}
//最少启动一个核心线程
void ensurePrestart() {
int wc = workerCountOf(ctl.get());
if (wc < corePoolSize)
addWorker(null, true);
else if (wc == 0)
addWorker(null, false);
}
//启动所有核心线程等待工作
public int prestartAllCoreThreads() {
int n = 0;
while (addWorker(null, true))
++n;
return n;
}
//返回核心线程否回收标志
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
//空闲线程回收方法
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
if (value != allowCoreThreadTimeOut) {
allowCoreThreadTimeOut = value;
if (value)
interruptIdleWorkers();
}
}
// 设置最大线程数
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
this.maximumPoolSize = maximumPoolSize;
//如果大于最大线程 中断所有核心线程
if (workerCountOf(ctl.get()) > maximumPoolSize)
interruptIdleWorkers();
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
//设置存活时间
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
long keepAliveTime = unit.toNanos(time);
long delta = keepAliveTime - this.keepAliveTime;
this.keepAliveTime = keepAliveTime;
if (delta < 0)
//回收空闲线程
interruptIdleWorkers();
}
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
//返回任务队列
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
//删除任务
public boolean remove(Runnable task) {
boolean removed = workQueue.remove(task);
tryTerminate(); //尝试终止线程池
return removed;
}
//清除取消的任务
public void purge() {
final BlockingQueue<Runnable> q = workQueue;
try {
Iterator<Runnable> it = q.iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
it.remove();
}
} catch (ConcurrentModificationException fallThrough) {
for (Object r : q.toArray())
if (r instanceof Future<?> && ((Future<?>)r).isCancelled())
q.remove(r);
}
tryTerminate(); // In case SHUTDOWN and now empty
}
//返回工作线程数量
public int getPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//TIDYING、TERMINATED 返回 0 否则返回 工作线程数量
return runStateAtLeast(ctl.get(), TIDYING) ? 0
: workers.size();
} finally {
mainLock.unlock();
}
}
//获取工作的线程数
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers)
if (w.isLocked())
++n;
return n;
} finally {
mainLock.unlock();
}
}
//获取线程池曾经历的最大值
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
//获取线程池需要执行的任务数量。
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//已经结束线工作程完成的任务数(completedTaskCount) + 还未结束线程工作线程完成的任务数(w.completedTasks)+正在执行的任务数(w.isLocked())+还未执行的任务数(workQueue.size())
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isLocked())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
//获取已完成的任务数量
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//已经结束线工作程完成的任务数(completedTaskCount) + 还未结束线程工作线程完成的任务数(w.completedTasks)
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
public String toString() {
long ncompleted;
int nworkers, nactive;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
ncompleted = completedTaskCount;
nactive = 0;
nworkers = workers.size();
for (Worker w : workers) {
ncompleted += w.completedTasks;
if (w.isLocked())
++nactive;
}
} finally {
mainLock.unlock();
}
int c = ctl.get();
String rs = (runStateLessThan(c, SHUTDOWN) ? "Running" :
(runStateAtLeast(c, TERMINATED) ? "Terminated" :
"Shutting down"));
return super.toString() +
"[" + rs +
", pool size = " + nworkers +
", active threads = " + nactive +
", queued tasks = " + workQueue.size() +
", completed tasks = " + ncompleted +
"]";
}
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
//内部类拒绝策略 谁提交任务 谁去执行
public static class CallerRunsPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code CallerRunsPolicy}.
*/
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
//内部类拒绝策略 抛出异常
public static class AbortPolicy implements RejectedExecutionHandler {
/**
* Creates an {@code AbortPolicy}.
*/
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException("Task " + r.toString() +
" rejected from " +
e.toString());
}
}
//内部类拒绝策略 啥也不干
public static class DiscardPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardPolicy}.
*/
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
//内部类拒绝策略 删除任务队列最先入队任务 将当前任务入队
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
/**
* Creates a {@code DiscardOldestPolicy} for the given executor.
*/
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
}
本文地址:https://blog.csdn.net/weixin_43001336/article/details/107372063
上一篇: AI革命:深度学习缘何突然改变你的生活
下一篇: 全球抗癌研究:人工智能,基因测序齐上阵