CompletableFuture
使用CompletableFuture优化你的代码执行效率
使用CompletableFuture
Java8之Consumer、Supplier、Predicate和Function攻略
1. 什么是CompletableFuture
CompletableFuture
用于异步编程,是对Future的扩展,Future
被用于作为一个异步计算结果的引用,提供一个 isDone()
方法来检查计算任务是否完成。当任务完成时,使用get()
方法用来接收计算任务的结果,但是Future
存在一些局限性
- 无法手动完成;假如把从远程api获取服务的操作放在一个单独的线程用future来接收返回结果,在主线程通过future的get方法获取返回结果,如果远程服务超时,那么就会一直阻塞在对结果的获取,而不能手动结束任务
- 对结果的获取不方便;使用
Future
获得异步执行结果时,要么调用阻塞方法get()
,要么轮询看isDone()
是否为true
,这两种方法都不是很好,因为主线程也会*等待。,为什么不能用观察者设计模式采用回调的方式当计算结果完成及时通知监听者呢 - 不能组合多个
future
的结果
为了解决上面的问题,jdk8
引入了CompletableFuture
CompletableFuture
实现了 Future
和 CompletionStage
接口,并且提供了许多关于创建,链式调用和组合多个 Future
的便利方法集,而且有广泛的异常处理支持
2. CompletableFuture
的使用
2.1 创建CompletableFuture
例子来自Java 8 CompletableFuture 教程
//有返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
//无返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
1⃣️ public static CompletableFuture<Void> runAsync(Runnable runnable)
其中runAsync
持有Runnable
对象自然适合与无返回结果的任务;
CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
@Override
public void run() {
// Simulate a long-running Job
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("I'll run in a separate thread than the main thread.");
}
});
// Block and wait for the future to complete
future.get()
//使用lambda简化
// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
// Simulate a long-running Job
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
System.out.println("I'll run in a separate thread than the main thread.");
});
2⃣️ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
而supplyAsync
运行一个异步任务并且返回结果,它持有Supplier
对象,Supplier<T>
是一个简单的函数式接口,表示supplier
的结果,它有一个get()
方法,把需要执行的后台任务写入这个方法,并且返回结果
// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
@Override
public String get() {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
}
});
// Block and get the result of the Future
String result = future.get();
System.out.println(result);
//使用lambda简化
// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
});
3⃣️ public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
之前提到的两种方法都会为任务单独创建一个线程,但是在实际使用的时候,创建太多的线程是对资源的一种浪费并且创建线程也耗费时间,最好的解决方法是使用线程池CompletableFuture
API 的所有方法都有一个变体-接受Executor
作为参数
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
4⃣️ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
throw new IllegalStateException(e);
}
return "Result of the asynchronous computation";
}, executor);
2.2 获取结果
CompletableFuture
实现了Future
接口,所以也实现了get()
等方法,就像上面使用的那样;
V get();
V get(long timeout,Timeout unit);
T getNow(T defaultValue);//没有结果会返回默认值
但是开头说过,get
方法会阻塞,一直等到Future
完成返回结果,除此之外Future
接口的方法还存在不能组合多个future
等问题;CompletableFuture
还实现了CompletionStage
接口,CompletionStage
接口中的方法可以解决这些问题,完成异步回调机制,多个CompletableFuture
串行执行的操作
1⃣️ 计算完成时对结果的处理 whenComplete/exceptionally/handle
当CompletableFuture
的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action
public CompletableFuture whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function<Throwable,? extends T> fn)
该方法是对异常情况的处理,当函数异常时应该的返回值
whenComplete
whenComplete
参数类型 BiConsumer<? super T, ? super Throwable>
会获取上一步计算的计算结果和异常信息,以Async
结尾的方法可能会使用其它的线程去执行,如果使用相同的线程池,也可能会被同一个线程选中执行
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(100);
return 20;
}).whenCompleteAsync((v, e) -> {
System.out.println(v);
System.out.println(e);
});
System.out.println(future.get());
}
exceptionally
exceptionally
对异常情况的处理,当函数异常时应该的返回值
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(100);
return 10 / 0;
}).whenCompleteAsync((v, e) -> {
System.out.println(v);
System.out.println(e);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 30;
});
System.out.println(future.get());
}
2⃣️ 结果处理转换thenApply
CompletableFuture
由于有回调,可以不必等待一个计算完成而阻塞着调用线程,可以在一个结果计算完成之后紧接着执行某个Action
。我们可以将这些操作串联起来
thenApply
只有当正常返回才回去执行,而whenComplete
不管是否正常都执行
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
场景:执行任务A,同时异步执行任务B,待任务B正常返回之后,用B的返回值执行任务C,任务C无返回值
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任务B");
CompletableFuture<String> futureC = futureB.thenApply(b -> {
System.out.println("执行任务C.");
System.out.println("参数:" + b);//参数:任务B
return "a";
});
3⃣️ thenAccept
单纯的去消费结果而不会返回新的值,因些计算结果为 Void
;
public CompletableFuture thenAccept(Consumer<? super T> action)
public CompletableFuture thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture thenAcceptAsync(Consumer<? super T> action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> 1)
.thenAccept(System.out::println) //消费 上一级返回值 1
.thenAcceptAsync(System.out::println); //上一级没有返回值 输出null
System.out.println(future.get()); //消费函数没有返回值 输出null
}
4⃣️ thenRun
和thenAccept
相比不关心上一步的计算结果
CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A");
futureA.thenRun(() -> System.out.println("执行任务B"));
5⃣️ thenCombine/thenAcceptBoth/runAfterBoth
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)
结合两个CompletionStage
的结果,进行转化后返回
CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d);
CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount);
System.out.println("最终价格为:" + futureResult.join()); //最终价格为:80.0
thenCombine()
是结合两个任务的返回值进行转化后再返回,如果不需要返回呢,那就需要thenAcceptBoth()
,同理,如果连两个任务的返回值也不关心,则使用runAfterBoth
了
6⃣️ allOf(..) anyOf(..)
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)
allOf
:当所有的CompletableFuture
都执行完后执行计算
anyOf
:最快的那个CompletableFuture
执行完之后执行计算
7⃣️ handle
当CompletableFuture
的计算结果完成,或者抛出异常的时候,可以通过handle
方法对结果进行处理
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
本文地址:https://blog.csdn.net/weixin_43907800/article/details/107360033
推荐阅读
-
Java并发 CompletableFuture异步编程的实现
-
JAVA8给我带了什么——Optional和CompletableFuture
-
java 8 新特性功能和用法介绍03---CompletableFuture基本用法介绍
-
JUC——线程同步辅助工具类(Exchanger,CompletableFuture)
-
Java:CompletableFuture一文搞定
-
CompletableFuture
-
Java8 CompletableFuture 编程
-
编程老司机带你玩转 CompletableFuture 异步编程
-
Java8 使用工厂方法supplyAsync创建CompletableFuture实例
-
详解Java CompletableFuture使用方法以及与FutureTask的区别