欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Callable、Future详解 | Executor框架

程序员文章站 2022-04-03 12:39:11
...
 

一:关于 Callable<V>的源码

package java.util.concurrent;

public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}

Callable<V>: 返回结果并且可能抛出异常的任务。实现者定义了一个不带任何参数的叫做 call的方法。 Callable<V>接口类似于 Runnable,两者都是为那些其实例可能被另一个线程执行的类设计的。但是 Runnable不会返回结果,并且无法抛出经过检查的异常。 Executors类包含一些从其他普通形式转换成 Callable<V>类的实用方法。

 

 

在并发编程时,一般使用Runnable接口,然后扔给线程池完事,这种情况下不需要线程的结果。 所以run()的返回值是void类型。如下代码所示:

 
package future;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureTest {
	public static class Task implements Runnable{
		@Override
		public void run() {
			System.out.println("做任务----"+Thread.currentThread().getName());
		}
	}
	public static void main(String[] args) {
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			es.submit(new Task());
		}
	}
}
 

 

如果是一个多线程协作程序,比如菲波拉切数列,112358...使用多线程来计算。
但后者需要前者的结果,就需要用Callable<V>接口了。如下代码所示:

 

package future;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class FutureTest2 {
	public static class Task implements Callable<String>{
		@Override
		public String call() throws Exception {
			System.out.println("做任务----"+Thread.currentThread().getName());
			return "返回运算结果";
		}
	}
	public static void main(String[] args) {
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			es.submit(new Task());
		}
	}
}

 

 

我们看到,当Task类实现Callable接口后,重写了call()方法,call()方法是有返回值的。但我们如何来接受call()方法的返回值呢?

 

package future;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureTest3 {
	public static class Task implements Callable<String>{
		@Override
		public String call() throws Exception {
			System.out.println("做任务----"+Thread.currentThread().getName());
			return "返回运算结果";
		}
	}
	public static void main(String[] args) throws InterruptedException, ExecutionException {
		
		List<Future<String>> result = new ArrayList<Future<String>>();
		
		//创建线程池
		ExecutorService es = Executors.newCachedThreadPool();
		for(int i=0;i<100;i++){
			result.add(es.submit(new Task()));
		}
		for(Future<String> f : result){
			System.out.println(f.get());
		}
	}
}

 

从这里可以看出,这时候,Future<V>就出场了,使用Future<V>接口的get()方法可以获得call()方法返回的结果,当调用Futureget()方法时,当前线程就开始阻塞,直到call()方法结束返回结果。

 

Future模式可以这样来描述:我有一个任务,提交给了FutureFuture我完成这个任务。期间我自己可以去做任何想做的事情。一段时间之后,我就便可以从Future那儿取出结果。就相当于下了一张订货单,一段时间后可以拿着提订单来提货,这期间可以干别的任何事情。其中Future接口就是订货单,真正处理订单的是Executor类,它根据Future接口的要求来生产产品。

 

 

二:关于 Future<V>的源码

 

 

package java.util.concurrent;

/**
* Future 表示异步计算的结果。它提供了检查计算是否完成的方法,以等待计算的完成,并获取计算的结果。计算完成后只能使
* 用 get 方法来获取结果,如有必要,计算完成前可以阻塞此方法。取消则由 cancel 方法来执行。还提供了其他方法,以确     * 定任务是正常完成还是被取消了。一旦计算完成,就不能再取消计算。如果为了可取消性而使用 Future 但又不提供可用的结     * 果,则可以声明 Future<?> 形式类型、并返回 null 作为底层任务的结果。
*
  * @see FutureTask
  * @see Executor
  * @since 1.5
  * @author Doug Lea
  * @param<V> The result type returned by this Future's <tt>get</tt> method
  */
publicinterface Future<V> {

    /**
     * 试图取消对此任务的执行。如果任务已完成、或已取消,或者由于某些其他原因而无法取消,则此尝试将失败。当调用 cancel * 时,如果调用成功,而此任务尚未启动,则此任务将永不运行。如果任务已经启动,则 mayInterruptIfRunning 参数     
     * 确定是否应该以试图停止任务的方式来中断执行此任务的线程。
     */
    boolean cancel(boolean mayInterruptIfRunning);
    /**
     * 如果在任务正常完成前将其取消,则返回 true。   
     */
    boolean isCancelled();

    /**
     * 如果任务已完成,则返回 true。可能由于正常终止、异常或取消而完成,在所有这些情况中,此方法都将返回 true。    
     */
    boolean isDone();

    /**
     * 如有必要,等待计算完成,然后获取其结果。 
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * 如有必要,最多等待为使计算完成所给定的时间之后,获取其结果(如果结果可用)。
     */
    V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException;
}

 

1)   Future接口是一个泛型接口,严格的格式应该是Future<V>,其中V代表了Future执行的任务返回值的类型

2)   Future接口提供方法来检测任务是否被执行完,等待任务执行完获得结果,也可以设置任务执行的超时时间。这个设置超时的方法就是实现Java程序执行超时的关键。

 

 

 

 

三:关于 Future实现

 

 
   


Callable<V>、Future<V>详解 | Executor框架
            
    
    博客分类: 线程 CallableFutureFutureTaskExecutor并发
 

 

Future的实现类有FutureTaskSwingWork,其中SwingWork用于GUI编程模块,在并发包里经常用到的是FutureTask,  这里主要介绍一下FutureTask

 

 

类声明部分

 

public class FutureTask<V> implements RunnableFuture<V> {}

 

 

FutureTask: 实现了RunnableFuture<V>接口,而RunnableFuture<V>又是extends(继承) RunnableFuture<V>两个接口,所以它既可以作为Runnable被线程执行,又可以作为Future<V>得到 Callable<V>的返回值,那么这个组合的使用有什么好处呢?假设有一个很耗时的返回值需要计算,并且这个返回值不是立刻需要的话,那么就可以使用这个组 合,用另一个线程去计算返回值,而当前线程在使用这个返回值之前可以做其它的操作,等到需要这个返回值时,再通过Future<V>得到,岂不美哉!

 

变量部分


private volatile int state;
private static final int NEW          = 0;               //新建  
private static final int COMPLETING   = 1;              //执行中 
private static final int NORMAL       = 2;               //正常
private static final int EXCEPTIONAL  = 3;              //异常       
private static final int CANCELLED    = 4;              //取消
private static final int INTERRUPTING = 5;             //中断中
private static final int INTERRUPTED  = 6;             //被中断
 

state:任务的状态,最初是 NEW,完成期间,状态也许暂时呈现为COMPLETING(当结果已经被设值)或者INTERRUPTING(only while interrupting the runner to satisfy a cancel(true)),

 

可能出现的状态转换情况:

* NEW -> COMPLETING -> NORMAL                    正常完成的流程

* NEW -> COMPLETING -> EXCEPTIONAL              出现异常的流程

* NEW -> CANCELLED                                 被取消的流程

* NEW -> INTERRUPTING -> INTERRUPTED            被中断的流程

构造函数:

   /**
     *  创建一个 FutureTask,一旦运行就执行给定的 Callable。    
     */
    public FutureTask(Callable<V> callable) {
        if (callable == null)
            thrownew NullPointerException();
        this.callable = callable;
        this.state = NEW;       // ensure visibility of callable
    }

    /**
     *  创建一个 FutureTask,一旦运行就执行给定的 Runnable,并安排成功完成时 get 返回给定的结果
     */
    public FutureTask(Runnable runnable, V result) {
        this.callable = Executors.callable(runnable, result);
        this.state = NEW;       // ensure visibility of callable
    }
 

从构造函数中可以看出,当构造一个FutureTask时,state会被置为NEWNEW也就是所有状态变化路径的起始状态

 

FutureTask生命周期的变化,主要取决于 run()方法先被调用还是cancel ()方法会被调用,这两个方法的执行顺序决定了FutureTask的生命周期的四种走向。

 

先来看run()方法

 

publicvoid run() {
     //首先判断任务的状态,如果任务的状态值不为NEW,或 runner变量的值不为null,则返回(说明正在走或已经走了4种状态变化的一种)
        if (state != NEW ||
            !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                         null, Thread.currentThread()))
            return;
//如果状态值是NEW,则开始执行任务
        try {
            Callable<V> c = callable;
            if (c != null && state == NEW) { //如果任务不为空,且状态为NEW,开始执行
                V result;                         //任务返回的结果
                boolean ran;
                try {
                    result = c.call();          //执行任务并返回结果
                    ran = true;                  //标记任务执行成功
                } catch (Throwable ex) {  //任务执行中发生异常
                    result = null;
                    ran = false;
                    setException(ex);           //设置异常
                }
                if (ran)                          //任务执行成功,设置结果
                    set(result);
            }
        } finally {
            runner = null;                       //runner置为null
            // state must be re-read after nulling runner to prevent
            // leaked interrupts
            int s = state;
            if (s >= INTERRUPTING)
                handlePossibleCancellationInterrupt(s);
        }
    }
 

 

1)任务执行成功,会调用set()方法设置结果

protected void set(V v) {
//如过state是NEW,把state设置成COMPLETING 
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          outcome = v;
        //将任务设置成NORMAL   over the task  
          UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
          finishCompletion();
     }
}
 

 

set()方法可以看出,把任务运行的结果赋值给了outcome变量,这个执行流程导致的状态变化就是  NEW->COMPLETING->NORMAL   

 

2)任务执行中发生异常,会调用setException()方法

protecte dvoid setException(Throwable t) {
//如过state是NEW,把state设置成COMPLETING    
     if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
          outcome = t;
        //将任务设置成EXCEPTIONAL  
          UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
          finishCompletion();
     }
}
 

 

这个执行流程导致的状态变化就是  NEW->COMPLETING->EXCEPTIONAL

 

再来看cancel()方法:

 

//参数:mayInterruptIfRunning   是否中断running
publicboolean cancel(boolean mayInterruptIfRunning) { 
     if (state != NEW)      //状态不为NEW,返回
          returnfalse;
     if (mayInterruptIfRunning) { //如果应该中断执行此任务的线程
          //如过state是NEW,把state设置成INTERRUPTING
          if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
               returnfalse;
          Thread t = runner;
          if (t != null)
                t.interrupt();
         //将任务设置成INTERRUPTED
          UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
     }
     //如过state是NEW,把state设置成CANCELLED
     elseif (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
            returnfalse;
     finishCompletion();
     returntrue;
}

 

如果mayInterruptIfRunning==true,则流程为:NEW->INTERRUPTING ->INTERRUPTED, 否则流程为:NEW->CANCELLED

 

到此,四个流程走完了!

 

 

参考资料:

   JDK API文档

   http://blog.csdn.net/yangyan19870319/article/details/6093481

http://www.oschina.net/question/54100_83333

http://www.2cto.com/kf/201411/351903.html

http://blog.csdn.net/ghsau/article/details/7451464

http://blog.csdn.net/liulipuo/article/details/39029643

 

  • Callable<V>、Future<V>详解 | Executor框架
            
    
    博客分类: 线程 CallableFutureFutureTaskExecutor并发
  • 大小: 6.1 KB