编程老司机带你玩转 CompletableFuture 异步编程
本文从实例出发,介绍 completablefuture
基本用法。不过讲的再多,不如亲自上手练习一下。所以建议各位小伙伴看完,上机练习一把,快速掌握 completablefuture
。
个人博文地址:https://sourl.cn/s5mbcm
全文摘要:
-
future
vscompletablefuture
-
completablefuture
基本用法
0x00. 前言
一些业务场景我们需要使用多线程异步执行任务,加快任务执行速度。 java 提供 runnable
future<v>
两个接口用来实现异步任务逻辑。
虽然 future<v>
可以获取任务执行结果,但是获取方式十方不变。我们不得不使用future#get
阻塞调用线程,或者使用轮询方式判断 future#isdone
任务是否结束,再获取结果。
这两种处理方式都不是很优雅,jdk8 之前并发类库没有提供相关的异步回调实现方式。没办法,我们只好借助第三方类库,如 guava
,扩展 future
,增加支持回调功能。相关代码如下:
虽然这种方式增强了 java 异步编程能力,但是还是无法解决多个异步任务需要相互依赖的场景。
举一个生活上的例子,假如我们需要出去旅游,需要完成三个任务:
- 任务一:订购航班
- 任务二:订购酒店
- 任务三:订购租车服务
很显然任务一和任务二没有相关性,可以单独执行。但是任务三必须等待任务一与任务二结束之后,才能订购租车服务。
为了使任务三时执行时能获取到任务一与任务二执行结果,我们还需要借助 countdownlatch
。
0x01. completablefuture
jdk8 之后,java 新增一个功能十分强大的类:completablefuture
。单独使用这个类就可以轻松的完成上面的需求:
大家可以先不用管
completablefuture
相关api
,下面将会具体讲解。
对比 future<v>
,completablefuture
优点在于:
- 不需要手工分配线程,jdk 自动分配
- 代码语义清晰,异步任务链式调用
- 支持编排异步任务
怎么样,是不是功能很强大?接下来抓稳了,小黑哥要发车了。
1.1 方法一览
首先来通过 ide 查看下这个类提供的方法:
稍微数一下,这个类总共有 50 多个方法,我的天。。。
不过也不要怕,小黑哥帮你们归纳好了,跟着小黑哥的节奏,带你们掌握 completablefuture
。
若图片不清晰,可以关注『程序通事』,回复:『233』,获取该思维导图
1.2 创建 completablefuture 实例
创建 completablefuture
对象实例我们可以使用如下几个方法:
第一个方法创建一个具有默认结果的 completablefuture
,这个没啥好讲。我们重点讲述下下面四个异步方法。
前两个方法 runasync
不支持返回值,而 supplyasync
可以支持返回结果。
这个两个方法默认将会使用公共的 forkjoinpool
线程池执行,这个线程池默认线程数是 cpu 的核数。
可以设置 jvm option:-djava.util.concurrent.forkjoinpool.common.parallelism 来设置 forkjoinpool 线程池的线程数
使用共享线程池将会有个弊端,一旦有任务被阻塞,将会造成其他任务没机会执行。所以强烈建议使用后两个方法,根据任务类型不同,主动创建线程池,进行资源隔离,避免互相干扰。
1.3 设置任务结果
completablefuture
提供以下方法,可以主动设置任务结果。
boolean complete(t value) boolean completeexceptionally(throwable ex)
第一个方法,主动设置 completablefuture
任务执行结果,若返回 true
,表示设置成功。如果返回 false
,设置失败,这是因为任务已经执行结束,已经有了执行结果。
示例代码如下:
// 执行异步任务 completablefuture cf = completablefuture.supplyasync(() -> { system.out.println("cf 任务执行开始"); sleep(10, timeunit.seconds); system.out.println("cf 任务执行结束"); return "楼下小黑哥"; }); // executors.newsinglethreadscheduledexecutor().execute(() -> { sleep(5, timeunit.seconds); system.out.println("主动设置 cf 任务结果"); // 设置任务结果,由于 cf 任务未执行结束,结果返回 true cf.complete("程序通事"); }); // 由于 cf 未执行结束,将会被阻塞。5 秒后,另外一个线程主动设置任务结果 system.out.println("get:" + cf.get()); // 等待 cf 任务执行结束 sleep(10, timeunit.seconds); // 由于已经设置任务结果,cf 执行结束任务结果将会被抛弃 system.out.println("get:" + cf.get()); /*** * cf 任务执行开始 * 主动设置 cf 任务结果 * get:程序通事 * cf 任务执行结束 * get:程序通事 */
这里需要注意一点,一旦 complete
设置成功,completablefuture
返回结果就不会被更改,即使后续 completablefuture
任务执行结束。
第二个方法,给 completablefuture
设置异常对象。若设置成功,如果调用 get
等方法获取结果,将会抛错。
示例代码如下:
// 执行异步任务 completablefuture cf = completablefuture.supplyasync(() -> { system.out.println("cf 任务执行开始"); sleep(10, timeunit.seconds); system.out.println("cf 任务执行结束"); return "楼下小黑哥"; }); // executors.newsinglethreadscheduledexecutor().execute(() -> { sleep(5, timeunit.seconds); system.out.println("主动设置 cf 异常"); // 设置任务结果,由于 cf 任务未执行结束,结果返回 true cf.completeexceptionally(new runtimeexception("啊,挂了")); }); // 由于 cf 未执行结束,前 5 秒将会被阻塞。后续程序抛出异常,结束 system.out.println("get:" + cf.get()); /*** * cf 任务执行开始 * 主动设置 cf 异常 * java.util.concurrent.executionexception: java.lang.runtimeexception: 啊,挂了 * ...... */
1.4 completionstage
completablefuture
分别实现两个接口 future
与 completionstage
。
future
接口大家都比较熟悉,这里主要讲讲 completionstage
。
completablefuture
大部分方法来自completionstage
接口,正是因为这个接口,completablefuture
才有如从强大功能。
想要理解 completionstage
接口,我们需要先了解任务的时序关系的。我们可以将任务时序关系分为以下几种:
- 串行执行关系
- 并行执行关系
- and 汇聚关系
- or 汇聚关系
1.5 串行执行关系
任务串行执行,下一个任务必须等待上一个任务完成才可以继续执行。
completionstage
有四组接口可以描述串行这种关系,分别为:
thenapply
方法需要传入核心参数为 function<t,r>
类型。这个类核心方法为:
r apply(t t)
所以这个接口将会把上一个任务返回结果当做入参,执行结束将会返回结果。
thenaccept
方法需要传入参数对象为 consumer<t>
类型,这个类核心方法为:
void accept(t t)
返回值 void
可以看出,这个方法不支持返回结果,但是需要将上一个任务执行结果当做参数传入。
thenrun
方法需要传入参数对象为 runnable
类型,这个类大家应该都比较熟悉,核心方法既不支持传入参数,也不会返回执行结果。
thencompose
方法作用与 thenapply
一样,只不过 thencompose
需要返回新的 completionstage
。这么理解比较抽象,可以集合代码一起理解。
方法中带有 async ,代表可以异步执行,这个系列还有重载方法,可以传入自定义的线程池,上图未展示,读者只可以自行查看 api。
最后我们通过代码展示 thenapply
使用方式:
completablefuture<string> cf = completablefuture.supplyasync(() -> "hello,楼下小黑哥")// 1 .thenapply(s -> s + "@程序通事") // 2 .thenapply(string::touppercase); // 3 system.out.println(cf.join()); // 输出结果 hello,楼下小黑哥@程序通事
这段代码比较简单,首先我们开启一个异步任务,接着串行执行后续两个任务。任务 2 需要等待任务1 执行完成,任务 3 需要等待任务 2。
上面方法,大家需要记住了
function<t,r>
,consumer<t>
,runnable
三者区别,根据场景选择使用。
1.6 and 汇聚关系
and 汇聚关系代表所有任务完成之后,才能进行下一个任务。
如上所示,只有任务 a 与任务 b 都完成之后,任务 c 才会开始执行。
completionstage
有以下接口描述这种关系。
thencombine
方法核心参数 bifunction
,作用与 function
一样,只不过 bifunction
可以接受两个参数,而 function
只能接受一个参数。
thenacceptboth
方法核心参数biconsumer
作用也与 consumer
一样,不过其需要接受两个参数。
runafterboth
方法核心参数最简单,上面已经介绍过,不再介绍。
这三组方法只能完成两个任务 and 汇聚关系,如果需要完成多个任务汇聚关系,需要使用 completablefuture#allof
,不过这里需要注意,这个方法是不支持返回任务结果。
and 汇聚关系相关示例代码,开头已经使用过了,这里再粘贴一下,方便大家理解:
1.7 or 汇聚关系
有 and 汇聚关系,当然也存在 or 汇聚关系。or 汇聚关系代表只要多个任务中任一任务完成,就可以接着接着执行下一任务。
completionstage
有以下接口描述这种关系:
前面三组接口方法传参与 and 汇聚关系一致,这里也不再详细解释了。
当然 or 汇聚关系可以使用 completablefuture#anyof
执行多个任务。
下面示例代码展示如何使用 applytoeither
完成 or 关系。
completablefuture<string> cf = completablefuture.supplyasync(() -> { sleep(5, timeunit.seconds); return "hello,楼下小黑哥"; });// 1 completablefuture<string> cf2 = cf.supplyasync(() -> { sleep(3, timeunit.seconds); return "hello,程序通事"; }); // 执行 or 关系 completablefuture<string> cf3 = cf2.applytoeither(cf, s -> s); // 输出结果,由于 cf2 只休眠 3 秒,优先执行完毕 system.out.println(cf2.join()); // 结果:hello,程序通事
1.8 异常处理
completablefuture
方法执行过程若产生异常,当调用 get
,join
获取任务结果才会抛出异常。
上面代码我们显示使用 try..catch
处理上面的异常。不过这种方式不太优雅,completionstage
提供几个方法,可以优雅处理异常。
exceptionally
使用方式类似于 try..catch
中 catch
代码块中异常处理。
whencomplete
与 handle
方法就类似于 try..catch..finanlly
中 finally
代码块。无论是否发生异常,都将会执行的。这两个方法区别在于 handle
支持返回结果。
下面示例代码展示 handle
用法:
completablefuture<integer> f0 = completablefuture.supplyasync(() -> (7 / 0)) .thenapply(r -> r * 10) .handle((integer, throwable) -> { // 如果异常存在,打印异常,并且返回默认值 if (throwable != null) { throwable.printstacktrace(); return 0; } else { // 如果 return integer; } }); system.out.println(f0.join()); /** *java.util.concurrent.completionexception: java.lang.arithmeticexception: / by zero * ..... * * 0 */
0x02. 总结
jdk8 提供 completablefuture
功能非常强大,可以编排异步任务,完成串行执行,并行执行,and 汇聚关系,or 汇聚关系。
不过这个类方法实在太多,且方法还需要传入各种函数式接口,新手刚开始使用会直接会被弄懵逼。这里帮大家在总结一下三类核心参数的作用
-
function
这类函数接口既支持接收参数,也支持返回值 -
consumer
这类接口函数只支持接受参数,不支持返回值 -
runnable
这类接口不支持接受参数,也不支持返回值
搞清楚函数参数作用以后,然后根据串行,and 汇聚关系,or 汇聚关系归纳一下相关方法,这样就比较好理解了
最后再贴一下,文章开头的思维导图,希望对你有帮助。
0x03. 帮助文档
- 极客时间-并发编程专栏
- https://colobu.com/2016/02/29/java-completablefuture
- https://www.ibm.com/developerworks/cn/java/j-cf-of-jdk8/index.html
最后说一句(求关注)
completablefuture
很早之前就有关注,本以为跟 future
一样,使用挺简单,谁知道学的时候才发现好难。各种 api 方法看的头有点大。
后来看到极客时间-『并发编程』专栏使用归纳方式分类 completablefuture
各种方法,一下子就看懂了。所这篇文章也参考这种归纳方式。
这篇文章找资料,整理一个星期,幸好今天顺利产出。
看在小黑哥写的这么辛苦的份上,点个关注吧,赏个赞呗。别下次一定啊,大哥!写文章很辛苦的,需要来点正反馈。
才疏学浅,难免会有纰漏,如果你发现了错误的地方,还请你留言给我指出来,我对其加以修改。
感谢您的阅读,我坚持原创,十分欢迎并感谢您的关注~
欢迎关注我的公众号:程序通事,获得日常干货推送。如果您对我的专题内容感兴趣,也可以关注我的博客: