并发系列(4)之 Future 框架详解
本文将主要讲解 j.u.c 中的 future 框架,并分析结合源码分析其内部结构逻辑;
一、future 框架概述
jdk 中的 future 框架实际就是 future 模式的实现,通常情况下我们会配合线程池使用,但也可以单独使用;下面我们就单独使用简单举例;
1. 应用实例
futuretask<string> future = new futuretask<>(() -> { log.info("异步任务执行..."); thread.sleep(2000); log.info("过了很久很久..."); return "异步任务完成"; }); log.info("启动异步任务..."); new thread(future).start(); log.info("继续其他任务..."); thread.sleep(1000); log.info("获取异步任务结果:{}", future.get());
打印:
[15:38:03,231 info ] [main] - 启动异步任务... [15:38:03,231 info ] [main] - 继续其他任务... [15:38:03,231 info ] [thread-0] - 异步任务执行... [15:38:05,232 info ] [thread-0] - 过了很久很久... [15:38:05,236 info ] [main] - 获取异步任务结果:异步任务完成
如上面代码所示,首先我们将要执行的任务包装成 callable
,这里如果不需要返回值也可以使用 runnable
;然后构建 futuretask
由一个线程启动,最后使用 future.get()
获取异步任务结果;
2. future 运行逻辑
对于 future 模式的流程图如下:
对比上面的实例代码,大家可能会发现有些不一样,因为在 futuretask 同时继承了 runnable 和 future 接口,所以再提交任务后没有返回future,而是直接使用自身调用 get;下面我们就对源码进行实际分析;
二、源码分析
1. futuretask 主体结构
public interface runnablefuture<v> extends runnable, future<v> {} public class futuretask<v> implements runnablefuture<v> { private volatile int state; // 任务运行状态 private callable<v> callable; // 异步任务 private object outcome; // 返回结果 private volatile thread runner; // 异步任务执行线程 private volatile waitnode waiters; // 等待异步结果的线程栈(通过treiber stack算法实现) public futuretask(callable<v> callable) { // 需要返回值 if (callable == null) throw new nullpointerexception(); this.callable = callable; this.state = new; // ensure visibility of callable } public futuretask(runnable runnable, v result) { this.callable = executors.callable(runnable, result); this.state = new; // ensure visibility of callable } ... }
另外在代码中还可以看见有很多地方都是用了 cas
来更新变量,而 jdk1.6 中甚至使用了 aqs
来实现;其原因就是同一个 futuretask
可以多个线程同时提交,也可以多个线程同时获取; 所以代码中有很多的状态变量:
// futuretask.state 取值 private static final int new = 0; // 初始化到结果返回前 private static final int completing = 1; // 结果赋值 private static final int normal = 2; // 执行完毕 private static final int exceptional = 3; // 执行异常 private static final int cancelled = 4; // 任务取消 private static final int interrupting = 5; // 设置中断状态 private static final int interrupted = 6; // 任务中断
同时源码的注释中也详细给出了可能出现的状态转换:
- new -> completing -> normal // 任务正常执行
- new -> completing -> exception // 任务执行异常
- new ->cancelled // 任务取消
- new -> initerrupting -> interrupted // 任务中断
注意这里的 completing
状态是一个很微妙的状态,正因为有他的存在才能实现无锁赋值;大家先留意这个状态,然后在代码中应该能体会到;另外这里还有一个变量需要注意,waitnode
;使用 treiber stack 算法实现的无锁栈;其原理说明可以参考下面第三节;
2. 任务执行
public void run() { if (state != new || // 确保任务执行完成后,不再重复执行 !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) // 确保只有一个线程执行 return; try { callable<v> c = callable; if (c != null && state == new) { v result; boolean ran; try { result = c.call(); ran = true; } catch (throwable ex) { result = null; ran = false; setexception(ex); // 设置异常结果 } if (ran) set(result); // 设置结果 } } finally { runner = null; int s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s); // 确保中断状态已经设置 } }
// 设置异步任务结果 protected void set(v v) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { // 保证结果只能设置一次 outcome = v; unsafe.putorderedint(this, stateoffset, normal); // final state finishcompletion(); // 唤醒等待线程 } }
protected void setexception(throwable t) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { // 保证结果只能设置一次 outcome = t; unsafe.putorderedint(this, stateoffset, exceptional); // final state finishcompletion(); } }
3. 任务取消
public boolean cancel(boolean mayinterruptifrunning) { if (!(state == new && // 只有在任务执行阶段才能取消 unsafe.compareandswapint(this, stateoffset, new, // 设置取消状态 mayinterruptifrunning ? interrupting : cancelled))) return false; try { // in case call to interrupt throws exception if (mayinterruptifrunning) { try { thread t = runner; if (t != null) t.interrupt(); } finally { // final state unsafe.putorderedint(this, stateoffset, interrupted); } } } finally { finishcompletion(); } return true; }
注意 cancel(false)
也就是仅取消,并没有打断;异步任务会继续执行,只是这里首先设置了 futuretask.state = cancelled
,所以最后在设置结果的时候会失败,unsafe.compareandswapint(this, stateoffset, new, completing)
;
4. 获取结果
public v get() throws interruptedexception, executionexception { int s = state; if (s <= completing) s = awaitdone(false, 0l); // 阻塞等待 return report(s); } private v report(int s) throws executionexception { // 根据最后的状态返回结果 object x = outcome; if (s == normal) return (v)x; if (s >= cancelled) throw new cancellationexception(); throw new executionexception((throwable)x); }
private int awaitdone(boolean timed, long nanos) throws interruptedexception { final long deadline = timed ? system.nanotime() + nanos : 0l; waitnode q = null; boolean queued = false; for (;;) { if (thread.interrupted()) { removewaiter(q); // 移除等待节点 throw new interruptedexception(); } int s = state; if (s > completing) { // 任务已完成 if (q != null) q.thread = null; return s; } else if (s == completing) // 正在赋值,直接先出让线程 thread.yield(); else if (q == null) // 任务还未完成需要等待 q = new waitnode(); else if (!queued) queued = unsafe.compareandswapobject(this, waitersoffset, q.next = waiters, q); // 使用 treiber stack 算法 else if (timed) { nanos = deadline - system.nanotime(); if (nanos <= 0l) { removewaiter(q); return state; } locksupport.parknanos(this, nanos); } else locksupport.park(this); } }
三、treiber stack
在《java 并发编程实战》中讲了, 创建非阻塞算法的关键在于,找出如何将原子修改的范围缩小到单个变量上,同时还要维护数据的一致性 。
@threadsafe public class concurrentstack <e> { atomicreference<node<e>> top = new atomicreference<>(); private static class node <e> { public final e item; public node<e> next; public node(e item) { this.item = item; } } public void push(e item) { node<e> newhead = new node<>(item); node<e> oldhead; do { oldhead = top.get(); newhead.next = oldhead; } while (!top.compareandset(oldhead, newhead)); } public e pop() { node<e> oldhead; node<e> newhead; do { oldhead = top.get(); if (oldhead == null) return null; newhead = oldhead.next; } while (!top.compareandset(oldhead, newhead)); return oldhead.item; } }
总结
- 总体来讲源码比较简单,因为其本身只是一个 future 模式的实现
- 但是其中的状态量的设置,还有里面很多无锁的处理方式,才是 futuretask 带给我们的精华!
下一篇: 不能再错失机会:诺基亚发力可穿戴与VR