Java8 CompletableFuture详解
java 8来了,是时候学一下新的东西了。java 7和java 6只不过是稍作修改的版本,而java 8将会发生重大的改进。或许是java 8太大了吧?今天我会给你彻底地解释jdk 8中的新的抽象 – completablefuture。众所周知,java 8不到一年就会发布,因此这篇文章是基于jdk 8 build 88 with lambda support的。completablefuture extends future提供了方法,一元操作符和促进异步性以及事件驱动编程模型,它并不止步于旧版本的java中。如果你打开javadoc of completablefuture你一定会感到震惊。大约有五十种方法(!),而且它们中的一些非常有意思而且不好理解,例如:
completablefuture<? extends u> other,
bifunction<? super t,? super u,? extends v> fn,
executor executor)
不必担心,继续读下去。completablefuture收集了所有listenablefuture in guava 和 settablefuture的特征。此外,内置的lambda表达式使它更接近于scala/akka futures。这听起来好得令人难以置信,但是请继续读下去。completablefuture有两个主要的方面优于ol中的future – 异步回调/转换,这能使得从任何时刻的任何线程都可以设置completablefuture的值。
一、提取、修改包装的值
通常futures代表其它线程中运行的代码,但事实并非总是如此。有时你想要创造一个future来表示你知道将会发生什么,例如jms message arrival。所以你有future但是未来并没有潜在的异步工作。你只是想在未来jms消息到达时简单地完成(解决),这是由一个事件驱动的。在这种情况下,你可以简单地创建completablefuture来返还给你的客户端,只要你认为你的结果是可用的,仅仅通过complete()就能解锁所有等待future的客户端。
首先你可以简单地创建新的completablefuture并且给你的客户端:
final completablefuture<string> future = new completablefuture<>();
//...
return future;
}
注意这个future和callable没有任何联系,没有线程池也不是异步工作。如果现在客户端代码调用ask().get()它将永远阻塞。如果寄存器完成回调,它们就永远不会生效了。所以关键是什么?现在你可以说:
…此时此刻所有客户端future.get()将得到字符串的结果,同时完成回调以后将会立即生效。当你想代表future的任务时是非常方便的,而且没有必要去计算一些执行线程的任务上。completablefuture.complete()只能调用一次,后续调用将被忽略。但也有一个后门叫做completablefuture.obtrudevalue(…)覆盖一个新future之前的价值,请小心使用。
有时你想要看到信号发生故障的情况,如你所知future对象可以处理它所包含的结果或异常。如果你想进一步传递一些异常,可以用completablefuture.completeexceptionally(ex) (或者用obtrudeexception(ex)这样更强大的方法覆盖前面的异常)。 completeexceptionally()也能解锁所有等待的客户端,但这一次从get()抛出异常。说到get(),也有completablefuture.join()方法在错误处理方面有着细微的变动。但总体上,它们都是一样的。最后也有completablefuture.getnow(valueifabsent)方法没有阻塞但是如果future还没完成将返回默认值,这使得当构建那种我们不想等太久的健壮系统时非常有用。
最后static的方法是用completedfuture(value)来返回已经完成future的对象,当测试或者写一些适配器层时可能非常有用。
二、创造和获取completablefuture
好了,那么手动地创建completablefuture是我们唯一的选择吗?不一定。就像一般的futures,我们可以关联存在的任务,同时completablefuture使用工厂方法:
static <u> completablefuture<u> supplyasync(supplier<u> supplier);
static <u> completablefuture<u> supplyasync(supplier<u> supplier, executor executor);
static completablefuture<void> runasync(runnable runnable);
static completablefuture<void> runasync(runnable runnable, executor executor);
无参方法executor是以…async结尾同时将会使用forkjoinpool.commonpool()(全局的,在jdk8中介绍的通用池),这适用于completablefuture类中的大多数的方法。runasync()易于理解,注意它需要runnable,因此它返回completablefuture<void>作为runnable不返回任何值。如果你需要处理异步操作并返回结果,使用supplier<u>:
final completablefuture<string> future = completablefuture.supplyasync(new supplier<string>() {
@override
public string get() {
//...long running...
return "42";
}
}, executor);
但是别忘了,java 8里面还有lambdas表达式呢!
finalcompletablefuture<string> future = completablefuture.supplyasync(() -> {
//...long running...
return "42";
}, executor);
或者:
final completablefuture<string> future =
completablefuture.supplyasync(() -> longrunningtask(params), executor);
虽然这篇文章不是关于lambda的,但是我会相当频繁地使用lambda表达式。
三、转换和作用于completablefuture(thenapply)
我说过completablefuture优于future但是你还不知道为什么吗?简单说,因为completablefuture是一个原子也是一个因子。我说的这句话没什么帮助吗?scala和javascript都允许future完成时允许注册异步回调,直到它准备好我们才要等待和阻止它。我们可以简单地说:运行这个函数时就出现了结果。此外,我们可以叠加这些功能,把多个future组合在一起等。例如如果我们从string转为integer,我们可以转为在不关联的前提下从completablefuture到 completablefuture<integer。这是通过thenapply()的方法:
<u> completablefuture<u> thenapply(function<? super t,? extends u> fn);
<u> completablefuture<u> thenapplyasync(function<? super t,? extends u> fn);
<u> completablefuture<u> thenapplyasync(function<? super t,? extends u> fn, executor executor);<p></p>
<p>如前所述...async版本提供对completablefuture的大多数操作,因此我将在后面的部分中跳过它们。记住,第一个方法将在future完成的相同线程中调用该方法,而剩下的两个将在不同的线程池中异步地调用它。
让我们来看看thenapply()的工作流程:</p>
<p><pre class="brush: java; gutter: true; first-line: 1; highlight: []; html-script: false">
completablefuture<string> f1 = //...
completablefuture<integer> f2 = f1.thenapply(integer::parseint);
completablefuture<double> f3 = f2.thenapply(r -> r * r * math.pi);
</p>
或在一个声明中:
completablefuture<double> f3 =
f1.thenapply(integer::parseint).thenapply(r -> r * r * math.pi);
这里,你会看到一个序列的转换,从string到integer再到double。但最重要的是,这些转换既不立即执行也不停止。这些转换既不立即执行也不停止。他们只是记得,当原始f1完成他们所执行的程序。如果某些转换非常耗时,你可以提供你自己的executor来异步地运行他们。注意,此操作相当于scala中的一元map。
四、运行完成的代码(thenaccept/thenrun)
completablefuture<void> thenaccept(consumer<? super t> block);
completablefuture<void> thenrun(runnable action);
在future的管道里有两种典型的“最终”阶段方法。他们在你使用future的值的时候做好准备,当 thenaccept()提供最终的值时,thenrun执行 runnable,这甚至没有方法去计算值。例如:
future.thenacceptasync(dbl -> log.debug("result: {}", dbl), executor);
log.debug("continuing");
…async变量也可用两种方法,隐式和显式执行器,我不会过多强调这个方法。
thenaccept()/thenrun()方法并没有发生阻塞(即使没有明确的executor)。它们像一个事件侦听器/处理程序,你连接到一个future时,这将执行一段时间。”continuing”消息将立即出现,尽管future甚至没有完成。
五、单个completablefuture的错误处理
到目前为止,我们只讨论计算的结果。那么异常呢?我们可以异步地处理它们吗?当然!
completablefuture<string> safe =
future.exceptionally(ex -> "we have a problem: " + ex.getmessage());
exceptionally()接受一个函数时,将调用原始future来抛出一个异常。我们会有机会将此异常转换为和future类型的兼容的一些值来进行恢复。safe进一步的转换将不再产生一个异常而是从提供功能的函数返回一个string值。
一个更加灵活的方法是handle()接受一个函数,它接收正确的结果或异常:
completablefuture<integer> safe = future.handle((ok, ex) -> {
if (ok != null) {
return integer.parseint(ok);
} else {
log.warn("problem", ex);
return -1;
}
});
handle()总是被调用,结果和异常都非空,这是个一站式全方位的策略。
六、一起结合两个completablefuture
异步处理过程之一的completablefuture非常不错但是当多个这样的futures以各种方式组合在一起时确实显示了它的强大。
七、结合(链接)这两个futures(thencompose())
有时你想运行一些future的值(当它准备好了),但这个函数也返回了future。completablefuture足够灵活地明白我们的函数结果现在应该作为*的future,对比completablefuture<completablefuture>。方法 thencompose()相当于scala的flatmap:
<u> completablefuture<u> thencompose(function<? super t,completablefuture<u>> fn);
…async变化也是可用的,在下面的事例中,仔细观察thenapply()(map)和thencompose()(flatmap)的类型和差异,当应用calculaterelevance()方法返回completablefuture:
completablefuture<document> docfuture = //...
completablefuture<completablefuture<double>> f =
docfuture.thenapply(this::calculaterelevance);
completablefuture<double> relevancefuture =
docfuture.thencompose(this::calculaterelevance);
//...
private completablefuture<double> calculaterelevance(document doc) //...
thencompose()是一个重要的方法允许构建健壮的和异步的管道,没有阻塞和等待的中间步骤。
八、两个futures的转换值(thencombine())
当thencompose()用于链接一个future时依赖另一个thencombine,当他们都完成之后就结合两个独立的futures:
<u,v> completablefuture<v> thencombine(completablefuture<? extends u> other, bifunction<? super t,? super u,? extends v> fn)
…async变量也是可用的,假设你有两个completablefuture,一个加载customer另一个加载最近的shop。他们彼此完全独立,但是当他们完成时,您想要使用它们的值来计算route。这是一个可剥夺的例子:
completablefuture<customer> customerfuture = loadcustomerdetails(123);
completablefuture<shop> shopfuture = closestshop();
completablefuture<route> routefuture =
customerfuture.thencombine(shopfuture, (cust, shop) -> findroute(cust, shop));
//...
private route findroute(customer customer, shop shop) //...
请注意,在java 8中可以用(cust, shop) -> findroute(cust, shop)简单地代替this::findroute方法的引用:
customerfuture.thencombine(shopfuture, this::findroute);
你也知道,我们有customerfuture 和 shopfuture。那么routefuture包装它们然后“等待”它们完成。当他们准备好了,它会运行我们提供的函数来结合所有的结果(findroute())。当两个基本的futures完成并且 findroute()也完成时,这样routefuture将会完成。
九、等待所有的 completablefutures 完成
如果不是产生新的completablefuture连接这两个结果,我们只是希望当完成时得到通知,我们可以使用thenacceptboth()/runafterboth()系列的方法,(…async 变量也是可用的)。它们的工作方式与thenaccept() 和 thenrun()类似,但是是等待两个futures而不是一个:
<u> completablefuture<void> thenacceptboth(completablefuture<? extends u> other, biconsumer<? super t,? super u> block)
completablefuture<void> runafterboth(completablefuture<?> other, runnable action)
想象一下上面的例子,这不是产生新的 completablefuture,你只是想要立刻发送一些事件或刷新gui。这可以很容易地实现:thenacceptboth():
customerfuture.thenacceptboth(shopfuture, (cust, shop) -> {
final route route = findroute(cust, shop);
//refresh gui with route
});
我希望我是错的,但也许有些人会问自己一个问题:为什么我不能简单地阻塞这两个futures呢? 就像:
future<customer> customerfuture = loadcustomerdetails(123);
future<shop> shopfuture = closestshop();
findroute(customerfuture.get(), shopfuture.get());
好了,你当然可以这么做。但是最关键的一点是completablefuture是允许异步的,它是事件驱动的编程模型而不是阻塞并急切地等待着结果。所以在功能上,上面两部分代码是等价的,但后者没有必要占用一个线程来执行。
十、等待第一个 completablefuture 来完成任务
另一个有趣的事是completablefutureapi可以等待第一个(与所有相反)完成的future。当你有两个相同类型任务的结果时就显得非常方便,你只要关心响应时间就行了,没有哪个任务是优先的。api方法(…async变量也是可用的):
completablefuture<void> accepteither(completablefuture<? extends t> other, consumer<? super t> block)
completablefuture<void> runaftereither(completablefuture<?> other, runnable action)
作为一个例子,你有两个系统可以集成。一个具有较小的平均响应时间但是拥有高的标准差,另一个一般情况下较慢,但是更加容易预测。为了两全其美(性能和可预测性)你可以在同一时间调用两个系统并等着谁先完成。通常这会是第一个系统,但是在进度变得缓慢时,第二个系统就可以在可接受的时间内完成:
completablefuture<string> fast = fetchfast();
completablefuture<string> predictable = fetchpredictably();
fast.accepteither(predictable, s -> {
system.out.println("result: " + s);
});
s代表了从fetchfast()或是fetchpredictably()得到的string。我们不必知道也无需关心。
十一、完整地转换第一个系统
applytoeither()算是 accepteither()的前辈了。当两个futures快要完成时,后者只是简单地调用一些代码片段,applytoeither()将会返回一个新的future。当这两个最初的futures完成时,新的future也会完成。api有点类似于(…async 变量也是可用的):
这个额外的fn功能在第一个future被调用时能完成。我不确定这个专业化方法的目的是什么,毕竟一个人可以简单地使用:fast.applytoeither(predictable).thenapply(fn)。因为我们坚持用这个api,但我们的确不需要额外功能的应用程序,我会简单地使用function.identity()占位符:
completablefuture<string> fast = fetchfast();
completablefuture<string> predictable = fetchpredictably();
completablefuture<string> firstdone =
fast.applytoeither(predictable, function.<string>identity());
第一个完成的future可以通过运行。请注意,从客户的角度来看,两个futures实际上是在firstdone的后面而隐藏的。客户端只是等待着future来完成并且通过applytoeither()使得当最先的两个任务完成时通知客户端。
十二、多种结合的completablefuture
我们现在知道如何等待两个future来完成(使用thencombine())并第一个完成(applytoeither())。但它可以扩展到任意数量的futures吗?的确,使用static辅助方法:
static completablefuture<void< allof(completablefuture<?<... cfs)
static completablefuture<object< anyof(completablefuture<?<... cfs)
allof()当所有的潜在futures完成时,使用了一个futures数组并且返回一个future(等待所有的障碍)。另一方面anyof()将会等待最快的潜在futures,请看一下返回futures的一般类型,这不是你所期望的吗?我们会在接下来的文章中关注一下这个问题。
总结
我们探索了整个completablefuture api。我确信这样就能战无不胜了,所以在下一篇文章中我们将研究另一个简单的web爬虫程序的实现,使用completablefuture方法和java 8 lambda表达式,我们也会看看completablefuture的
上一篇: 面试题39:数组中出现次数超过一半的数字
下一篇: 面试题39:数组中出现次数超过一半的数字