futuretask源码分析(推荐)
程序员文章站
2024-02-25 17:05:45
futuretask只实现runnablefuture接口:
该接口继承了java.lang.runnable和future接口,也就是继承了这两个接口的特性。...
futuretask只实现runnablefuture接口:
该接口继承了java.lang.runnable和future接口,也就是继承了这两个接口的特性。
1.可以不必直接继承thread来生成子类,只要实现run方法,且把实例传入到thread构造函数,thread就可以执行该实例的run方法了( thread(runnable) )。
2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成。也可以中断执行,判断执行状态等。
futuretask是一个支持取消行为的异步任务执行器。该类实现了future接口的方法。
如: 1. 取消任务执行
2. 查询任务是否执行完成
3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。
注意:一旦任务执行完成,则不能执行取消任务或者重新启动任务。(除非一开始就使用runandreset模式运行任务)
futuretask支持执行两种任务, callable 或者 runnable的实现类。且可把futuretask实例交由executor执行。
源码部分(很简单):
public class futuretask<v> implements runnablefuture<v> { /* * revision notes: this differs from previous versions of this * class that relied on abstractqueuedsynchronizer, mainly to * avoid surprising users about retaining interrupt status during * cancellation races. sync control in the current design relies * on a "state" field updated via cas to track completion, along * with a simple treiber stack to hold waiting threads. * * style note: as usual, we bypass overhead of using * atomicxfieldupdaters and instead directly use unsafe intrinsics. */ /** * the run state of this task, initially new. the run state * transitions to a terminal state only in methods set, * setexception, and cancel. during completion, state may take on * transient values of completing (while outcome is being set) or * interrupting (only while interrupting the runner to satisfy a * cancel(true)). transitions from these intermediate to final * states use cheaper ordered/lazy writes because values are unique * and cannot be further modified. * * possible state transitions: * new -> completing -> normal * new -> completing -> exceptional * new -> cancelled * new -> interrupting -> interrupted */ private volatile int state; private static final int new = 0; private static final int completing = 1; private static final int normal = 2; private static final int exceptional = 3; private static final int cancelled = 4; private static final int interrupting = 5; private static final int interrupted = 6; /** the underlying callable; nulled out after running */ private callable<v> callable; /** 用来存储任务执行结果或者异常对象,根据任务state在get时候选择返回执行结果还是抛出异常 */ private object outcome; // non-volatile, protected by state reads/writes /** 当前运行run方法的线程 */ private volatile thread runner; /** treiber stack of waiting threads */ private volatile waitnode waiters; /** * returns result or throws exception for completed task. * * @param s completed state value */ @suppresswarnings("unchecked") private v report(int s) throws executionexception { object x = outcome; if (s == normal) return (v)x; if (s >= cancelled) throw new cancellationexception(); throw new executionexception((throwable)x); } /** * creates a {@code futuretask} that will, upon running, execute the * given {@code callable}. * * @param callable the callable task * @throws nullpointerexception if the callable is null */ public futuretask(callable<v> callable) { if (callable == null) throw new nullpointerexception(); this.callable = callable; this.state = new; // ensure visibility of callable } /** * creates a {@code futuretask} that will, upon running, execute the * given {@code runnable}, and arrange that {@code get} will return the * given result on successful completion. * * @param runnable the runnable task * @param result the result to return on successful completion. if * you don't need a particular result, consider using * constructions of the form: * {@code future<?> f = new futuretask<void>(runnable, null)} * @throws nullpointerexception if the runnable is null */ public futuretask(runnable runnable, v result) { this.callable = executors.callable(runnable, result); this.state = new; // ensure visibility of callable } //判断任务是否已取消(异常中断、取消等) public boolean iscancelled() { return state >= cancelled; } /** 判断任务是否已结束(取消、异常、完成、normal都等于结束) ** public boolean isdone() { return state != new; } /** mayinterruptifrunning用来决定任务的状态。 true : 任务状态= interrupting = 5。如果任务已经运行,则强行中断。如果任务未运行,那么则不会再运行 false:cancelled = 4。如果任务已经运行,则允许运行完成(但不能通过get获取结果)。如果任务未运行,那么则不会再运行 **/ public boolean cancel(boolean mayinterruptifrunning) { if (state != new) return false; if (mayinterruptifrunning) { if (!unsafe.compareandswapint(this, stateoffset, new, interrupting)) return false; thread t = runner; if (t != null) t.interrupt(); unsafe.putorderedint(this, stateoffset, interrupted); // final state } else if (!unsafe.compareandswapint(this, stateoffset, new, cancelled)) return false; finishcompletion(); return true; } /** * @throws cancellationexception {@inheritdoc} */ public v get() throws interruptedexception, executionexception { int s = state; //如果任务未彻底完成,那么则阻塞直至任务完成后唤醒该线程 if (s <= completing) s = awaitdone(false, 0l); return report(s); } /** * @throws cancellationexception {@inheritdoc} */ public v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception { if (unit == null) throw new nullpointerexception(); int s = state; if (s <= completing && (s = awaitdone(true, unit.tonanos(timeout))) <= completing) throw new timeoutexception(); return report(s); } /** * protected method invoked when this task transitions to state * {@code isdone} (whether normally or via cancellation). the * default implementation does nothing. subclasses may override * this method to invoke completion callbacks or perform * bookkeeping. note that you can query status inside the * implementation of this method to determine whether this task * has been cancelled. */ protected void done() { } /** 该方法在futuretask里只有run方法在任务完成后调用。 主要保存任务执行结果到成员变量outcome 中,和切换任务执行状态。 由该方法可以得知: completing : 任务已执行完成(也可能是异常完成),但还未设置结果到成员变量outcome中,也意味着还不能get normal : 任务彻底执行完成 **/ protected void set(v v) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { outcome = v; unsafe.putorderedint(this, stateoffset, normal); // final state finishcompletion(); } } /** * causes this future to report an {@link executionexception} * with the given throwable as its cause, unless this future has * already been set or has been cancelled. * * <p>this method is invoked internally by the {@link #run} method * upon failure of the computation. * * @param t the cause of failure */ protected void setexception(throwable t) { if (unsafe.compareandswapint(this, stateoffset, new, completing)) { outcome = t; unsafe.putorderedint(this, stateoffset, exceptional); // final state finishcompletion(); } } /** 由于实现了runnable接口的缘故,该方法可由执行线程所调用。 **/ public void run() { //只有当任务状态=new时才被运行继续执行 if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) return; try { callable<v> c = callable; if (c != null && state == new) { v result; boolean ran; try { //调用callable的call方法 result = c.call(); ran = true; } catch (throwable ex) { result = null; ran = false; setexception(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s); } } /** 如果该任务在执行过程中不被取消或者异常结束,那么该方法不记录任务的执行结果,且不修改任务执行状态。 所以该方法可以重复执行n次。不过不能直接调用,因为是protected权限。 **/ protected boolean runandreset() { if (state != new || !unsafe.compareandswapobject(this, runneroffset, null, thread.currentthread())) return false; boolean ran = false; int s = state; try { callable<v> c = callable; if (c != null && s == new) { try { c.call(); // don't set result ran = true; } catch (throwable ex) { setexception(ex); } } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts s = state; if (s >= interrupting) handlepossiblecancellationinterrupt(s); } return ran && s == new; } /** * ensures that any interrupt from a possible cancel(true) is only * delivered to a task while in run or runandreset. */ private void handlepossiblecancellationinterrupt(int s) { // it is possible for our interrupter to stall before getting a // chance to interrupt us. let's spin-wait patiently. if (s == interrupting) while (state == interrupting) thread.yield(); // wait out pending interrupt // assert state == interrupted; // we want to clear any interrupt we may have received from // cancel(true). however, it is permissible to use interrupts // as an independent mechanism for a task to communicate with // its caller, and there is no way to clear only the // cancellation interrupt. // // thread.interrupted(); } /** * simple linked list nodes to record waiting threads in a treiber * stack. see other classes such as phaser and synchronousqueue * for more detailed explanation. */ static final class waitnode { volatile thread thread; volatile waitnode next; waitnode() { thread = thread.currentthread(); } } /** 该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable. **/ private void finishcompletion() { // assert state > completing; for (waitnode q; (q = waiters) != null;) { if (unsafe.compareandswapobject(this, waitersoffset, q, null)) { for (;;) { thread t = q.thread; if (t != null) { q.thread = null; locksupport.unpark(t); } waitnode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint } /** 阻塞等待任务执行完成(中断、正常完成、超时) **/ private int awaitdone(boolean timed, long nanos) throws interruptedexception { final long deadline = timed ? system.nanotime() + nanos : 0l; waitnode q = null; boolean queued = false; for (;;) { /** 这里的if else的顺序也是有讲究的。 1.先判断线程是否中断,中断则从队列中移除(也可能该线程不存在于队列中) 2.判断当前任务是否执行完成,执行完成则不再阻塞,直接返回。 3.如果任务状态=completing,证明该任务处于已执行完成,正在切换任务执行状态,cpu让出片刻即可 4.q==null,则证明还未创建节点,则创建节点 5.q节点入队 6和7.阻塞 **/ if (thread.interrupted()) { removewaiter(q); throw new interruptedexception(); } int s = state; if (s > completing) { if (q != null) q.thread = null; return s; } else if (s == completing) // cannot time out yet thread.yield(); else if (q == null) q = new waitnode(); else if (!queued) queued = unsafe.compareandswapobject(this, waitersoffset, q.next = waiters, q); else if (timed) { nanos = deadline - system.nanotime(); if (nanos <= 0l) { removewaiter(q); return state; } locksupport.parknanos(this, nanos); } else locksupport.park(this); } } /** * tries to unlink a timed-out or interrupted wait node to avoid * accumulating garbage. internal nodes are simply unspliced * without cas since it is harmless if they are traversed anyway * by releasers. to avoid effects of unsplicing from already * removed nodes, the list is retraversed in case of an apparent * race. this is slow when there are a lot of nodes, but we don't * expect lists to be long enough to outweigh higher-overhead * schemes. */ private void removewaiter(waitnode node) { if (node != null) { node.thread = null; retry: for (;;) { // restart on removewaiter race for (waitnode pred = null, q = waiters, s; q != null; q = s) { s = q.next; if (q.thread != null) pred = q; else if (pred != null) { pred.next = s; if (pred.thread == null) // check for race continue retry; } else if (!unsafe.compareandswapobject(this, waitersoffset, q, s)) continue retry; } break; } } } // unsafe mechanics private static final sun.misc.unsafe unsafe; private static final long stateoffset; private static final long runneroffset; private static final long waitersoffset; static { try { unsafe = sun.misc.unsafe.getunsafe(); class<?> k = futuretask.class; stateoffset = unsafe.objectfieldoffset (k.getdeclaredfield("state")); runneroffset = unsafe.objectfieldoffset (k.getdeclaredfield("runner")); waitersoffset = unsafe.objectfieldoffset (k.getdeclaredfield("waiters")); } catch (exception e) { throw new error(e); } } }
总结
以上就是本文关于futuretask源码分析(推荐)的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:java利用future及时获取多线程运行结果、浅谈java多线程处理中future的妙用(附源码)、等,有什么问题可以随时留言,欢迎大家一起交流讨论。