解析Java异步之call future
一、概述
我们大家都知道,在 java 中创建线程主要有三种方式:
- 继承 thread 类;
- 实现 runnable 接口;
- 实现 callable 接口。
而后两者的区别在于 callable 接口中的 call()
方法可以异步地返回一个计算结果 future,并且一般需要配合executorservice 来执行。这一套操作在代码实现上似乎也并不难,可是对于call()方法具体怎么(被executorservice)执行的,以及 future 这个结果是怎么获取的,却又不是很清楚了。
那么本篇文章,我们就一起来学习下 callable 接口以及 future 的使用,主要面向两个问题:
- 承载着具体任务的 call() 方法如何被执行的?
- 任务的执行结果如何得到?
你可能会说,这两个难道不是一个问题吗?任务执行了就会有返回结果,而返回结果也一定是任务执行了才返回的,难道还能返回一个其他任务的结果么??不要着急,耐心的看下去,你就会发现,这两个还真的就是一个问题。
本文将分为两个部分,第一部分分别介绍 任务
、执行
、以及结果
这三个概念在 java api 中的实体和各自的继承关系,第二部分通过一个简单的例子回顾他们的用法,再理解下这两个问题的答案。
二、callable、executor 与 future
既然是一个任务被执行并返回结果,那么我们先来看看具体的任务,也就是 callable 接口。
2.1、任务:callable
非常简单,只包含一个有泛型返回值的 call() 方法,需要在最后返回定义类型的结果。如果任务没有需要返回的结果,那么将泛型 v 设为 void 并return null;
就可以了。对比的是 runnable,另一个明显的区别则是 callable可以抛出异常。
public interface callable<v> { v call() throws exception; } public interface runnable { public abstract void run(); }
2.2、执行:executorservice
说到线程就少不了线程池,而说到线程池肯定离不开 executor
接口。下面这幅图是 executor 的框架,我们常用的是其中的两个具体实现类 threadpoolexecutor 以及 scheduledthreadpoolexecutor,在 executors 类中通过静态方法获取。executors 中包含了线程池以及线程工厂的构造,与 executor 接口的关系类似于 collection 接口和 collections 类的关系。
那么我们自顶向下,从源码上了解一下 executor 框架,学习学习任务是如何被执行的。首先是 executor 接口,其中只定义了 execute() 方法。
public interface executor { void execute(runnable command); }
executorservice 接口继承了 executor 接口,主要扩展了一系列的 submit() 方法以及对 executor 的终止和判断状态。以第一个<t> future<t> submit(callable<t> task);
为例,其中 task 为用户定义的执行的异步任务,future 表示了任务的执行结果,泛型 t 代表任务结果的类型。
public interface executorservice extends executor { void shutdown(); // 现有任务完成后停止线程池 list<runnable> shutdownnow(); // 立即停止线程池 boolean isshutdown(); // 判断是否已停止 boolean isterminated(); <t> future<t> submit(callable<t> task); // 提交callale任务 <t> future<t> submit(runnable task, t result); future<?> submit(runnable task); // 针对callable集合的invokeall()等方法 }
抽象类abstractexecutorservice
是 threadpoolexecutor
的基类,在下面的代码中,它实现了executorservice 接口中的 submit() 方法。注释中是对应的 newtaskfor() 方法的代码,非常简单,就是将传入的callable 或 runnable 参数封装成一个 futuretask 对象。
// 1.第一个重载方法,参数为callable public <t> future<t> submit(callable<t> task) { if (task == null) throw new nullpointerexception(); runnablefuture<t> ftask = newtaskfor(task); // return new futuretask<t>(callable); execute(ftask); return ftask; } // 2.第二个重载方法,参数为runnable public future<?> submit(runnable task) { if (task == null) throw new nullpointerexception(); runnablefuture<void> ftask = newtaskfor(task, null); // return new futuretask<t>(task, null); execute(ftask); return ftask; } // 3.第三个重载方法,参数为runnable + 返回对象 public <t> future<t> submit(runnable task, t result) { if (task == null) throw new nullpointerexception(); runnablefuture<t> ftask = newtaskfor(task, result); // return new futuretask<t>(task, result); execute(ftask); return ftask; }
那么也就是说,无论传入的是 callable 还是 runnable,submit() 方法其实就做了三件事
具体来说,submit() 中首先生成了一个 runnablefuture 引用的 futuretask 实例,然后调用 execute() 方法来执行它,那么我们可以推测 futuretask 继承自 runnablefuture,而 runnablefuture 又实现了 runnable,因为execute() 的参数应为 runnable 类型。上面还涉及到了 futuretask 的构造函数,也来看一下。
public futuretask(callable<v> callable) { this.callable = callable; this.state = new; } public futuretask(runnable runnable, v result) { this.callable = executors.callable(runnable, result); // 通过适配器将runnable在call()中执行并返回result this.state = new; }
futuretask 共有两个构造方法。第一个构造方法比较简单,对应上面的第一个 submit(),采用组合的方式封装callable 并将状态设为new
;而第二个构造方法对应上面的后两个 submit() 重载,不同之处是首先使用了executors.callable
来将 runnable 和 result 组合成 callable,这里采用了适配器runnableadapter implements callable
,巧妙地在 call() 中执行 runnable 并返回结果。
static final class runnableadapter<t> implements callable<t> { final runnable task; final t result; // 返回的结果;显然:需要在run()中赋值 runnableadapter(runnable task, t result) { this.task = task; this.result = result; } public t call() { task.run(); return result; } }
在适配器设计模式中,通常包含**目标接口 target、适配器 adapter 和被适配者 adaptee **三类角色,其中目标接口代表客户端(当前业务系统)所需要的功能,通常为借口或抽象类;被适配者为现存的不能满足使用需求的类;适配器是一个转换器,也称 wrapper,用于给被适配者添加目标功能,使得客户端可以按照目标接口的格式正确访问。对于 runnableadapter 来说,callable 是其目标接口,而 runnable 则是被适配者。runnableadapter 通过覆盖 call() 方法使其可按照 callable 的要求来使用,同时其构造方法中接收被适配者和目标对象,满足了 call() 方法有返回值的要求。
那么总结一下 submit() 方法执行的流程,就是:callable 被封装在 runnable 的子类中传入 execute() 得以执行。
2.3、结果:future
要说 future 就是异步任务的执行结果其实并不准确,因为它代表了一个任务的执行过程,有状态、可以被取消,而 get() 方法的返回值才是任务的结果。
public interface future<v> { boolean cancel(boolean mayinterruptifrunning); boolean iscancelled(); boolean isdone(); v get() throws interruptedexception, executionexception; v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception; }
我们在上面中还提到了 ruunablefuture 和 futuretask。从官方的注释来看,ruunablefuture 就是一个可以 run的 future,实现了 runnable 和 future 两个接口,在 run() 方法中执行完计算时应该将结果保存起来以便通过 get()获取。
public interface runnablefuture<v> extends runnable, future<v> { /** * sets this future to the result of its computation unless it has been cancelled. */ void run(); }
futuretask 直接实现了 runnablefuture 接口,作为执行过程,共有下面这几种状态,其中 completing 为一个暂时状态,表示正在设置结果或异常,对应的,设置完成后状态变为 normal 或 exceptional;cancelled、interrupted 表示任务被取消或中断。在上面的构造方法中,将 state 初始化为 new。
private volatile int state; private static final int new = 0; private static final int completing = 1; private static final int normal = 2; private static final int exceptional = 3; private static final int cancelled = 4; private static final int interrupting = 5; private static final int interrupted = 6;
然后是 futuretask 的主要内容,主要是 run() 和 get()。注意 outcome 的注释,无论是否发生异常返回的都是这个 outcome,因为在执行中如果执行成功就将结果设置给了它(set()
),而发生异常时将异常赋给了他(setexception()
),而在获取结果时也都返回了 outcome(通过report()
)。
public class futuretask<v> implements runnablefuture<v> { private callable<v> callable; // target,待执行的任务 /** 保存执行结果或异常,在get()方法中返回/抛出 */ private object outcome; // 非volatile,通过cas保证线程安全 public void run() { ...... callable<v> c = callable; if (c != null && state == new) { v result; boolean ran; try { result = c.call(); // 调用call()执行用户任务并获取结果 ran = true; // 执行完成,ran置为true } catch (throwable ex) { // 调用call()出现异常,而run()方法继续执行 result = null; ran = false; setexception(ex); // setexception(throwable t): compareandswapint(new, completing); outcome = t; } if (ran) set(result); // set(v v): compareandswapint(new, completing); outcome = v; } } public v get() throws interruptedexception, executionexception { int s = state; if (s <= completing) s = awaitdone(false, 0l); // 加入队列等待completing完成,可响应超时、中断 return report(s); } public v get(long timeout, timeunit unit) throws interruptedexception, executionexception, timeoutexception { // 超时等待 } private v report(int s) throws executionexception { object x = outcome; if (s == normal) // 将outcome作为执行结果返回 return (v)x; if (s >= cancelled) throw new cancellationexception(); throw new executionexception((throwable)x); // 将outcome作为捕获的返回 } }
futuretask 实现了 runnablefuture 接口,所以有两方面的作用。
- 第一,作为 runnable 传入 execute() 方法来执行,同时封装 callable 对象并在 run() 中调用其 call() 方法;
- 第二,作为 future 管理任务的执行状态,将 call() 的返回值保存在 outcome 中以通过 get() 获取。这似乎就能回答开头的两个问题,并且浑然天成,就好像是一个问题,除非发生异常的时候返回的不是任务的结果而是异常对象。
总结一下继承关系:
三、使用举例
文章的标题有点唬人,说到底还是讲 callable 的用法。现在我们知道了 future 代表了任务执行的过程和结果,作为 call() 方法的返回值来获取执行结果;而 futuretask 是一个 runnable 的 future,既是任务执行的过程和结果,又是 call 方法最终执行的载体。下面通过一个例子看看他们在使用上的区别。
首先创建一个任务,即定义一个任务类实现 callable 接口,在 call() 方法里添加我们的操作,这里用耗时三秒然后返回 100 模拟计算过程。
class mytask implements callable<integer> { @override public integer call() throws exception { system.out.println("子线程开始计算..."); for (int i=0;i<3;++i){ thread.sleep(1000); system.out.println("子线程计算中,用时 "+(i+1)+" 秒"); } system.out.println("子线程计算完成,返回:100"); return 100; } }
然后呢,创建一个线程池,并实例化一个 mytask 备用。
executorservice executor = executors.newcachedthreadpool(); mytask task = new mytask();
现在,分别使用 future 和 futuretask 来获取执行结果,看看他们有什么区别。
3.1、使用future
future 一般作为 submit() 的返回值使用,并在主线程中以阻塞的方式获取异步任务的执行结果。
system.out.println("主线程启动线程池"); future<integer> future = executor.submit(task); system.out.println("主线程得到返回结果:"+future.get()); executor.shutdown();
看看输出结果:
主线程启动线程池
子线程开始计算...
子线程计算中,用时 1 秒
子线程计算中,用时 2 秒
子线程计算中,用时 3 秒
子线程计算完成,返回:100
主线程得到返回结果:100
由于 get() 方法阻塞获取结果,所以输出顺序为子线程计算完成后主线程输出结果。
3.2、使用futuretask
由于 futuretask 集任务与结果于一身,所以我们可以使用 futuretask 自身而非返回值来管理任务,这需要首先利用 callable 对象来构造 futuretask,并调用不同的submit()
重载方法。
system.out.println("主线程启动线程池"); futuretask<integer> futuretask = new futuretask<>(task); executor.submit(futuretask); // 作为ruunable传入submit()中 system.out.println("主线程得到返回结果:"+futuretask.get()); // 作为future获取结果 executor.shutdown();
这段程序的输出与上面中完全相同,其实两者在实际执行中的区别也不大,虽然前者调用了submit(callable<t> task)
而后者调用了submit(runnable task)
,但最终都通过execute(futuretask)
来把任务加入线程池中。
四、总结
上面大费周章其实只是尽可能细致地讲清楚了 callable 中的任务是如何执行的,总结起来就是:
线程池中,submit() 方法实际上将 callable 封装在 futuretask 中,将其作为 runnable 的子类传给 execute()真正执行;futuretask 在 run() 中调用 callable 对象的 call() 方法并接收返回值或捕获异常保存在object outcome
中,同时管理执行过程中的状态state
;futuretask 同时作为 future 的子类,通过 get() 返回任务的执行结果,若未执行完成则通过等待队列进行阻塞等待完成;
futuretask 作为一个 runnable 的 future,其中最重要的两个方法如下。
以上就是解析java异步之call future的详细内容,更多关于java异步 call future的资料请关注其它相关文章!