1 简介
futuretask同时实现了future 、runnable接口,因此它可以交给执行器executor去执行这个任务,也可以由调用线程直接执行run方法。
①未启动: run方法还未被执行,futuretask处于未启动状态。
②已启动: run方法在执行过程中,futuretask处于已启动状态
当futuretask处于未启动状态时,执行futuretask.cancel()方法将导致此任务永远不会被执行;当futuretask处于已启动状态时,执行futuretask.cancel(true)方法将以中断执行此任务线程的方式来试图停止任务;当futuretask处于已启动状态时,执行 futuretask.cancel(false)方法将不会对正在执行此任务的线程产生影响(让正在执行的任务运行完成);当futuretask处于已完成状态时,执行futuretask.cancel方法将返回false (已完成的任务任务无法取消)。
2 用法示例
futuretask因其自身继承于runnable接口,因此它可以交给执行器executor去执行;另外它也代表异步任务结果,它还可以通过executorservice.submit返回一个futuretask。另外futuretask也可单独使用。为了更好的理解futuretask ,下面结合concurrenthashmap演示一个任务缓存。缓存中有多个任务,使用多线程去执行这些任务,一个任务最多被一个线程消费,若多个线程试图执行这一个任务,只允许一个线程来执行,其他线程必须等待它执行完成。
import java.util.concurrent.*; public class futuretasktest { private final concurrentmap<string, future<string>> taskcache = new concurrenthashmap<>(); public string executiontask(final string taskname) throws executionexception, interruptedexception { while (true) { future<string> future = taskcache.get(taskname);// 从缓存中获取任务 if (future == null) {//不存在此任务,新构建一个任务放入缓存,并启动这个任务 callable<string> task = () ->{ system.out.println("执行的任务名是"+taskname); return taskname; } ; // 1.2创建任务 futuretask<string> futuretask = new futuretask<string>(task); future = taskcache.putifabsent(taskname, futuretask);// 尝试将任务放入缓存中 if (future == null) { future = futuretask; futuretask.run();//执行任务 } } try { //若任务在缓存中了,可以直接等待任务的完成 return future.get();// 等待任务执行完成 } catch (cancellationexception e) { taskcache.remove(taskname, future); } } } public static void main(string[] args) { final futuretasktest tasktest = new futuretasktest(); for (int i = 0; i < 7; i++) { int finali = i; new thread(()->{ try { tasktest.executiontask("taskname" + finali); } catch (executionexception | interruptedexception e) { e.printstacktrace(); } }).start(); new thread(()->{ try { tasktest.executiontask("taskname" + finali); tasktest.executiontask("taskname" + finali); } catch (executionexception | interruptedexception e) { e.printstacktrace(); } }).start(); } } }
3 实现原理
1) 成员变量
private volatile int state;
private static final int new = 0;//刚开始的状态或任务在运行中 private static final int completing = 1;//临时状态,任务即将结束,正在设置结果 private static final int normal = 2;//任务正常完成 private static final int exceptional = 3;//因抛出异常而结束任务 private static final int cancelled = 4;//任务被取消 private static final int interrupting = 5;//任务正在被中断 private static final int interrupted = 6;//任务被中断(中断的最终状态)
/** new -> completing -> normal 正常结束任务时的状态转换流程 * new -> completing -> exceptional 任务执行过程中抛出了异常时的状态转换流程 * new -> cancelled 任务被取消时的状态转换流程 * new -> interrupting -> interrupted 任务执行过程中出现中断时的状态转换流程 */
private callable<v> callable; private object outcome; // non-volatile, protected by state reads/writes private volatile thread runner; private volatile waitnode waiters;
成员变量waiter表示等待任务执行结果的等待栈(数据结构是单向链表) 。waitnode是一个简单的静态内部,一个成员变量thread表示等待结果的线程,另一个成员变量next表示下一个等待节点(线程)。
static final class waitnode { volatile thread thread; volatile waitnode next; waitnode() { thread = thread.currentthread(); } }
2) 构造方法
futuretask的构造方法会初始化callable和state ,它有两个构造方法, 分别接受callable和runnable类型的待执行任务。但对于runnable类型参数,它会调用executors.callable将runnable转换为callable类型实例,以便于统一处理。
public futuretask(callable<v> callable) { if (callable == null) throw new nullpointerexception(); this.callable = callable; this.state = new; // ensure visibility of callable } public futuretask(runnable runnable, v result) { this.callable = executors.callable(runnable, result); this.state = new; // ensure visibility of callable }
public static <t> callable<t> callable(runnable task, t result) { if (task == null) throw new nullpointerexception(); return new runnableadapter<t>(task, result); } static final class runnableadapter<t> implements callable<t> { final runnable task; final t result; runnableadapter(runnable task, t result) { this.task = task; this.result = result; } public t call() { task.run(); return result; } }
3) 主要api
(1) run与runandreset
public void run() { if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) //将当前线程设置为执行任务的线程,cas失败就直接返回 return; try { callable<v> c = callable; if (c != null && state == new) { v result; boolean ran; try { result = c.call();//执行任务 ran = true; } catch (throwable ex) { //运行时有异常,设置异常 result = null; ran = false; setexception(ex); } if (ran) set(result);//设置结果 } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; //state已是最终状态,不再变化,将runer设为null,防止run方法被并发调用 // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; //清空运行线程runner后再重新获取state,防止遗漏掉对中断的处理 if (s >= interrupting) handlepossiblecancellationinterrupt(s); } }
③如果任务正常完成,调用set设置任务的结果,将state设为normal, 将结果保存到outcome ,唤醒所有等待结果的线程
④若执行任务过程中发生了异常,调用setexception设置异常,将state设为exceptional ,将此异常也保存到outcome ,唤醒所有等待结果的线程
set 、setexception方法分别用来设置结果、设置异常,但这仅是它们的主要逻辑,它们还会进行其他的处理。
protected void setexception(throwable t) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { outcome = t; unsafe.putorderedint(this, stateoffset, exceptional); // final state finishcompletion(); } } protected void set(v v) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { outcome = v; unsafe.putorderedint(this, stateoffset, normal); // final state finishcompletion(); } }
private void handlepossiblecancellationinterrupt(int s) { if (s == interrupting) while (state == interrupting) thread.yield(); // wait out pending interrupt }
runandreset方法是futuretask类自己添加的protected级别的方法(供子类调用), 这个方法主要用来执行可多次执行且不需要结果的任务,只有在任务运行和重设成功时才返回true 。定时任务执行器scheduledthreadpoolexecutor的静态内部scheduledfuturetask的run方法调用了这个api.
protected boolean runandreset() { if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) //任务已启动或cas设置运行任务的的线程失败,直接返回false return false; boolean ran = false; int s = state; try { callable<v> c = callable; if (c != null && s == new) { try { c.call(); // don't set result 没有调用set(v)方法,不设置结束 ran = true; } catch (throwable ex) { setexception(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s); } return ran && s == new; //任务成功运行且state还是new时返回true,反之返回false }
(2) get方法
get方法用于获取任务的最终结果,它有两个版本,其中一个是超时版本。两个版本的最主要的区别在于,非超时版本可以不限时长地等待结果返回 ,另外非超时版本不会抛出timeoutexception超时异常。get方法超时版本的基本逻辑:若任务未完成就等待任务完成,最后调用report报告结果,report会根据状态返回结果或抛出异常。
public v get() throws interruptedexception, executionexception { int s = state; if (s <= completing) s = awaitdone(false, 0l);//awaitdone第一个参数为false,表示可以无限时长等待 return report(s); } public v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception { if (unit == null) throw new nullpointerexception(); int s = state; if (s <= completing &&//还未完成 (s = awaitdone(true, unit.tonanos(timeout))) <= completing)//等待完成 throw new timeoutexception();//到了限定时间,任务仍未完成,抛出超时异常timeoutexception return report(s);//报告结果 }
private int awaitdone(boolean timed, long nanos) throws interruptedexception { final long deadline = timed ? system.nanotime() + nanos : 0l; waitnode q = null; boolean queued = false; for (;;) { if (thread.interrupted()) { //被中断了,就在等待栈表中移除这个线程,并抛出中断异常 removewaiter(q); throw new interruptedexception(); } int s = state; if (s > completing) { //任务完成了,将当前线程从等待队列中清空,返回最新的状态 if (q != null) q.thread = null; return s; } //任务即将完成,当前线程礼让,让其他线程执行 else if (s == completing) // cannot time out yet thread.yield(); else if (q == null) //初始化当前线程对应的节点 q = new waitnode(); else if (!queued) //如果之前入栈失败,再次尝试入栈(cas更新),将当前节点设为等待栈的栈顶 queued = unsafe.compareandswapobject(this, waitersoffset, q.next = waiters, q); else if (timed) { //如果设置了超时时间 nanos = deadline - system.nanotime(); if (nanos <= 0l) { //如果任务执行时长已经超出了给定的时间,从等待栈中移除当前节点(线程) removewaiter(q); return state; } //让当前线程休眠等待给定的时间(或等到run方法中的set或setexception调用finishcompletion来唤醒) locksupport.parknanos(this, nanos); } else//未设置超时时间 //让当前线程无限时长休眠等待,直到任务完成时run方法中的set或setexception调用finishcompletion来唤醒此线程 locksupport.park(this); } }
private void removewaiter(waitnode node) { if (node != null) { node.thread = null; //先将节点对应的线程清空,下面的"q.thread != null"就能判断节点是否超时或中断节点。 retry: for (;;) { // restart on removewaiter race //q表示当前遍历到的节点,pred表示q的前驱节点,s表示q的后继节点 for (waitnode pred = null, q = waiters, s; q != null; q = s) {//遍历完链表才能退出内循环 s = q.next; //q.thread!=null 表示这不是超时或中断的节点,它是效节点,不能被从栈表中移除 //(removewaiter的开头将超时或中断的节点在thread赋空,可见node.thread=null代码) if (q.thread != null) pred = q; //得到下次循环时q的前驱节点 else if (pred != null) { //q.thread== null 且pred!=null,需要将无效节点q从栈表中移除 //将q的前驱、后继节点直接链接在一起,q本身被移除出栈表了 pred.next = s; //这里是从前向后遍历链表的,无竞争情况下,不可能没检查到当前节点的前面还有无效节点, //那么一定有其他线程修改了当前节点q的前驱,些时有线程竞争,需要从链表的头部重新遍历检查 if (pred.thread == null) // check for race continue retry; } // pred==null且q.thread=null //q的前驱节点为空,表明q是链表的头节点 //q.thread==null,表明q是无效节点 //无效节点不能作为链表的头节点,所以要更新头节点,将q的后继节点s作为链表新的头节点 else if (!unsafe.compareandswapobject(this, waitersoffset, //cas更新头节点 q, s)) //cas更新失败,重试 continue retry; } break; } } }
private v report(int s) throws executionexception { object x = outcome; if (s == normal) return (v)x; if (s >= cancelled) throw new cancellationexception(); throw new executionexception((throwable)x); }
(3) cancel方法
public boolean cancel(boolean mayinterruptifrunning) { if (!(state == new && unsafe.compareandswapint(this, stateoffset, new, mayinterruptifrunning ? interrupting : cancelled))) //①不是new状态,表示任务至少是completing(即将结束)状态,返回false //②cas更新state为interrupting或cancelled失败,返回false //只有state状态更新成功,才能取消任务(防止被并发调用) return false; try { // in case call to interrupt throws exception if (mayinterruptifrunning) {//允许中断就设置中断标志 try { thread t = runner; if (t != null) t.interrupt();//设置中断标志 } finally { // final state 设置中断的最终状态 //interrupting -> interrupted ,将state由“正在中断”更新为”已经中断“ unsafe.putorderedint(this, stateoffset, interrupted); } } } finally { //从等待栈上唤醒并移除所有的线程(节点) finishcompletion(); } return true; }
private void finishcompletion() { // assert state > completing; for (waitnode q; (q = waiters) != null;) { //任务取消后,等待栈表没有存在的意义了,将等待栈waiters赋为null, if (unsafe.compareandswapobject(this, waitersoffset, q, null)) { for (;;) { //遍历链表(栈),将所有节点移除,并唤醒节点对应的线程 thread t = q.thread; if (t != null) { //将节点上的线程清空 q.thread = null; locksupport.unpark(t);//唤醒此线程 } waitnode next = q.next; if (next == null)//链表到尾了,退出遍历 break; q.next = null; // unlink to help gc 将节点next属性清空,方便垃圾回收 q = next;//向后移动一个节点 } break; } } done();//空方法,留给子类重写 callable = null; //赋空,减少痕迹 // to reduce footprint }
(4) 其他辅助方法
iscancelled 、isdone都是直接根据state确定任务的状态。
public boolean iscancelled() { return state >= cancelled; } public boolean isdone() { return state != new; }
