JUC 源码解读系列--FutureTask 篇
前面在介绍线程池 ThreadPoolExecutor 时,只关注了执行任务的方法 execute(),ThreadPoolExecutor 中执行任务还有另一个方法–submit(),此方法支持带返回值的线程任务。其底层是基于 FutureTask 来实现的,该类也是 JUC 中提供的并发工具,本篇我们一起走进它的实现细节。
版本说明:本次源码解读基于 JDK1.8,请注意区分。
1. 基本用法
Callable<String> callable = new Callable() {
@Override
public String call() throws Exception {
String result;
// do computing
return result;
}
};
ThreadPoolExecutor executor = new ThreadPoolExecutor(3,
5,
10,
TimeUnit.SECONDS,
new LinkedBlockingQueue<>());
try {
Future<String> futureTask = executor.submit(callable);
try {
System.out.println(futureTask.get());
} catch (ExecutionException e) {
}
}
finally {
executor.shutdown();
}
我们看一下 ThreadPoolExecutor 中 submit() 方法的实现:
// 此方法继承自 ThreadPoolExecutor 的父类 AbstractExecutorService
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
2. 类关系图
从上图可以看出,FutureTask 实现了 RunnableFuture 接口,而 RunnableFuture 接口分别继承了 Runnable 和 Future 接口。Runnable 接口是我们所熟悉的创建线程的一种方式,因此我们重点看一下 Future 接口:
2.1 Future
// Future 代表一个异步计算的结果。
// 它提供了一些方法:检查任务的完成状态,等待并获取任务结果,以及取消任务。
// 如果我们使用 Future 只是为了使用取消任务的功能,可以将 类型声明为 Future<?>,
// 并且返回 null 作为任务的执行结果即可。
public interface Future<V> {
// 尝试取消任务的执行。
// 如果任务已经完成或者已经被取消,将会尝试失败。
// 如果执行此方法时任务还没有开始执行,这个任务将不会被执行。
// 如果执行此方法时任务正在执行,
// mayInterruptIfRunning 参数来控制是否进行中断。
// 此方法返回后,之后对 isDone() 方法的调用总是返回 true。
// 此方法返回后,如果返回 true,之后对 isCancelled() 方法的调用总是返回 true。
boolean cancel(boolean mayInterruptIfRunning);
// 任务是否已经被成功取消。
// 任务结束前被成功取消,返回真。
boolean isCancelled();
// 任务是否已经完成。
// 任务正常结束,异常结束,或者被成功取消,都算作完成。
boolean isDone();
// 等待计算的完成,并获取执行的结果。
V get() throws InterruptedException, ExecutionException;
// 等待计算的完成,并获取执行的结果。带超时时间。
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
3. FutureTask 实现原理
3.1 属性及构造方法
/**
* The run state of this task, initially NEW. The run state
* transitions to a terminal state only in methods set,
* setException, and cancel. During completion, state may take on
* transient values of COMPLETING (while outcome is being set) or
* INTERRUPTING (only while interrupting the runner to satisfy a
* cancel(true)). Transitions from these intermediate to final
* states use cheaper ordered/lazy writes because values are unique
* and cannot be further modified.
*
* Possible state transitions:
* NEW -> COMPLETING -> NORMAL
* NEW -> COMPLETING -> EXCEPTIONAL
* NEW -> CANCELLED
* NEW -> INTERRUPTING -> INTERRUPTED
*/
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
/** The underlying callable; nulled out after running */
private Callable<V> callable;
/** The result to return or exception to throw from get() */
// 任务的计算结果,对 outcome 的修改都是基于 state 的 CAS 操作,
// 因此不存在线程安全问题。
private Object outcome; // non-volatile, protected by state reads/writes
/** The thread running the callable; CASed during run() */
private volatile Thread runner;
/** Treiber stack of waiting threads */
private volatile WaitNode waiters;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
// 初始化 state 为 NEW
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
// 通过工具类 Executors 获得一个适配器
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
Executors.callable()
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
3.2 get()
public V get() throws InterruptedException, ExecutionException {
int s = state;
// 如果任务还未完成,等待。
// 任务的完成包含正常结束、异常结束以及取消
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
awaitDone()
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 如果当前线程被中断,将对应节点移除
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
int s = state;
// 如果任务已经完成,不需要构建节点并压栈
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 构建新的节点
else if (q == null)
q = new WaitNode();
// waiters 是栈的结构,将新的节点压栈
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
// 超时判断
else if (timed) {
// 如果等待的线程已经超时,移除对应节点,返回当前任务状态
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
// 超时,移除节点
removeWaiter(q);
return state;
}
// 带超时时间的挂起
LockSupport.parkNanos(this, nanos);
}
else
LockSupport.park(this);
}
}
report()
private V report(int s) throws ExecutionException {
Object x = outcome;
// 正常结束,返回计算结果
if (s == NORMAL)
return (V)x;
// 被取消,抛出取消异常
if (s >= CANCELLED)
throw new CancellationException();
// 执行过程中出现异常,将异常向上抛
throw new ExecutionException((Throwable)x);
}
3.3 cancel()
public boolean cancel(boolean mayInterruptIfRunning) {
// 只能取消还未执行的任务。
// 如果允许中断,先更新成中间状态,中断线程之后再更新成已中断
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
// 唤醒所有等待获取任务执行结果的线程
finishCompletion();
}
return true;
}
finishCompletion()
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
// 供用户扩展的方法
done();
callable = null; // to reduce footprint
}
3.4 run()
public void run() {
// 任务开始执行时,任务的状态必须是 NEW。
// 将任务的 runner 设置成当前线程,设置失败说明任务已被其他线程执行。
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
// 对 state 进行双重检查
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 任务执行正常结束,得到执行结果
result = c.call();
ran = true;
} catch (Throwable ex) {
// 任务异常结束,将异常作为运行结果
result = null;
ran = false;
setException(ex);
}
if (ran)
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
// 退出任务执行前,将任务的 runner 置空
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
// 取消任务的方法中,先获取执行任务的线程,再对线程进行非空判断并中断线程。
// 假如非空判断成立后,上面的 runner = null; 才被执行,可能导致泄露。
// 因此这里要对状态重新判断并进行相应处理。
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set()
// 将 callable 的运行结果作为计算结果返回
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
// 唤醒等待计算完成的线程
finishCompletion();
}
}
setException()
// 将异常信息设置成任务的计算结果
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
// 唤醒等待任务结束的线程
finishCompletion();
}
}
handlePossibleCancellationInterrupt()
private void handlePossibleCancellationInterrupt(int s) {
// It is possible for our interrupter to stall before getting a
// chance to interrupt us. Let's spin-wait patiently.
// 任务的状态被置为 INTERRUPTING 之后,才去调用线程的中断操作,
// 如果此时线程还未被中断,让出时间片等待中断操作。
if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
// assert state == INTERRUPTED;
// We want to clear any interrupt we may have received from
// cancel(true). However, it is permissible to use interrupts
// as an independent mechanism for a task to communicate with
// its caller, and there is no way to clear only the
// cancellation interrupt.
//
// Thread.interrupted();
}
4. 总结
FutureTask 实现了 RunnableFuture 接口,对 Runnable 和 Future 接口进行了实现。run() 方法中实现了任务的执行及计算结果的设置。对 Future 接口的实现,提供了任务执行状态的检查,任务的取消操作,以及任务结果的获取。
在任务执行完毕(包含正常结束,异常结束,被取消)之前,等待获取执行结果的线程一直处于阻塞状态(超时除外),这些线程被构造成 WaitNode 节点对象压入栈中。当任务执行完毕后,会唤醒所有的等待结果的线程并返回执行的结果,如果执行过程中出现异常,则抛出 ExecutionException 异常,如果任务是被取消而结束的,则抛出 CancellationException 异常。
FutureTask 中使用状态机制来管理任务的生命周期。
各个状态的含义如下:
- NEW:初始状态,任务还未被取消或被线程执行。
- COMPLETING:任务执行正常或异常结束,outcome 被设置之前的状态。
- NORMAL:任务执行正常结束,outcome 被设置之后的状态。
- EXCEPTIONAL:任务执行异常结束,outcome 被设置之后的状态。
- CANCELLED:任务被 cancel(false) 方法调用取消后的状态。
- INTERRUPTING:任务被 cancel(true) 方法调用取消时,线程被中断前的中间状态。
- INTERRUPTED:任务被 cancel(true) 方法调用取消后的状态。
状态的转换如下:
- 任务正常结束:NEW -> COMPLETING -> NORMAL
- 任务异常结束:NEW -> COMPLETING -> EXCEPTIONAL
- 任务被 cancel(false) 方法调用取消:NEW -> CANCELLED
- 任务被 cancel(true) 方法调用取消:NEW -> INTERRUPTING -> INTERRUPTED
推荐阅读
-
JUC 源码解读系列--FutureTask 篇
-
SpringMVC源码解读之 HandlerMapping - AbstractDetectingUrlHandlerMapping系列初始化
-
SpringMVC源码解读之HandlerMapping - AbstractUrlHandlerMapping系列request分发
-
SpringMVC源码解读之 HandlerMapping - AbstractDetectingUrlHandlerMapping系列初始化
-
SpringMVC源码解读之HandlerMapping - AbstractUrlHandlerMapping系列request分发
-
spring-boot-2.0.3不一样系列之源码篇 - run方法(四)之prepareContext,绝对有值得你看的地方
-
ASP.NET 5系列教程(七)完结篇-解读代码
-
spring-boot-2.0.3不一样系列之源码篇 - run方法(三)之createApplicationContext,绝对有值得你看的地方
-
spring-boot-2.0.3不一样系列之源码篇 - 阶段总结
-
spring-boot-2.0.3不一样系列之源码篇 - SpringApplication的run方法(一)之SpringApplicationRunListener,绝对有值得你看的地方