欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Java并发 CompletableFuture异步编程的实现

程序员文章站 2022-06-20 12:38:40
前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了...

前面我们不止一次提到,用多线程优化性能,其实不过就是将串行操作变成并行操作。如果仔细观察,你还会发现在串行转换成并行的过程中,一定会涉及到异步化,例如下面的示例代码,现在是串行的,为了提升性能,我们得把它们并行化。

// 以下两个方法都是耗时操作
dobiza();
dobizb();


//创建两个子线程去执行就可以了,两个操作已经被异步化了。
new thread(()->dobiza())
 .start();
new thread(()->dobizb())
 .start(); 

异步化,是并行方案得以实施的基础,更深入地讲其实就是:利用多线程优化性能这个核心方案得以实施的基础。java 在 1.8 版本提供了 completablefuture 来支持异步编程。

completablefuture 的核心优势

为了领略 completablefuture 异步编程的优势,这里我们用 completablefuture 重新实现前面曾提及的烧水泡茶程序。首先还是需要先完成分工方案,在下面的程序中,我们分了 3 个任务:任务 1 负责洗水壶、烧开水,任务 2 负责洗茶壶、洗茶杯和拿茶叶,任务 3 负责泡茶。其中任务 3 要等待任务 1 和任务 2 都完成后才能开始。这个分工如下图所示。

Java并发 CompletableFuture异步编程的实现

烧水泡茶分工方案

// 任务 1:洗水壶 -> 烧开水
completablefuture<void> f1 = 
 completablefuture.runasync(()->{
 system.out.println("t1: 洗水壶...");
 sleep(1, timeunit.seconds);

 system.out.println("t1: 烧开水...");
 sleep(15, timeunit.seconds);
});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
completablefuture<string> f2 = 
 completablefuture.supplyasync(()->{
 system.out.println("t2: 洗茶壶...");
 sleep(1, timeunit.seconds);

 system.out.println("t2: 洗茶杯...");
 sleep(2, timeunit.seconds);

 system.out.println("t2: 拿茶叶...");
 sleep(1, timeunit.seconds);
 return " 龙井 ";
});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
completablefuture<string> f3 = 
 f1.thencombine(f2, (__, tf)->{
  system.out.println("t1: 拿到茶叶:" + tf);
  system.out.println("t1: 泡茶...");
  return " 上茶:" + tf;
 });
// 等待任务 3 执行结果
system.out.println(f3.join());

void sleep(int t, timeunit u) {
 try {
  u.sleep(t);
 }catch(interruptedexception e){}
}
// 一次执行结果:
t1: 洗水壶...
t2: 洗茶壶...
t1: 烧开水...
t2: 洗茶杯...
t2: 拿茶叶...
t1: 拿到茶叶: 龙井
t1: 泡茶...
上茶: 龙井

从整体上来看,我们会发现

  • 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们关注;
  • 语义更清晰,例如f3 = f1.thencombine(f2, ()->{}) 能够清晰地表述“任务 3 要等待任务 1 和任务 2 都完成后才能开始”;
  • 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。

领略 completablefuture 异步编程的优势之后,下面我们详细介绍 completablefuture 的使用。

创建 completablefuture 对象

创建 completablefuture 对象主要靠下面代码中展示的这 4 个静态方法,我们先看前两个。在烧水泡茶的例子中,我们已经使用了runasync(runnable runnable)supplyasync(supplier<u> supplier),它们之间的区别是:runnable 接口的 run() 方法没有返回值,而 supplier 接口的 get() 方法是有返回值的。

前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

默认情况下 completablefuture 会使用公共的 forkjoinpool 线程池,这个线程池默认创建的线程数是 cpu 的核数(也可以通过 jvm option:-djava.util.concurrent.forkjoinpool.common.parallelism 来设置 forkjoinpool 线程池的线程数)。如果所有 completablefuture 共享一个线程池,那么一旦有任务执行一些很慢的 i/o 操作,就会导致线程池中所有线程都阻塞在 i/o 操作上,从而造成线程饥饿,进而影响整个系统的性能。所以,强烈建议你要根据不同的业务类型创建不同的线程池,以避免互相干扰

// 使用默认线程池
static completablefuture<void> 
 runasync(runnable runnable)
static <u> completablefuture<u> 
 supplyasync(supplier<u> supplier)
// 可以指定线程池 
static completablefuture<void> 
 runasync(runnable runnable, executor executor)
static <u> completablefuture<u> 
 supplyasync(supplier<u> supplier, executor executor) 

创建完 completablefuture 对象之后,会自动地异步执行 runnable.run() 方法或者 supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 completablefuture 类实现了 future 接口,所以这两个问题你都可以通过 future 接口来解决。另外,completablefuture 类还实现了 completionstage 接口,这个接口内容实在是太丰富了,在 1.8 版本里有 40 个方法,这些方法我们该如何理解呢?

理解 completionstage 接口

可以站在分工的角度类比一下工作流。任务是有时序关系的,比如有串行关系、并行关系、汇聚关系等。这样说可能有点抽象,这里还举前面烧水泡茶的例子,其中洗水壶和烧开水就是串行关系,洗水壶、烧开水和洗茶壶、洗茶杯这两组任务之间就是并行关系,而烧开水、拿茶叶和泡茶就是汇聚关系。

Java并发 CompletableFuture异步编程的实现

串行关系

Java并发 CompletableFuture异步编程的实现

并行关系

Java并发 CompletableFuture异步编程的实现

汇聚关系

completionstage 接口可以清晰地描述任务之间的这种时序关系,例如前面提到的
f3 = f1.thencombine(f2, ()->{}) 描述的就是一种汇聚关系。烧水泡茶程序中的汇聚关系是一种 and 聚合关系,这里的 and 指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有 and 聚合关系,那就一定还有 or 聚合关系,所谓 or 指的是依赖的任务只要有一个完成就可以执行当前任务。

最后就是异常,completionstage 接口也可以方便地描述异常处理。

下面我们就来一一介绍,completionstage 接口如何描述串行关系、and 聚合关系、or 聚合关系以及异常处理。

1. 描述串行关系

completionstage 接口里面描述串行关系,主要是 thenapply、thenaccept、thenrun 和 thencompose 这四个系列的接口。

thenapply 系列函数里参数 fn 的类型是接口 function<t, r>,这个接口里与 completionstage 相关的方法是r apply(t t),这个方法既能接收参数也支持返回值,所以 thenapply 系列方法返回的是completionstage<r>

而 thenaccept 系列方法里参数 consumer 的类型是接口consumer<t>,这个接口里与 completionstage 相关的方法是void accept(t t),这个方法虽然支持参数,但却不支持回值,所以 thenaccept 系列方法返回的是completionstage<void>

thenrun 系列方法里 action 的参数是 runnable,所以 action 既不能接收参数也不支持返回值,所以 thenrun 系列方法返回的也是completionstage<void>

这些方法里面 async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thencompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和 thenapply 系列是相同的。

completionstage<r> thenapply(fn);
completionstage<r> thenapplyasync(fn);
completionstage<void> thenaccept(consumer);
completionstage<void> thenacceptasync(consumer);
completionstage<void> thenrun(action);
completionstage<void> thenrunasync(action);
completionstage<r> thencompose(fn);
completionstage<r> thencomposeasync(fn);

通过下面的示例代码,你可以看一下 thenapply() 方法是如何使用的。首先通过 supplyasync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

completablefuture<string> f0 = 
 completablefuture.supplyasync(
  () -> "hello world")   //①
 .thenapply(s -> s + " qq") //②
 .thenapply(string::touppercase);//③

system.out.println(f0.join());
// 输出结果
hello world qq

2. 描述 and 汇聚关系

completionstage 接口里面描述 and 汇聚关系,主要是 thencombine、thenacceptboth 和 runafterboth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

completionstage<r> thencombine(other, fn);
completionstage<r> thencombineasync(other, fn);
completionstage<void> thenacceptboth(other, consumer);
completionstage<void> thenacceptbothasync(other, consumer);
completionstage<void> runafterboth(other, action);
completionstage<void> runafterbothasync(other, action);

3. 描述 or 汇聚关系

completionstage 接口里面描述 or 汇聚关系,主要是 applytoeither、accepteither 和 runaftereither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

completionstage applytoeither(other, fn);
completionstage applytoeitherasync(other, fn);
completionstage accepteither(other, consumer);
completionstage accepteitherasync(other, consumer);
completionstage runaftereither(other, action);
completionstage runaftereitherasync(other, action);
completablefuture<string> f1 = 
 completablefuture.supplyasync(()->{
  int t = getrandom(5, 10);
  sleep(t, timeunit.seconds);
  return string.valueof(t);
});

completablefuture<string> f2 = 
 completablefuture.supplyasync(()->{
  int t = getrandom(5, 10);
  sleep(t, timeunit.seconds);
  return string.valueof(t);
});

completablefuture<string> f3 = 
 f1.applytoeither(f2,s -> s);

system.out.println(f3.join());

4. 异常处理

虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常 ,例如下面的代码,执行

completablefuture<integer> 
 f0 = completablefuture.
  .supplyasync(()->(7/0))
  .thenapply(r->r*10);
system.out.println(f0.join());

completionstage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

completionstage exceptionally(fn);
completionstage<r> whencomplete(consumer);
completionstage<r> whencompleteasync(consumer);
completionstage<r> handle(fn);
completionstage<r> handleasync(fn);

下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。

whencomplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whencomplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。

whencomplete() 和 handle() 的区别在于 whencomplete() 不支持返回结果,而 handle() 是支持返回结果的。

completablefuture<integer> 
 f0 = completablefuture
  .supplyasync(()->7/0))
  .thenapply(r->r*10)
  .exceptionally(e->0);
system.out.println(f0.join());

总结

不过最近几年,伴随着 reactivex 的发展(java 语言的实现版本是 rxjava),回调地狱已经被完美解决了,java 语言也开始官方支持异步编程:在 1.8 版本提供了 completablefuture,在 java 9 版本则提供了更加完备的 flow api,异步编程目前已经完全工业化。

completablefuture 已经能够满足简单的异步编程需求,如果你对异步编程感兴趣,可以重点关注 rxjava 这个项目,利用 rxjava,即便在 java 1.6 版本也能享受异步编程的乐趣。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。