ThreadPoolExecutor源码阅读笔记(二)FutureTask
BlockingQueue: 队列他决定了任务的调度方式,我们主要关注BlockingQueue的offer, poll,take三个方法
offer往队列里面添加任务如果队列已经满了话返回false,
poll在规定的时间内从队列里面取出任务如果队列是空的就返回null,
take也是从队列里面取出任务如果队列是空的则阻塞(保证线程池核心线程一直存在的时候有妙用)
SynchronousQueue:避免每次submit一个任务都开一个线程,复用该线程【设计单个线程的线程池】
LinkedBlockingQueue:基于链表的queue,先进先出,可以设置也可以不设置queue的大小,不设置就是默认的大小。
BlockingQueue 队列里面放得是FutureTask,
从队列里面把FutureTask任务拿出来之后调用的是FutureTask的run方法。
run方法里面会调用FutureTask里面Callable的call方法,
call方法调用完之后保存住了call的返回值。这样FutureTask就可以通过get方法得到这个返回值
FutureTask.java
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);//取消任务(不一定能取消成功,未执行的取消后将不再执行)
boolean isCancelled();//任务是否被取消
boolean isDone();//任务是否已完成
//等待任务执行完毕,然后返回结果
V get() throws InterruptedException, ExecutionException;
//在timeout时间内等待任务执行完毕,返回结果
V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}
public class FutureTask<V> implements RunnableFuture<V> {
/**
* Possible state transitions:状态变化的几种情况
* NEW -> COMPLETING -> NORMAL //正常执行完成时状态变迁
* NEW -> COMPLETING -> EXCEPTIONAL //run方法抛出异常时的状态变迁
* NEW -> CANCELLED //cancel()成功时的状态变迁
* NEW -> INTERRUPTING -> INTERRUPTED //线程inturupt成功时的状态变迁
*/
private volatile int state;//记录任务状态,默认为NEW
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
private Callable<V> callable;//真正执行的任务,执行完后会置空
private Object outcome;//要返回的结果,或者要抛出的异常
/** The thread running the callable; CASed during run() */
private volatile Thread runner;//执行任务的线程
private volatile WaitNode waiters;//等待执行的线程栈
/**
* 返回执行完的任务的结果或者抛出的异常
* @param s 任务状态
*/
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)//正常执行完毕,返回结果
return (V)x;
if (s >= CANCELLED)//任务取消或中断抛出取消异常
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}
public FutureTask(Callable<V> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;//传入要执行的任务
this.state = NEW; //任务默认状态NEW
}
/**
* @param runnable 要执行的任务
* @param result 成功时返回的结果
*/
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);//传入参数组装成Callable
this.state = NEW; //任务默认状态NEW
}
//CANCELLED、INTERRUPTING、INTERRUPTED,都返回true
public boolean isCancelled() {
return state >= CANCELLED;
}
//任务是否已经开始执行
public boolean isDone() {
return state != NEW;
}
public boolean cancel(boolean mayInterruptIfRunning) {
if (!(state == NEW && U.compareAndSwapInt(this, STATE, NEW, mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();//中断执行任务的线程
} finally {
//设置state为INTERRUPTED
U.putOrderedInt(this, STATE, INTERRUPTED);
}
}
} finally {
finishCompletion();
}
return true;
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
//通过LockSupport.park()/parkNanos(),阻塞线程,直到任务执行完,执行完时,会在finishCompletion()方法中释放许可
s = awaitDone(false, 0L);
return report(s);
}
/**
* @throws CancellationException {@inheritDoc}
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null) throw new NullPointerException();
int s = state;
if (s <= COMPLETING && (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
/**
* 该方法供子类实现,在任务执行完之后做一些操作,在任务执行完成或者取消后会回调
* 调用处参考:ExecutorCompletionService.QueueingFuture
*/
protected void done() { }
/**
* 在run中执行,设置任务结果,除非任务已经设置或者被取消
*/
protected void set(V v) {
/* 这里利用了Unsafe的多线程同步,CAS操作。
* compareAndSwap(CAS)方法是原子的,可以避免繁重的锁机制,提高代码效率。
* 这是一种乐观锁,通常认为在大部分情况下不出现竞态条件,如果操作失败,会不断重试直到成功。
* compareAndSwapInt(...)
* 第一个参数为需要改变的对象,
* 第二个为偏移量(即之前求出来的valueOffset的值),
* 第三个参数为期待的值,
* 第四个为更新后的值。
* 若调用该方法时,state字段的值与参数三的值相等,那么则将state修改为参数四的值,并返回一个true,
* 如果调用该方法时,state的值与参数三的值不相等,那么不做任何操作,并范围一个false。
*/
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = v;
//将state设置成NORMAL
U.putOrderedInt(this, STATE, NORMAL);
finishCompletion();
}
}
/**
* run方法执行中抛出异常时,回调该方法
* Causes this future to report an {@link ExecutionException}
* with the given throwable as its cause, unless this future has
* already been set or has been cancelled.
* <p>This method is invoked internally by the {@link #run} method
* upon failure of the computation.
* @param t the cause of failure
*/
protected void setException(Throwable t) {
//当前state为NEW则设置为COMPLETING并返回true,否则返回false
if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
outcome = t;
//设置state状态为EXCEPTIONAL
U.putOrderedInt(this, STATE, EXCEPTIONAL);
finishCompletion();
}
}
public void run() {
/* Unsafe.CAS
* 如果state不为NEW,或者runner非空,则return掉。
* 如果runner为null,则给runner指向当前线程
*/
if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {//state为NEW,且callable非空
V result;
boolean ran;
try {
result = c.call();//执行返回执行结果
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
setException(ex);
}
if (ran)//任务执行完之后,设置执行结果
set(result);
}
} finally {
runner = null;//执行完之后置空
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);//线程中断的处理
}
}
/**
* Executes the computation without setting its result, and then
* resets this future to initial state, failing to do so if the
* computation encounters an exception or is cancelled. This is
* designed for use with tasks that intrinsically execute more
* than once.
* @return {@code true} if successfully run and reset
*/
protected boolean runAndReset() {
if (state != NEW || !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}
/**
* 确保由cancel(true)引起的interrupt,只在run()/runAndReset()传入任务
* 不理解该方法到底有何用
* Ensures that any interrupt from a possible cancel(true) is only
* delivered to a task while in run or runAndReset.
*/
private void handlePossibleCancellationInterrupt(int s) {
if (s == INTERRUPTING)
while (state == INTERRUPTING)//INTERRUPTING状态时
Thread.yield();//暗示调度器让当前线程出让正在使用的处理器。调度器可*地忽略这种暗示。
}
/**
* 单链表实现的栈,用来记录等待中的线程
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
/**
* 遍历清空WaitNode单链表中所有等待的线程,unpark(thread)释放线程许可,并把waiters、callable置空
*/
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {//waiters非空,死循环
//Unsafe.CAS,waiters的值如果与q相等,则waiters置空,并返回true
if (U.compareAndSwapObject(this, WAITERS, q, null)) {//上面一行代码刚刚赋值,所以第一次循环肯定true,且给waiters赋值为null
for (;;) {//开启死循环
Thread t = q.thread;//q引用的thread
if (t != null) {
q.thread = null;//取消引用
/* 释放许可,如果给定线程的许可尚不可用,则使其可用
* 内部还是通过Unsafe.unpark()实现的
*/
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)//下一个WaitNode为空,跳出死循环,此时q指向的WaitNode不知道是哪个了
break;
q.next = null; // unlink to help gc
q = next;//q指向下一个WaitNode,开始下一次循环
}
break;
}
}
done();//给子类用的,本类中是空实现
callable = null;
}
/**
* 等待完成、或者因打断线程/超时 而中断,返回当前任务状态state
* Awaits completion or aborts on interrupt or timeout.
* @param timed true if use timed waits
* @param nanos time to wait, if timed
* @return state upon completion or at timeout
*/
private int awaitDone(boolean timed, long nanos) throws InterruptedException {
// The code below is very delicate, to achieve these goals:
// - call nanoTime exactly once for each call to park
// - if nanos <= 0L, return promptly without allocation or nanoTime
// - if nanos == Long.MIN_VALUE, don't underflow
// - if nanos == Long.MAX_VALUE, and nanoTime is non-monotonic
// and we suffer a spurious wakeup, we will do no worse than
// to park-spin for a while
long startTime = 0L; // Special value 0L means not yet parked
WaitNode q = null;
boolean queued = false;
for (;;) {
int s = state;
if (s > COMPLETING) {//表示已经有结果(成功、或者失败、取消)
if (q != null)
q.thread = null;//取消对线程的引用
return s;//返回任务状态
}else if (s == COMPLETING)//在set/setException中会设置为COMPLETING,这两个方法又是在run/runAndReset中才会调用的
Thread.yield();//让出线程,我不知道这是做什么
else if (Thread.interrupted()) {//线程中断了,cancel(true)触发thread.interrupt()
removeWaiter(q);//移除等待的线程
throw new InterruptedException();//抛出异常
} else if (q == null) {
if (timed && nanos <= 0L)//设置了超时,且超时时间为0,直接返回任务当前状态
return s;
q = new WaitNode();//创建任务
} else if (!queued) {
queued = U.compareAndSwapObject(this, WAITERS, q.next = waiters, q);
} else if (timed) {
final long parkNanos;
if (startTime == 0L) { // first time
startTime = System.nanoTime();
if (startTime == 0L)
startTime = 1L;
parkNanos = nanos;
} else {
long elapsed = System.nanoTime() - startTime;
if (elapsed >= nanos) {//超时
removeWaiter(q);
return state;
}
parkNanos = nanos - elapsed;
}
// nanoTime may be slow; recheck before parking
if (state < COMPLETING)
LockSupport.parkNanos(this, parkNanos);//占用许可,阻塞线程parkNanos时间
} else {
LockSupport.park(this);//占用许可,阻塞线程
}
}
}
/**
* 这循环除了给pred、q、s几个变量赋值,好像什么都没干,有毛用
* Tries to unlink a timed-out or interrupted wait node to avoid
* accumulating garbage. Internal nodes are simply unspliced
* without CAS since it is harmless if they are traversed anyway
* by releasers. To avoid effects of unsplicing from already
* removed nodes, the list is retraversed in case of an apparent
* race. This is slow when there are a lot of nodes, but we don't
* expect lists to be long enough to outweigh higher-overhead
* schemes.
*/
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:for (;;) {//死循环
//pred上一个WaitNode,q当前的WaitNode,s下一个WaitNode
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
//q默认值是waiters,waiters默认是null
s = q.next;
/* q.thread为空,那么pred就为空,只能执行最后一个else if即q指向s(s是q.next)下一个WaitNode
* 直到q.thread非空,触发else if (pred != null){}代码块
* 如果q.thread一直非空,会一直循环到pred指向最后一个thread非空的WaitNode,触发else if (pred != null){}代码块
*/
if (q.thread != null) {
pred = q;
} else if (pred != null) {
pred.next = s;//pred的next指向q的next
if (pred.thread == null) // check for race
continue retry;//跳出内层循环,继续外层循环(外层是死循环,也就是重跑一遍)
/* 如果当前waiters=q,q指向s(s是q.next),
* 如果waiters!=q跳出内循环,继续外层循环*/
} else if (!U.compareAndSwapObject(this, WAITERS, q, s)) {
continue retry;//跳出内层循环,继续外层循环(外层是死循环,也就是重跑一遍)
}
}
break;
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long STATE;
private static final long RUNNER;
private static final long WAITERS;
static {
try {
STATE = U.objectFieldOffset
(FutureTask.class.getDeclaredField("state"));
RUNNER = U.objectFieldOffset
(FutureTask.class.getDeclaredField("runner"));
WAITERS = U.objectFieldOffset
(FutureTask.class.getDeclaredField("waiters"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
// Reduce the risk of rare disastrous classloading in first call to
// LockSupport.park: https://bugs.openjdk.java.net/browse/JDK-8074773
Class<?> ensureLoaded = LockSupport.class;
}
}
小结:1,通过Unsafe.CAS线程安全地去比较和设置任务状态state、执行线程runner、WaitNode等
2,awaitDone()内部通过LockSupport.park()/parkNanous()阻塞线程,也是因此FutureTask任务在子线程执行完后能返回结果,任务执行完成后LockSupport.unpark(thread)是否许可,取消阻塞。而LockSupport.park()/unpark()内部依然是用了Unsafe.park()/unpark()来实现的,其他细节看看了解下就好了。
3、FutureTask其实也是实现了Runnable,只是在此基础上加了很多东西,线程触发run(),在run()中又调用了callable.call(),然后调用下futureTask.get()方法,get()方法中调用了awaitDone(),awaitDone中又有LockSupport.park()阻塞线程,知道call()执行完毕返回结果,或者任务取消线程中断,调用finishCompletion(),finishCompletion内部调用了unpark()释放许可,就会等线程执行完毕,返回结果。由于实现了Runnable,所以可以当成Runnable用,又有比Runnable更多的功能。
4、FutureTask提供了一些api,cancel(true),isCanceled()等
5、AtomicInteger等,也是通过Unsafe来实现原子性同步的。
参考:https://blog.csdn.net/wuyuxing24/article/details/50989530
上一篇: apollo事件通信机制
下一篇: Request Body Search