欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

玩转FutrueTask源码

程序员文章站 2022-05-14 19:37:03
...

Future模式

Future模式是一种常见的设计模式,它的核心思想是异步调用,当调用一个函数方法时,如果方法执行非常慢,但我们又不急着需要结果,因此我们可以让被调用者立即返回,让他在后台慢慢处理,对于调用者来说可以先处理一些其他的任务,当真正需要数据时尝试获得需要的数据。
Future模式无法立即给出需要的数据,但是他会返回一个契约,将来凭借着这个锲约去重新获取结果。

JDK Futrue接口

Future接口代表异步计算的结果,对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。必要时可以通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

Future接口位于java.util.concurrent包下:

public interface Future<V> {
    boolean cancel(boolean mayInterruptIfRunning);
    boolean isCancelled();
    boolean isDone();
    V get() throws InterruptedException, ExecutionException;
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}
  • V get() :获取异步执行的结果,如果没有结果可用,此方法会阻塞直到异步计算完成。
  • V get(Long timeout , TimeUnit unit) :获取异步执行结果,如果没有结果可用,此方法会阻塞,但是会有时间限制,如果阻塞时间超过设定的timeout时间,该方法将抛出异常。
  • boolean isDone() :如果任务执行结束,无论是正常结束或是中途取消还是发生异常,都返回true。
  • boolean isCanceller() :如果任务完成前被取消,则返回true。
  • boolean cancel(boolean mayInterruptRunning) :cancel()方法用来取消异步任务的执行。如果异步任务已经完成或者已经被取消,或者由于某些原因不能取消,则会返回false。如果任务还没有被执行,则会返回true并且异步任务不会被执行。如果任务已经开始执行了但是还没有执行完成,若mayInterruptIfRunning为true,则会立即中断执行任务的线程并返回true,若mayInterruptIfRunning为false,则会返回true且不会中断任务执行线程。

一个小插曲

一般线程池提交任务用到了submit方法,该方法在java.util.concurrent.ExecutorService接口中有以下三种定义:

	<T> Future<T> submit(Callable<T> task);  
	<T> Future<T> submit(Runnable task, T result);  
	Future<?> submit(Runnable task);  
  1. submit提交一个实现Callable接口的任务,并且返回封装了异步计算结果的Future。
  2. submit提交一个实现Runnable接口的任务,并且指定了在调用Future的get方法时返回的result对象。
  3. submit提交一个实现Runnable接口的任务,并且返回封装了异步计算结果的Future。

此方法的实现在java.util.concurrent.AbstractExecutorService抽象方法中:

 public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}
 public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);
    execute(ftask);
    return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

三个重载的方法内部实现几乎一样,都是通过newTaskFor创建一个RunnableFuture的实例,并且newTaskFor是两个重载方法。然后将ftask交给线程池执行,具体的执行过程在后面的文章中讲述。
进入newTaskFor方法的内部实现:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

两个newTaskFor方法内部都创建了一个FutureTask对象返回。FutureTask是Future接口的实现。

FutrueTask

FutureTask除了实现了Future接口外还实现了Runnable接口,所以FutureTask既能当做一个Runnable直接被Thread执行,也能作为Future用来得到Callable的计算结果。
玩转FutrueTask源码

内部几个重要的变量:

//执行的任务
private Callable<V> callable;
//用来保存任务执行结果,如果发生异常,则用来保存异常原因
private Object outcome; 
//运行Callable的线程
private volatile Thread runner;
//Treiber堆栈来保存等待的线程,注意这里不是执行任务的线程,而是调用get或者cancle的线程。
private volatile WaitNode waiters;

构造过程

FurtureTask有两个构造方法,一种是参数为Runnable和Result,另一种是Callable。若为第一种则将Runnable和Result包装为Callable类型,然后赋值给FutureTask的的成员变量callable,将状态置位NEW.

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

任务状态
state字段用来保存FutureTask内部的任务执行状态,一共有7中状态,每种状态及其对应的值如下:

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

代表的含义为:

  • NEW:表示是个新的任务或者还没被执行完的任务。这是初始状态。
  • COMPLETING:任务已经执行完成或者执行任务的时候发生异常,但是任务执行结果或者异常原因还没有保存到outcome字段的时候,状态会从NEW变更到COMPLETING。但是这个状态会时间会比较短,属于中间状态。
  • NORMAL:任务已经执行完成并且任务执行结果已经保存到outcome字段,状态会从COMPLETING转换到NORMAL。这是一个最终态。
  • EXCEPTIONAL:任务执行发生异常并且异常原因已经保存到outcome字段中后,状态会从COMPLETING转换到EXCEPTIONAL。这是一个最终态。
  • CANCELLED:任务还没开始执行或者已经开始执行但是还没有执行完成的时候,用户调用了cancel(false)方法取消任务且不中断任务执行线程,这个时候状态会从NEW转化为CANCELLED状态。这是一个最终态。
  • INTERRUPTING: 任务还没开始执行或者已经执行但是还没有执行完成的时候,用户调用了cancel(true)方法取消任务并且要中断任务执行线程但是还没有中断任务执行线程之前,状态会从NEW转化为INTERRUPTING。这是一个中间状态。
  • INTERRUPTED:调用interrupt()中断任务执行线程之后状态会从INTERRUPTING转换到INTERRUPTED。这是一个最终态。

所有值大于COMPLETING的状态都表示任务已经执行完成(任务正常执行完成,任务执行异常或者任务被取消,状态转换关系如下:

NEW -> COMPLETING -> NORMAL
NEW -> COMPLETING -> EXCEPTIONAL
NEW -> CANCELLED
NEW -> INTERRUPTING -> INTERRUPTED

run

在创建了一个FutureTask对象之后,接下来就是在另一个线程中执行这个Task,无论是通过直接new一个Thread还是通过线程池,执行的都是run()方法,接下来就看看run()方法的实现。

 public void run() {
    // 1. 状态如果不是NEW,说明任务或者已经执行过,或者已经被取消,直接返回
    // 2. 状态如果是NEW,则尝试把当前执行线程保存在runner字段中.如果赋值失败则直接返回
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
				//3. 执行任务
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
				//4. 任务异常
                result = null;
                ran = false;
                setException(ex);
            }
            if (ran)
				//4. 任务异常
                set(result);
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
		//如果任务被中断,执行中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

详细过程如下:

  1. 判断当前任务的state是否等于NEW,如果不为NEW则说明任务或者已经执行过,或者已经被取消,直接返回。
  2. 如果状态为NEW则接着会通过unsafe类把任务执行线程引用CAS的保存在runner字段中,如果保存失败,则直接返回。
  3. 执行任务
  4. 如果任务执行发生异常,则调用setException()方法保存异常信息。setException()方法如下:
protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = t;
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
        finishCompletion();
    }
}
  • 首先会CAS的把当前的状态从NEW变更为COMPLETING状态。
  • 把异常原因保存在outcome字段中,outcome字段用来保存任务执行结果或者异常原因。
  • CAS的把当前任务状态从COMPLETING变更为EXCEPTIONAL。
  • 调用finishCompletion(),执行一些清理工作
  1. 如果正常执行完毕,调用set(result)方法:
protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
        outcome = v;
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
        finishCompletion();
    }
}

该方法和上面的setExcetion方法类似

  • 首先会CAS的把当前的状态从NEW变更为COMPLETING状态。
  • 把任务执行结果保存在outcome字段中。
  • CAS的把当前任务状态从COMPLETING变更为NORMAL。
  • 调用finishCompletion()。
  1. 如果任务被中断,执行中断处理

发起任务线程跟执行任务线程通常情况下都不会是同一个线程,在任务执行线程执行任务的时候,任务发起线程可以查看任务执行状态、获取任务执行结果、取消任务等等操作,接下来分析下这些操作。

get

任务发起线程可以调用get()方法来获取任务执行结果,如果此时任务已经执行完毕则会直接返回任务结果,如果任务还没执行完毕,则调用方会阻塞直到任务执行结束返回结果为止。get()方法实现如下:

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

get方法的实现比较直观,判断任务当前的state <= COMPLETING是否成立。COMPLETING状态是任务是否执行完成的临界状态。如果成立,表明任务还没有结束(这里的结束包括任务正常执行完毕,任务执行异常,任务被取消),则会调用awaitDone()进行阻塞等待,看看awaitDone的实现:

 private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }

        int s = state;
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        } else if (s == COMPLETING) // cannot time out yet
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q);
        else if (timed) {
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos);
        }
        else
            LockSupport.park(this);
    }
}

流程如下:

  1. 判断调用get()的线程是否被其他线程中断,如果是的话则在等待队列中删除对应节点然后抛出InterruptedException异常。
  2. 获取任务当前状态,如果当前任务状态大于COMPLETING则表示任务执行完成,则把thread字段置null并返回结果。
  3. 如果任务处于COMPLETING状态,则表示任务已经处理完成(正常执行完成或者执行出现异常),但是执行结果或者异常原因还没有保存到outcome字段中。这个时候调用线程让出执行权让其他线程优先执行。
  4. 如果等待节点q为空,则构造一个等待节点WaitNode。
  5. 如果第四步中新建的节点还没入队列,则CAS的把该节点加入waiters队列的首节点。
  6. 阻塞等待,等待被唤醒

总结一下
1. 调用get方法的线程先尝试判断任务的状态,
2. 如果没能执行完,则构建等待节点,加入栈顶(多线程共享)
3. 阻塞线程,等待唤醒

很容易能想到,唤醒线程的操作肯定在任务执行完之后,其实就是我们前面遗留的一个方法finishCompletion(),不管任务执行异常还是任务正常执行完毕,或者取消任务,最后都会调用该方法:

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
 
    done();
 
    callable = null;        // to reduce footprint
}

这个方法的实现比较简单,依次遍历waiters链表,唤醒节点中的线程,然后把callable置空。
被唤醒的线程会各自从awaitDone()方法中的LockSupport.park*()阻塞中返回,然后会进行新一轮的循环。在新一轮的循环中会返回执行结果(或者更确切的说是返回任务的状态)。

cancel(boolean)

用户可以调用cancel(boolean)方法取消任务的执行,cancel()实现如下:

public boolean cancel(boolean mayInterruptIfRunning) {
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
            }
        }
    } finally {
        finishCompletion();
    }
    return true;
}
  1. 判断任务当前执行状态,如果任务状态不为NEW,则说明任务或者已经执行完成,或者通过CAS修改状态为INTERRUPTING、CANCELLED失败了,则返回false
  2. 判断需要中断任务执行线程,则中断任务执行线程。 修改任务状态为INTERRUPTED。
  3. 最后调用finishCompletion

当调用cancel(true)方法的时候,实际执行还是Thread.interrupt()方法,而interrupt()方法只是设置中断标志位,如果被中断的线程处于sleep()、wait()或者join()逻辑中则会抛出InterruptedException异常。因此结论是:cancel(true)并不一定能够停止正在执行的异步任务。

相关标签: 异步