欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

futuretask源码分析(推荐)

程序员文章站 2024-04-01 19:30:52
futuretask只实现runnablefuture接口: 该接口继承了java.lang.runnable和future接口,也就是继承了这两个接口的特性。...

futuretask只实现runnablefuture接口:

该接口继承了java.lang.runnable和future接口,也就是继承了这两个接口的特性。

1.可以不必直接继承thread来生成子类,只要实现run方法,且把实例传入到thread构造函数,thread就可以执行该实例的run方法了( thread(runnable) )。

2.可以让任务独立执行,get获取任务执行结果时,可以阻塞直至执行结果完成。也可以中断执行,判断执行状态等。

futuretask是一个支持取消行为的异步任务执行器。该类实现了future接口的方法。

如: 1. 取消任务执行

2. 查询任务是否执行完成

3. 获取任务执行结果(”get“任务必须得执行完成才能获取结果,否则会阻塞直至任务完成)。

注意:一旦任务执行完成,则不能执行取消任务或者重新启动任务。(除非一开始就使用runandreset模式运行任务)
futuretask支持执行两种任务, callable 或者 runnable的实现类。且可把futuretask实例交由executor执行。

源码部分(很简单):

public class futuretask<v> implements runnablefuture<v> {
  /*
   * revision notes: this differs from previous versions of this
   * class that relied on abstractqueuedsynchronizer, mainly to
   * avoid surprising users about retaining interrupt status during
   * cancellation races. sync control in the current design relies
   * on a "state" field updated via cas to track completion, along
   * with a simple treiber stack to hold waiting threads.
   *
   * style note: as usual, we bypass overhead of using
   * atomicxfieldupdaters and instead directly use unsafe intrinsics.
   */
  /**
   * the run state of this task, initially new. the run state
   * transitions to a terminal state only in methods set,
   * setexception, and cancel. during completion, state may take on
   * transient values of completing (while outcome is being set) or
   * interrupting (only while interrupting the runner to satisfy a
   * cancel(true)). transitions from these intermediate to final
   * states use cheaper ordered/lazy writes because values are unique
   * and cannot be further modified.
   *
   * possible state transitions:
   * new -> completing -> normal
   * new -> completing -> exceptional
   * new -> cancelled
   * new -> interrupting -> interrupted
   */
  private volatile int state;
  private static final int new     = 0;
  private static final int completing  = 1;
  private static final int normal    = 2;
  private static final int exceptional = 3;
  private static final int cancelled  = 4;
  private static final int interrupting = 5;
  private static final int interrupted = 6;
  /** the underlying callable; nulled out after running */
  private callable<v> callable;
  /** 用来存储任务执行结果或者异常对象,根据任务state在get时候选择返回执行结果还是抛出异常 */
  private object outcome; // non-volatile, protected by state reads/writes
  /** 当前运行run方法的线程 */
  private volatile thread runner;
  /** treiber stack of waiting threads */
  private volatile waitnode waiters;
  /**
   * returns result or throws exception for completed task.
   *
   * @param s completed state value
   */
  @suppresswarnings("unchecked")
  private v report(int s) throws executionexception {
    object x = outcome;
    if (s == normal)
      return (v)x;
    if (s >= cancelled)
      throw new cancellationexception();
    throw new executionexception((throwable)x);
  }
  /**
   * creates a {@code futuretask} that will, upon running, execute the
   * given {@code callable}.
   *
   * @param callable the callable task
   * @throws nullpointerexception if the callable is null
   */
  public futuretask(callable<v> callable) {
    if (callable == null)
      throw new nullpointerexception();
    this.callable = callable;
    this.state = new;    // ensure visibility of callable
  }
  /**
   * creates a {@code futuretask} that will, upon running, execute the
   * given {@code runnable}, and arrange that {@code get} will return the
   * given result on successful completion.
   *
   * @param runnable the runnable task
   * @param result the result to return on successful completion. if
   * you don't need a particular result, consider using
   * constructions of the form:
   * {@code future<?> f = new futuretask<void>(runnable, null)}
   * @throws nullpointerexception if the runnable is null
   */
  public futuretask(runnable runnable, v result) {
    this.callable = executors.callable(runnable, result);
    this.state = new;    // ensure visibility of callable
  }
  //判断任务是否已取消(异常中断、取消等)
  public boolean iscancelled() {
    return state >= cancelled;
  }
  /**
  判断任务是否已结束(取消、异常、完成、normal都等于结束)
  **
  public boolean isdone() {
    return state != new;
  }
  /**
  mayinterruptifrunning用来决定任务的状态。
          true : 任务状态= interrupting = 5。如果任务已经运行,则强行中断。如果任务未运行,那么则不会再运行
          false:cancelled  = 4。如果任务已经运行,则允许运行完成(但不能通过get获取结果)。如果任务未运行,那么则不会再运行
  **/
  public boolean cancel(boolean mayinterruptifrunning) {
    if (state != new)
      return false;
    if (mayinterruptifrunning) {
      if (!unsafe.compareandswapint(this, stateoffset, new, interrupting))
        return false;
      thread t = runner;
      if (t != null)
        t.interrupt();
      unsafe.putorderedint(this, stateoffset, interrupted); // final state
    }
    else if (!unsafe.compareandswapint(this, stateoffset, new, cancelled))
      return false;
    finishcompletion();
    return true;
  }
  /**
   * @throws cancellationexception {@inheritdoc}
   */
  public v get() throws interruptedexception, executionexception {
    int s = state;
    //如果任务未彻底完成,那么则阻塞直至任务完成后唤醒该线程
    if (s <= completing)
      s = awaitdone(false, 0l);
    return report(s);
  }
  /**
   * @throws cancellationexception {@inheritdoc}
   */
  public v get(long timeout, timeunit unit)
    throws interruptedexception, executionexception, timeoutexception {
    if (unit == null)
      throw new nullpointerexception();
    int s = state;
    if (s <= completing &&
      (s = awaitdone(true, unit.tonanos(timeout))) <= completing)
      throw new timeoutexception();
    return report(s);
  }
  /**
   * protected method invoked when this task transitions to state
   * {@code isdone} (whether normally or via cancellation). the
   * default implementation does nothing. subclasses may override
   * this method to invoke completion callbacks or perform
   * bookkeeping. note that you can query status inside the
   * implementation of this method to determine whether this task
   * has been cancelled.
   */
  protected void done() { }
  /**
  该方法在futuretask里只有run方法在任务完成后调用。
  主要保存任务执行结果到成员变量outcome 中,和切换任务执行状态。
  由该方法可以得知:
  completing : 任务已执行完成(也可能是异常完成),但还未设置结果到成员变量outcome中,也意味着还不能get
  normal  : 任务彻底执行完成
  **/
  protected void set(v v) {
    if (unsafe.compareandswapint(this, stateoffset, new, completing)) {
      outcome = v;
      unsafe.putorderedint(this, stateoffset, normal); // final state
      finishcompletion();
    }
  }
  /**
   * causes this future to report an {@link executionexception}
   * with the given throwable as its cause, unless this future has
   * already been set or has been cancelled.
   *
   * <p>this method is invoked internally by the {@link #run} method
   * upon failure of the computation.
   *
   * @param t the cause of failure
   */
  protected void setexception(throwable t) {
    if (unsafe.compareandswapint(this, stateoffset, new, completing)) {
      outcome = t;
      unsafe.putorderedint(this, stateoffset, exceptional); // final state
      finishcompletion();
    }
  }
  /**
  由于实现了runnable接口的缘故,该方法可由执行线程所调用。
  **/
  public void run() {
    //只有当任务状态=new时才被运行继续执行
    if (state != new ||
      !unsafe.compareandswapobject(this, runneroffset,
                     null, thread.currentthread()))
      return;
    try {
      callable<v> c = callable;
      if (c != null && state == new) {
        v result;
        boolean ran;
        try {
          //调用callable的call方法
          result = c.call();
          ran = true;
        } catch (throwable ex) {
          result = null;
          ran = false;
          setexception(ex);
        }
        if (ran)
          set(result);
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      int s = state;
      if (s >= interrupting)
        handlepossiblecancellationinterrupt(s);
    }
  }
  /**
  如果该任务在执行过程中不被取消或者异常结束,那么该方法不记录任务的执行结果,且不修改任务执行状态。
  所以该方法可以重复执行n次。不过不能直接调用,因为是protected权限。
  **/
  protected boolean runandreset() {
    if (state != new ||
      !unsafe.compareandswapobject(this, runneroffset,
                     null, thread.currentthread()))
      return false;
    boolean ran = false;
    int s = state;
    try {
      callable<v> c = callable;
      if (c != null && s == new) {
        try {
          c.call(); // don't set result
          ran = true;
        } catch (throwable ex) {
          setexception(ex);
        }
      }
    } finally {
      // runner must be non-null until state is settled to
      // prevent concurrent calls to run()
      runner = null;
      // state must be re-read after nulling runner to prevent
      // leaked interrupts
      s = state;
      if (s >= interrupting)
        handlepossiblecancellationinterrupt(s);
    }
    return ran && s == new;
  }
  /**
   * ensures that any interrupt from a possible cancel(true) is only
   * delivered to a task while in run or runandreset.
   */
  private void handlepossiblecancellationinterrupt(int s) {
    // it is possible for our interrupter to stall before getting a
    // chance to interrupt us. let's spin-wait patiently.
    if (s == interrupting)
      while (state == interrupting)
        thread.yield(); // wait out pending interrupt
    // assert state == interrupted;
    // we want to clear any interrupt we may have received from
    // cancel(true). however, it is permissible to use interrupts
    // as an independent mechanism for a task to communicate with
    // its caller, and there is no way to clear only the
    // cancellation interrupt.
    //
    // thread.interrupted();
  }
  /**
   * simple linked list nodes to record waiting threads in a treiber
   * stack. see other classes such as phaser and synchronousqueue
   * for more detailed explanation.
   */
  static final class waitnode {
    volatile thread thread;
    volatile waitnode next;
    waitnode() { thread = thread.currentthread(); }
  }
  /**
  该方法在任务完成(包括异常完成、取消)后调用。删除所有正在get获取等待的节点且唤醒节点的线程。和调用done方法和置空callable.
  **/
  private void finishcompletion() {
    // assert state > completing;
    for (waitnode q; (q = waiters) != null;) {
      if (unsafe.compareandswapobject(this, waitersoffset, q, null)) {
        for (;;) {
          thread t = q.thread;
          if (t != null) {
            q.thread = null;
            locksupport.unpark(t);
          }
          waitnode next = q.next;
          if (next == null)
            break;
          q.next = null; // unlink to help gc
          q = next;
        }
        break;
      }
    }
    done();
    callable = null;    // to reduce footprint
  }
  /**
  阻塞等待任务执行完成(中断、正常完成、超时)
  **/
  private int awaitdone(boolean timed, long nanos)
    throws interruptedexception {
    final long deadline = timed ? system.nanotime() + nanos : 0l;
    waitnode q = null;
    boolean queued = false;
    for (;;) {
      /**
      这里的if else的顺序也是有讲究的。
      1.先判断线程是否中断,中断则从队列中移除(也可能该线程不存在于队列中)
      2.判断当前任务是否执行完成,执行完成则不再阻塞,直接返回。
      3.如果任务状态=completing,证明该任务处于已执行完成,正在切换任务执行状态,cpu让出片刻即可
      4.q==null,则证明还未创建节点,则创建节点
      5.q节点入队
      6和7.阻塞
      **/
      if (thread.interrupted()) {
        removewaiter(q);
        throw new interruptedexception();
      }
      int s = state;
      if (s > completing) {
        if (q != null)
          q.thread = null;
        return s;
      }
      else if (s == completing) // cannot time out yet
        thread.yield();
      else if (q == null)
        q = new waitnode();
      else if (!queued)
        queued = unsafe.compareandswapobject(this, waitersoffset,
                           q.next = waiters, q);
      else if (timed) {
        nanos = deadline - system.nanotime();
        if (nanos <= 0l) {
          removewaiter(q);
          return state;
        }
        locksupport.parknanos(this, nanos);
      }
      else
        locksupport.park(this);
    }
  }
  /**
   * tries to unlink a timed-out or interrupted wait node to avoid
   * accumulating garbage. internal nodes are simply unspliced
   * without cas since it is harmless if they are traversed anyway
   * by releasers. to avoid effects of unsplicing from already
   * removed nodes, the list is retraversed in case of an apparent
   * race. this is slow when there are a lot of nodes, but we don't
   * expect lists to be long enough to outweigh higher-overhead
   * schemes.
   */
  private void removewaiter(waitnode node) {
    if (node != null) {
      node.thread = null;
      retry:
      for (;;) {     // restart on removewaiter race
        for (waitnode pred = null, q = waiters, s; q != null; q = s) {
          s = q.next;
          if (q.thread != null)
            pred = q;
          else if (pred != null) {
            pred.next = s;
            if (pred.thread == null) // check for race
              continue retry;
          }
          else if (!unsafe.compareandswapobject(this, waitersoffset,
                             q, s))
            continue retry;
        }
        break;
      }
    }
  }
  // unsafe mechanics
  private static final sun.misc.unsafe unsafe;
  private static final long stateoffset;
  private static final long runneroffset;
  private static final long waitersoffset;
  static {
    try {
      unsafe = sun.misc.unsafe.getunsafe();
      class<?> k = futuretask.class;
      stateoffset = unsafe.objectfieldoffset
        (k.getdeclaredfield("state"));
      runneroffset = unsafe.objectfieldoffset
        (k.getdeclaredfield("runner"));
      waitersoffset = unsafe.objectfieldoffset
        (k.getdeclaredfield("waiters"));
    } catch (exception e) {
      throw new error(e);
    }
  }
}

总结

以上就是本文关于futuretask源码分析(推荐)的全部内容,希望对大家有所帮助。感兴趣的朋友可以参阅:java利用future及时获取多线程运行结果浅谈java多线程处理中future的妙用(附源码)、等,有什么问题可以随时留言,欢迎大家一起交流讨论。