欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java 源码分析-FutureTask

程序员文章站 2022-07-14 16:16:43
...

  之前在分析Android里面的AsyncTask的时候,里面提及到FutureTask这个类。可能那时候思维比较局限,然后对这个类的认知还不够,所以在解释方面可能不是很好。今天由于在看ScheduledThreadPoolExecutor类的源码,里面涉及到FutureTask类,我觉得必须先理解到FutureTask的含义,对ScheduledThreadPoolExecutor的理解可能才会更加的清楚一些。
  本文参考资料:

  1.FutureTask 深度解析
  2.方腾飞、魏鹏、程晓明的《Java 并发编程的艺术》

1.FutureTask的简介

  在正式分析这个类之前,我们先来对FutureTask类有一个整体的认识,因为可能有些老哥对这个不是很熟悉。
  首先,FutureTask实现了Future接口和Runnable,所以FutureTask类可以被Thread看成一个任务来执行。实现Future接口主要是作为一个标记,当时当前这个任务在执行时可以被cancel,还可以获取异步执行的结果,这个是Runnable接口不能做到的。
  同时,由于FutureTask实现了Runnable接口,所以可以被Executor执行,这个也是为什么在ScheduledThreadPoolExecutor里面使用FutureTask类。
  说到这里,可能有些老哥对这个类还是有点疑惑,不急,现在我们从原理上理解这个类到底是干嘛的。

2. FutureTask的成员变量

  在分析FutureTask类之前,我们还是先对这个FutureTask类的成员变量有一个基本的了解,这样在理解后面的源代码的时候对我们有一定的帮助。

变量名 类型 描述
state int 当前任务执行的状态,有多种状态可以选择,待会会详细的介绍每种状态的含义。
callable Callable 执行的任务类,调用Callable的call方法可以执行Callable内部的任务。真正调用Callable的call方法是在FutureTask的run方法里面。
outcome Object 任务执行过程中产生的结果,这个对象里面存储的有可能是正常执行之后产生的结果,也有可能是执行过程产生的异常。
runner Thread 当前正在执行任务的线程,由于一个FutureTask任务可能会被多个线程执行,所以这里runner记录正在执行该任务的线程。
waiters WaitNode 等待栈中的一个元素。如果一个线程在尝试获取任务的结果(调用get方法),首先会创建一个WaitNode的对象,进入栈;如果顺利获得结果,会将这个线程从栈中移除掉;如果不能获得获得结果(任务还未执行完毕),则会被阻塞,只有当任务完成之后,在finishCompletion方法里面被唤醒。

  上面在解释state中说到,state可能有多个状态,我们来看看每种状态的含义。

变量名 描述
NEW 0 表示处于新建状态,还没有开始执行。
COMPLETING 1 state中间状态一种,表示任务将要完成,但是还完成最终的一些处理。可能对这里有些疑惑,后面我们根据源代码就能清楚的知道。
NORMAL 2 state最终状态的一种,表示任务已经正常完成,如果任务的状态处于NORMAL,那么outcome 里面存储的就是任务执行的结果。
EXCEPTIONAL 3 state最终状态的一种,表示任务在执行过程发生了异常,此时outcome里面存储的就是产生的异常对象。
CANCELLED 4 state中间状态一种,表示这个这个任务正在被cancel。在调用cancel方法时,如果mayInterruptIfRunning设置为false的话,state的中间状态会被更新为CANCELLED。
INTERRUPTING 5 state中间状态一种,表示这个这个任务正在被cancel。在调用cancel方法时,如果mayInterruptIfRunning设置为true的话,state的中间状态会被更新为CANCELLED。
INTERRUPTED 6 state最终状态的一种,表示任务已经被cancel调用了。这个所说的cancel掉,实际上是线程的中断位被标记为中断。

  我们现在对FutureTask的成员变量有了初步的了解,在后面分析过程中相对来说应该比较容易了。

3.构造方法

  对成员变量有了一个基本的了解之后,我们来看看FutureTask的构造方法,看看它的构造方法里面给我们初始化了哪些东西。FutureTask有两个构造方法,我们只看其中的一个吧。

    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }

  构造过程是非常简单,只做了两步:1.将初始化callable变量,这里需要注意的是,将一个Runnable对象包装成了一个Callable对象;2.初始化state,将初始化为NEW。

4.run过程

  当我们一个创建好的FutureTask对象提交到线程池或者使用Thread来执行,最终都会调用run方法的,所以分析run方法是必要的。我们先来看看run方法的代码:

    public void run() {
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) {
                V result;
                boolean ran;
                try {
                    //执行任务
                    result = c.call();
                    ran = true;
                } catch (Throwable ex) {
                    result = null;
                    ran = false;
                    setException(ex);
                }
                if (ran)
                    //设置结果
                    set(result);
            }
        } finally {
            // runner must be non-null until state is settled to
            // prevent concurrent calls to run()
            runner = null;
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }

  整个run方法的过程还是比较简单的,我将它分为2步:1.调用callable的call方法,进行任务真正执行;2.执行完毕之后,设置结果。这个结果有可能是异常,也有可能是正常的结果。
  如果在任务的执行过程中发生了异常,那么会调用setException方法:

    protected void setException(Throwable t) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = t;
            UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
            finishCompletion();
        }
    }

  从setException方法里面,我们可以得出,首先尝试着将state字段更新为COMPLETING,然后将产生的异常对象赋值给outcome变量里面去,最后在将state更新为EXCEPTIONAL。
  如果整个执行过程正常结束的话,那么会调用set方法来设置结果。

    protected void set(V v) {
        if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
            outcome = v;
            UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
            finishCompletion();
        }
    }

  set方法的执行过程跟setException方法差不多,只是在outcome里面存储的是从call方法获得的结果,还有就是state最终的值是NORMAL。

5.get过程

  FutureTask有一个特点就是,我们可以从异步的过程中获得执行的结果,这个获得的方法就是调用get方法。但是调用这个方法需要的注意的是,如果当前任务还没有执行完毕,那么会阻塞在get方法里面,直到获得成功为止。我们来看看get方法:

    public V get() throws InterruptedException, ExecutionException {
        int s = state;
        if (s <= COMPLETING)
            s = awaitDone(false, 0L);
        return report(s);
    }

  我们发现,如果当前任务没有完成,也就是说,如果state没有被更新为COMPLETING以上的话,那么会调用awaitDone方法来等待任务完成;如果任务完成的话,那么直接调用report方法来返回结果。这里先看看report方法:

    private V report(int s) throws ExecutionException {
        Object x = outcome;
        if (s == NORMAL)
            return (V)x;
        if (s >= CANCELLED)
            throw new CancellationException();
        throw new ExecutionException((Throwable)x);
    }

  report方法主要返回任务的结果,当然如果之前任务执行过程中发生了异常,这里就会抛出异常。
  这里,我们再来看看awaitDone方法:

    private int awaitDone(boolean timed, long nanos)
        throws InterruptedException {
        final long deadline = timed ? System.nanoTime() + nanos : 0L;
        WaitNode q = null;
        boolean queued = false;
        for (;;) {
            if (Thread.interrupted()) {
                removeWaiter(q);
                throw new InterruptedException();
            }

            int s = state;
            if (s > COMPLETING) {
                if (q != null)
                    q.thread = null;
                return s;
            }
            else if (s == COMPLETING) // cannot time out yet
                Thread.yield();
            else if (q == null)
                q = new WaitNode();
            else if (!queued)
                //进入栈
                queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                     q.next = waiters, q);
            else if (timed) {
                nanos = deadline - System.nanoTime();
                if (nanos <= 0L) {
                    removeWaiter(q);
                    return state;
                }
                //阻塞
                LockSupport.parkNanos(this, nanos);
            }
            else
                //阻塞
                LockSupport.park(this);
        }
    }

  在这个方法,执行流程分为两步:1.进入这个方法之后,首先创建WaitNode让他进行等待栈中;2.进行自我阻塞。
  如果这个线程被唤醒或者第一次执行到这个方法之后,发现自己被中断了,那么首先会将自己的WaitNode从等待栈移除,其次抛出InterruptedException异常。只有当状态大于COMPLETING,才会退出这个方法。

6.cancel过程

  在使用FutureTask的时候,cancel操作也是常见的,这里,我们对cancel进行分析一下。

    public boolean cancel(boolean mayInterruptIfRunning) {
        if (!(state == NEW &&
              UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
                  mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
            return false;
        try {    // in case call to interrupt throws exception
            if (mayInterruptIfRunning) {
                try {
                    Thread t = runner;
                    if (t != null)
                        t.interrupt();
                } finally { // final state
                    UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
                }
            }
        } finally {
            finishCompletion();
        }
        return true;
    }

   cancel方法的意思是非常简单的,主要的作用是将一个线程的中断位标记为中断,为其他过程做一个参考。