有了 CompletableFuture,使得异步编程没有那么难了!
本文导读:
- 业务需求场景介绍
- 技术设计方案思考
- future 设计模式实战
- completablefuture 模式实战
- completablefuture 生产建议
- completablefuture 性能测试
- completablefuture 使用扩展
1、业务需求场景介绍
不变的东西就是一直在变化中。
想必,大家在闲暇时刻,会经常看视频,经常用的几个 app,比如优酷、爱奇艺、腾讯等。
这些视频 app 不仅仅可以在手机上播放,还能够支持在电视上播放。
在电视终端上播放的 app 是独立发布的版本,跟手机端的 app 是不一样的。
当我们看一部电影时,点击进入某一部电影,就进入到了专辑详情页页面,此时,播放器会自动播放视频。用户在手机上看到的专辑详情页,与电视上看到的专辑详情页,页面样式设计上是不同的。
我们来直观的看一下效果。
手机上的腾讯视频专辑详情页:
上半部分截图,下面还有为你推荐、明星演员、周边推荐、评论等功能。
相应的,在电视端的专辑详情页展示方式是不一样的。假设产品经理提出一个需求,要求对详情页做个改版。
样式要求如下图所示:
两个终端的样式对比,在电视端专辑详情页中,包含了很多板块,每个板块横向展示多个内容。
产品的设计上要求是,有的板块内容来源于推荐、有的板块来源于搜索、有的板块来源cms(内容管理系统)。简单理解为,每个板块内容来源不同,来源于推荐、搜索等接口的内容要求是近实时的请求。
2、技术设计方案思考
考虑到产品提的这个需求,其实实现起来并不难。
主要分为了静态数据部分和动态数据部分,对于不经常变化的数据可以通过静态接口获取,对于近乎实时的数据可以通过动态接口获取。
静态接口设计:
专辑本身的属性以及专辑下的视频数据,一般是不经常变化的。
在需求场景介绍中,我截图的是电影频道。如果是电视剧频道,会展示剧集列表(专辑下的所有视频,如第 1 集、第 2 集...),而视频的更新一般是不太频繁的,所以在专辑详情页剧集列表数据就可以从静态接口获取。
静态接口数据生成流程:
另外一部分,就是需要动态接口来实现,调用第三方接口获取数据,比如推荐、搜索数据。
同时,要求板块与板块之间的内容不允许重复。
动态接口设计:
方案一:
串行调用,即按照每个板块的展示先后顺序,调用相应的第三方接口获取数据。
方案二:
并行调用,即多个板块之间可以并行调用,提高整体接口响应效率。
其实以上两个方案,各有利弊。
方案一串行调用,好处是开发模型简单,按照串行方式依次调用接口,内容数据去重,聚合所有的数据返回给客户端。
但是,接口响应时间依赖于第三方接口的响应时间,通常第三方接口总是不可靠的,可能就会拉高接口整体响应时间,进而导致占用线程时间过长,影响接口整体吞吐量。
方案二并行调用,理论上是可以提高接口的整体响应时间,假设同时调用多个第三方接口,取决于最慢的接口响应时间。
并行调用时,需要考虑到「池化技术」,即不能无限制的在 jvm 进程上创建过多的线程。同时,也要考虑到板块与板块之间的内容数据,要按照产品设计上的先后顺序做去重。
根据这个需求场景,我们选择第二种方案来实现更合适一些。
选择了方案二,我们抽象出如下图所示的简易模型:
t1、t2、t3 表示多个板块内容线程。t1 线程先返回结果,t2 线程返回的结果不能与与 t1 线程返回的结果内容重复,t3 线程返回的结果不能与 t1、t2 两个线程返回的结果内容重复。
我们从技术实现上考量,当并行调用多个第三方接口时,需要获取接口的返回结果,首先想到的就是 future ,能够实现异步获取任务结果。
另外,jdk8 提供了 completablefuture 易于使用的获取异步结果的工具类,解决了 future 的一些使用上的痛点,以更优雅的方式实现组合式异步编程,同时也契合函数式编程。
3、future 设计模式实战
future 接口设计:
提供了获取任务结果、取消任务、判断任务状态接口。调用获取任务结果方法,在任务未完成情况下,会导致调用阻塞。
future 接口提供的方法:
```
// 获取任务结果
v get() throws interruptedexception, executionexception;
// 支持超时时间的获取任务结果
v get(long timeout, timeunit unit)
throws interruptedexception, executionexception, timeoutexception;
// 判断任务是否已完成
boolean isdone();
// 判断任务是否已取消
boolean iscancelled();
// 取消任务
boolean cancel(boolean mayinterruptifrunning);
```
通常,我们在考虑到使用 future 获取任务结果时,会使用 threadpoolexecutor 或者 futuretask 来实现功能需求。
threadpoolexecutor、futuretask 与 future 接口关系类图:
theadpoolexecutor 提供三个 submit 方法:
// 1. 提交无需返回值的任务,runnable 接口 run() 方法无返回值 public future<?> submit(runnable task) { } // 2. 提交需要返回值的任务,callable 接口 call() 方法有返回值 public <t> future<t> submit(callable<t> task) { } // 3. 提交需要返回值的任务,任务结果是第二个参数 result 对象 public <t> future<t> submit(runnable task, t result) { }
第 3 个 submit 方法使用示例如下所示:
static string x = "东升的思考"; public static void main(string[] args) throws exception { executorservice executor = executors.newfixedthreadpool(1); // 创建 result 对象 r result r = new result(); r.setname(x); // 提交任务 future<result> future = executor.submit(new task(r), r); result fr = future.get(); // 下面等式成立 system.out.println(fr == r); system.out.println(fr.getname() == x); system.out.println(fr.getnick() == x); } static class result { private string name; private string nick; // ... ignore getter and setter } static class task implements runnable { result r; // 通过构造函数传入 result task(result r) { this.r = r; } @override public void run() { // 可以操作 result string name = r.getname(); r.setnick(name); } }
执行结果都是true。
futuretask 设计实现:
实现了 runnable 和 future 两个接口。实现了 runnable 接口,说明可以作为任务对象,直接提交给 threadpoolexecutor 去执行。实现了 future 接口,说明能够获取执行任务的返回结果。
我们来根据产品的需求,使用 futuretask 模拟两个线程,通过示例实现下功能。
结合示例代码注释理解:
public static void main(string[] args) throws exception { // 创建任务 t1 的 futuretask,调用推荐接口获取数据 futuretask<string> ft1 = new futuretask<>(new t1task()); // 创建任务 t1 的 futuretask,调用搜索接口获取数据,依赖 t1 结果 futuretask<string> ft2 = new futuretask<>(new t2task(ft1)); // 线程 t1 执行任务 ft1 thread t1 = new thread(ft1); t1.start(); // 线程 t2 执行任务 ft2 thread t2 = new thread(ft2); t2.start(); // 等待线程 t2 执行结果 system.out.println(ft2.get()); } // t1task 调用推荐接口获取数据 static class t1task implements callable<string> { @override public string call() throws exception { system.out.println("t1: 调用推荐接口获取数据..."); timeunit.seconds.sleep(1); system.out.println("t1: 得到推荐接口数据..."); timeunit.seconds.sleep(10); return " [t1 板块数据] "; } } // t2task 调用搜索接口数据,同时需要推荐接口数据 static class t2task implements callable<string> { futuretask<string> ft1; // t2 任务需要 t1 任务的 futuretask 返回结果去重 t2task(futuretask<string> ft1) { this.ft1 = ft1; } @override public string call() throws exception { system.out.println("t2: 调用搜索接口获取数据..."); timeunit.seconds.sleep(1); system.out.println("t2: 得到搜索接口的数据..."); timeunit.seconds.sleep(5); // 获取 t2 线程的数据 system.out.println("t2: 调用 t1.get() 接口获取推荐数据"); string tf1 = ft1.get(); system.out.println("t2: 获取到推荐接口数据:" + tf1); system.out.println("t2: 将 t1 与 t2 板块数据做去重处理"); return "[t1 和 t2 板块数据聚合结果]"; } }
执行结果如下:
> task :futuretasktest.main() t1: 调用推荐接口获取数据... t2: 调用搜索接口获取数据... t1: 得到推荐接口数据... t2: 得到搜索接口的数据... t2: 调用 t1.get() 接口获取推荐数据 t2: 获取到推荐接口数据: [t1 板块数据] t2: 将 t1 与 t2 板块数据做去重处理 [t1 和 t2 板块数据聚合结果]
小结:
future 表示「未来」的意思,主要是将耗时的一些操作任务,交给单独的线程去执行。从而达到异步的目的,提交任务的当前线程,在提交任务后和获取任务结果的过程中,当前线程可以继续执行其他操作,不需要在那傻等着返回执行结果。
4、completeablefuture 模式实战
对于 future 设计模式,虽然我们提交任务时,不会进入任何阻塞,但是当调用方要获得这个任务的执行结果,还是可能会阻塞直至任务执行完成。
在 jdk1.5 设计之初就一直存在这个问题,发展到 jdk1.8 引入了 completablefuture 才得到完美的增强。
在此期间,google 开源的 guava 工具包提供了 listenablefuture ,用于支持任务完成时支持回调方式,感兴趣的朋友们可以自行查阅研究。
在业务需求场景介绍中,不同板块的数据来源是不同的,并且板块与板块之间是存在数据依赖关系的。
可以理解为任务与任务之间是有时序关系的,而根据 completablefuture 提供的一些功能特性,是非常适合这种业务场景的。
completablefuture 类图:
completablefuture 实现了 future 和 completionstage 两个接口。实现 future 接口是为了关注异步任务什么时候结束,和获取异步任务执行的结果。实现 completionstage 接口,其提供了非常丰富的功能,实现了串行关系、并行关系、汇聚关系等。
completablefuture 核心优势:
1)无需手工维护线程,给任务分配线程的工作无需开发人员关注;
2)在使用上,语义更加清晰明确;
例如:t3 = t1.thencombine(t2, () -> { // dosomething ... } 能够明确的表述任务 3 要等任务 2 和 任务 1完成后才会开始执行。
3)代码更加简练,支持链式调用,让你更专注业务逻辑。
4)方便的处理异常情况
接下来,通过 completablefuture 来模拟实现专辑下多板块数据聚合处理。
代码如下所示:
public static void main(string[] args) throws exception { // 暂存数据 list<string> stashlist = lists.newarraylist(); // 任务 1:调用推荐接口获取数据 completablefuture<string> t1 = completablefuture.supplyasync(() -> { system.out.println("t1: 获取推荐接口数据..."); sleepseconds(5); stashlist.add("[t1 板块数据]"); return "[t1 板块数据]"; }); // 任务 2:调用搜索接口获取数据 completablefuture<string> t2 = completablefuture.supplyasync(() -> { system.out.println("t2: 调用搜索接口获取数据..."); sleepseconds(3); return " [t2 板块数据] "; }); // 任务 3:任务 1 和任务 2 完成后执行,聚合结果 completablefuture<string> t3 = t1.thencombine(t2, (t1result, t2result) -> { system.out.println(t1result + " 与 " + t2result + "实现去重逻辑处理"); return "[t1 和 t2 板块数据聚合结果]"; }); // 等待任务 3 执行结果 system.out.println(t3.get(6, timeunit.seconds)); } static void sleepseconds(int timeout) { try { timeunit.seconds.sleep(timeout); } catch (interruptedexception e) { e.printstacktrace(); } }
执行结果如下:
> task :completablefuturetest.main() t1: 获取推荐接口数据... t2: 调用搜索接口获取数据... [t1 板块数据] 与 [t2 板块数据] 实现去重逻辑处理 [t1 和 t2 板块数据聚合结果]
上述的示例代码在 idea 中新建个class,直接复制进去,即可正常运行。
** 5、completablefuture 生产建议**
创建合理的线程池:
在生产环境下,不建议直接使用上述示例代码形式。因为示例代码中使用的completablefuture.supplyasync(() -> {});
创建 completablefuture 对象的 supplyasync() 方法(这里使用的工厂方法模式),底层使用的默认线程池,不一定能满足业务需求。
结合底层源代码来看一下:
// 默认使用 forkjoinpool 线程池 private static final executor asyncpool = usecommonpool ? forkjoinpool.commonpool() : new threadpertaskexecutor(); public static <u> completablefuture<u> supplyasync(supplier<u> supplier) { return asyncsupplystage(asyncpool, supplier); }
创建 forkjoinpool 线程池:
默认线程池大小是 runtime.getruntime().availableprocessors() - 1(cpu 核数 - 1),可以通过 jvm 参数 -djava.util.concurrent.forkjoinpool.common.parallelism 设置线程池大小。
jvm 参数上配置 -djava.util.concurrent.forkjoinpool.common.threadfactory 设置线程工厂类;配置 -djava.util.concurrent.forkjoinpool.common.exceptionhandler 设置异常处理类,这两个参数设置后,内部会通过系统类加载器加载 class。
如果所有 completablefuture 都使用默认线程池,一旦有任务执行很慢的 i/o 操作,就会导致所有线程都阻塞在 i/o 操作上,进而影响系统整体性能。
所以,建议大家在生产环境使用时,根据不同的业务类型创建不同的线程池,以避免互相影响
。
completablefuture 还提供了另外支持线程池的方法。
// 第二个参数支持传递 executor 自定义线程池 public static <u> completablefuture<u> supplyasync(supplier<u> supplier, executor executor) { return asyncsupplystage(screenexecutor(executor), supplier); }
自定义线程池,建议参考 「阿里巴巴 java 开发手册」,推荐使用 threadpoolexecutor 自定义线程池,使用有界队列,根据实际业务情况设置队列大小。
线程池大小的设置,在 「java 并发编程实战」一书中,brian goetz 提供了不少优化建议。如果线程池数量过多,竞争 cpu 和内存资源,导致大量时间在上下文切换上。反之,如果线程池数量过少,无法充分利用 cpu 多核优势。
线程池大小与 cpu 处理器的利用率之比可以用下面公式估算:
异常处理:
completablefuture 提供了非常简单的异常处理 ,如下这些方法,支持链式编程方式。
// 类似于 try{}catch{} 中的 catch{} public completionstage<t> exceptionally (function<throwable, ? extends t> fn); // 类似于 try{}finally{} 中的 finally{},不支持返回结果 public completionstage<t> whencomplete (biconsumer<? super t, ? super throwable> action); public completionstage<t> whencompleteasync (biconsumer<? super t, ? super throwable> action); // 类似于 try{}finally{} 中的 finally{},支持返回结果 public <u> completionstage<u> handle (bifunction<? super t, throwable, ? extends u> fn); public <u> completionstage<u> handleasync (bifunction<? super t, throwable, ? extends u> fn);
#### 6、completablefuture 性能测试:
循环压测任务数如下所示,每次执行压测,从 1 到 jobnum 数据叠加汇聚结果,计算耗时。
统计维度:completablefuture 默认线程池 与 自定义线程池。
性能测试代码:
// 性能测试代码 arrays.aslist(-3, -1, 0, 1, 2, 4, 5, 10, 16, 17, 30, 50, 100, 150, 200, 300).foreach(offset -> { int jobnum = processors + offset; system.out.println( string.format("when %s tasks => stream: %s, parallelstream: %s, future default: %s, future custom: %s", testcompletablefuturedefaultexecutor(jobnum), testcompletablefuturecustomexecutor(jobnum))); }); // completablefuture 使用默认 forkjoinpool 线程池 private static long testcompletablefuturedefaultexecutor(int jobcount) { list<completablefuture<integer>> tasks = new arraylist<>(); intstream.rangeclosed(1, jobcount).foreach(value -> tasks.add(completablefuture.supplyasync(completeablefutureperftest::getjob))); long start = system.currenttimemillis(); int sum = tasks.stream().map(completablefuture::join).maptoint(integer::intvalue).sum(); checksum(sum, jobcount); return system.currenttimemillis() - start; } // completablefuture 使用自定义的线程池 private static long testcompletablefuturecustomexecutor(int jobcount) { threadpoolexecutor threadpoolexecutor = new threadpoolexecutor(200, 200, 5, timeunit.minutes, new arrayblockingqueue<>(100000), new threadfactory() { @override public thread newthread(runnable r) { thread thread = new thread(r); thread.setname("custom_daemon_completablefuture"); thread.setdaemon(true); return thread; } }, new threadpoolexecutor.callerrunspolicy()); list<completablefuture<integer>> tasks = new arraylist<>(); intstream.rangeclosed(1, jobcount).foreach(value -> tasks.add(completablefuture.supplyasync(completeablefutureperftest::getjob, threadpoolexecutor))); long start = system.currenttimemillis(); int sum = tasks.stream().map(completablefuture::join).maptoint(integer::intvalue).sum(); checksum(sum, jobcount); return system.currenttimemillis() - start; }
测试机器配置:8 核cpu,16g内存
性能测试结果:
根据压测结果看到,随着压测任务数量越大,使用默认的线程池性能越差。
7、completablefuture 使用扩展:
对象创建:
除前面提到的 supplyasync 方法外,completablefuture 还提供了如下方法:
// 执行任务,completablefuture<void> 无返回值,默认线程池 public static completablefuture<void> runasync(runnable runnable) { return asyncrunstage(asyncpool, runnable); } // 执行任务,completablefuture<void> 无返回值,支持自定义线程池 public static completablefuture<void> runasync(runnable runnable, executor executor) { return asyncrunstage(screenexecutor(executor), runnable); }
我们在 completablefuture 模式实战中,提到了 completablefuture 实现了 completionstage 接口,该接口提供了非常丰富的功能。
completionstage 接口支持串行关系、汇聚 and 关系、汇聚 or 关系。
下面对这些关系的接口做个简单描述,大家在使用时可以去自行查阅 jdk api。
同时,这些关系接口中每个方法都提供了对应的 xxxasync() 方法,表示异步化执行任务。
串行关系:
completionstage 描述串行关系,主要有 thenapply、thenrun、thenaccept 和 thencompose 系列接口。
源码如下所示:
// 对应 u apply(t t) ,接收参数 t并支持返回值 u public <u> completionstage<u> thenapply(function<? super t,? extends u> fn); public <u> completionstage<u> thenapplyasync(function<? super t,? extends u> fn); // 不接收参数也不支持返回值 public completionstage<void> thenrun(runnable action); public completionstage<void> thenrunasync(runnable action); // 接收参数但不支持返回值 public completionstage<void> thenaccept(consumer<? super t> action); public completionstage<void> thenacceptasync(consumer<? super t> action); // 组合两个依赖的 completablefuture 对象 public <u> completionstage<u> thencompose (function<? super t, ? extends completionstage<u>> fn); public <u> completionstage<u> thencomposeasync (function<? super t, ? extends completionstage<u>> fn);
汇聚 and 关系:
completionstage 描述 汇聚 and 关系,主要有 thencombine、thenacceptboth 和 runafterboth 系列接口。
源码如下所示(省略了async 方法):
// 当前和另外的 completablefuture 都完成时,两个参数传递给 fn,fn 有返回值 public <u,v> completionstage<v> thencombine (completionstage<? extends u> other, bifunction<? super t,? super u,? extends v> fn); // 当前和另外的 completablefuture 都完成时,两个参数传递给 action,action 没有返回值 public <u> completionstage<void> thenacceptboth (completionstage<? extends u> other, biconsumer<? super t, ? super u> action); // 当前和另外的 completablefuture 都完成时,执行 action public completionstage<void> runafterboth(completionstage<?> other, runnable action);
汇聚 or 关系:
completionstage 描述 汇聚 or 关系,主要有 applytoeither、accepteither 和 runaftereither 系列接口。
源码如下所示(省略了async 方法):
// 当前与另外的 completablefuture 任何一个执行完成,将其传递给 fn,支持返回值 public <u> completionstage<u> applytoeither (completionstage<? extends t> other, function<? super t, u> fn); // 当前与另外的 completablefuture 任何一个执行完成,将其传递给 action,不支持返回值 public completionstage<void> accepteither (completionstage<? extends t> other, consumer<? super t> action); // 当前与另外的 completablefuture 任何一个执行完成,直接执行 action public completionstage<void> runaftereither(completionstage<?> other, runnable action);
到此,completablefuture 的相关特性都介绍完了。
异步编程慢慢变得越来越成熟,java 语言官网也开始支持异步编程模式,所以学好异步编程还是有必要的。
本文结合业务需求场景驱动,引出了 future 设计模式实战,然后对 jdk1.8 中的 completablefuture 是如何使用的,核心优势、性能测试对比、使用扩展方面做了进一步剖析。
希望对大家有所帮助!
欢迎关注我的公众号,扫二维码关注解锁更多精彩文章,与你一同成长~
上一篇: Python基础——分支、循环
下一篇: Laravel实现ORM带条件搜索分页