欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

CompletableFuture

程序员文章站 2022-04-15 23:38:33
使用CompletableFuture优化你的代码执行效率使用CompletableFutureJava8之Consumer、Supplier、Predicate和Function攻略1. 什么是CompletableFutureCompletableFuture用于异步编程,是对Future的扩展,Future被用于作为一个异步计算结果的引用,提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,使用get() 方法用来接收计算任务的结果,但是Future存在一些局限性无法手动...

使用CompletableFuture优化你的代码执行效率
使用CompletableFuture
Java8之Consumer、Supplier、Predicate和Function攻略

1. 什么是CompletableFuture

CompletableFuture用于异步编程,是对Future的扩展,Future被用于作为一个异步计算结果的引用,提供一个 isDone() 方法来检查计算任务是否完成。当任务完成时,使用get() 方法用来接收计算任务的结果,但是Future存在一些局限性

  • 无法手动完成;假如把从远程api获取服务的操作放在一个单独的线程用future来接收返回结果,在主线程通过future的get方法获取返回结果,如果远程服务超时,那么就会一直阻塞在对结果的获取,而不能手动结束任务
  • 对结果的获取不方便;使用Future获得异步执行结果时,要么调用阻塞方法get(),要么轮询看isDone()是否为true,这两种方法都不是很好,因为主线程也会*等待。,为什么不能用观察者设计模式采用回调的方式当计算结果完成及时通知监听者呢
  • 不能组合多个future的结果

为了解决上面的问题,jdk8引入了CompletableFuture

CompletableFuture 实现了 FutureCompletionStage接口,并且提供了许多关于创建,链式调用和组合多个 Future 的便利方法集,而且有广泛的异常处理支持
CompletableFuture

2. CompletableFuture的使用

2.1 创建CompletableFuture

例子来自Java 8 CompletableFuture 教程

//有返回值
public static CompletableFuture<Void> runAsync(Runnable runnable)
//无返回值
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
1⃣️ public static CompletableFuture<Void> runAsync(Runnable runnable)

其中runAsync持有Runnable对象自然适合与无返回结果的任务;

CompletableFuture<Void> future = CompletableFuture.runAsync(new Runnable() {
    @Override
    public void run() {
        // Simulate a long-running Job
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        System.out.println("I'll run in a separate thread than the main thread.");
    }
});

// Block and wait for the future to complete
future.get()

//使用lambda简化
// Using Lambda Expression
CompletableFuture<Void> future = CompletableFuture.runAsync(() -> {
    // Simulate a long-running Job   
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    System.out.println("I'll run in a separate thread than the main thread.");
});

2⃣️ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

supplyAsync运行一个异步任务并且返回结果,它持有Supplier对象,Supplier<T> 是一个简单的函数式接口,表示supplier的结果,它有一个get()方法,把需要执行的后台任务写入这个方法,并且返回结果

// Run a task specified by a Supplier object asynchronously
CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
    @Override
    public String get() {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        return "Result of the asynchronous computation";
    }
});

// Block and get the result of the Future
String result = future.get();
System.out.println(result);

//使用lambda简化
// Using Lambda Expression
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
});
3⃣️ public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

之前提到的两种方法都会为任务单独创建一个线程,但是在实际使用的时候,创建太多的线程是对资源的一种浪费并且创建线程也耗费时间,最好的解决方法是使用线程池
CompletableFuture API 的所有方法都有一个变体-接受Executor作为参数

public static CompletableFuture<Void> 	runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> 	supplyAsync(Supplier<U> supplier, Executor executor)
4⃣️ public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
Executor executor = Executors.newFixedThreadPool(10);
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
    try {
        TimeUnit.SECONDS.sleep(1);
    } catch (InterruptedException e) {
        throw new IllegalStateException(e);
    }
    return "Result of the asynchronous computation";
}, executor);

2.2 获取结果

CompletableFuture实现了Future接口,所以也实现了get()等方法,就像上面使用的那样;

V get();
V get(long timeout,Timeout unit);
T getNow(T defaultValue);//没有结果会返回默认值

但是开头说过,get方法会阻塞,一直等到Future完成返回结果,除此之外Future接口的方法还存在不能组合多个future等问题;
CompletableFuture还实现了CompletionStage接口,CompletionStage接口中的方法可以解决这些问题,完成异步回调机制,多个CompletableFuture串行执行的操作

1⃣️ 计算完成时对结果的处理 whenComplete/exceptionally/handle

CompletableFuture的计算结果完成,或者抛出异常的时候,我们可以执行特定的Action

public CompletableFuture 	whenComplete(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture 	whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
public CompletableFuture 	whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)
public CompletableFuture exceptionally(Function<Throwable,? extends T> fn)
该方法是对异常情况的处理,当函数异常时应该的返回值

whenComplete

whenComplete参数类型 BiConsumer<? super T, ? super Throwable> 会获取上一步计算的计算结果和异常信息,以Async结尾的方法可能会使用其它的线程去执行,如果使用相同的线程池,也可能会被同一个线程选中执行

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        ThreadUtil.sleep(100);
        return 20;
    }).whenCompleteAsync((v, e) -> {
        System.out.println(v);
        System.out.println(e);
    });
    System.out.println(future.get());
}

exceptionally
exceptionally对异常情况的处理,当函数异常时应该的返回值

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        ThreadUtil.sleep(100);
        return 10 / 0;
    }).whenCompleteAsync((v, e) -> {
        System.out.println(v);
        System.out.println(e);
    }).exceptionally((e) -> {
        System.out.println(e.getMessage());
        return 30;
    });
    System.out.println(future.get());
}

2⃣️ 结果处理转换thenApply

CompletableFuture 由于有回调,可以不必等待一个计算完成而阻塞着调用线程,可以在一个结果计算完成之后紧接着执行某个Action。我们可以将这些操作串联起来

thenApply 只有当正常返回才回去执行,而whenComplete不管是否正常都执行

public <U> CompletableFuture<U> 	thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> 	thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> 	thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)

场景:执行任务A,同时异步执行任务B,待任务B正常返回之后,用B的返回值执行任务C,任务C无返回值

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A");
CompletableFuture<String> futureB = CompletableFuture.supplyAsync(() -> "任务B");
CompletableFuture<String> futureC = futureB.thenApply(b -> {
      System.out.println("执行任务C.");
      System.out.println("参数:" + b);//参数:任务B
      return "a";
});

3⃣️ thenAccept

单纯的去消费结果而不会返回新的值,因些计算结果为 Void;

public CompletableFuture 	thenAccept(Consumer<? super T> action)
public CompletableFuture 	thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture 	thenAcceptAsync(Consumer<? super T> action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
    
    CompletableFuture<Void> future = CompletableFuture
            .supplyAsync(() -> 1)
            .thenAccept(System.out::println) //消费 上一级返回值 1
            .thenAcceptAsync(System.out::println); //上一级没有返回值 输出null
            
    System.out.println(future.get()); //消费函数没有返回值 输出null
}
4⃣️ thenRun

thenAccept相比不关心上一步的计算结果

CompletableFuture<String> futureA = CompletableFuture.supplyAsync(() -> "任务A");
futureA.thenRun(() -> System.out.println("执行任务B"));
5⃣️ thenCombine/thenAcceptBoth/runAfterBoth
public <U,V> CompletableFuture<V>     thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>     thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V>     thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

结合两个CompletionStage的结果,进行转化后返回

CompletableFuture<Double> futurePrice = CompletableFuture.supplyAsync(() -> 100d);
CompletableFuture<Double> futureDiscount = CompletableFuture.supplyAsync(() -> 0.8);
CompletableFuture<Double> futureResult = futurePrice.thenCombine(futureDiscount, (price, discount) -> price * discount);
System.out.println("最终价格为:" + futureResult.join()); //最终价格为:80.0

thenCombine()是结合两个任务的返回值进行转化后再返回,如果不需要返回呢,那就需要thenAcceptBoth(),同理,如果连两个任务的返回值也不关心,则使用runAfterBoth

6⃣️ allOf(..) anyOf(..)
public static CompletableFuture<Void>  allOf(CompletableFuture<?>... cfs)
public static CompletableFuture<Object>  anyOf(CompletableFuture<?>... cfs)

allOf:当所有的CompletableFuture都执行完后执行计算

anyOf:最快的那个CompletableFuture执行完之后执行计算

7⃣️ handle

CompletableFuture的计算结果完成,或者抛出异常的时候,可以通过handle方法对结果进行处理

public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn);
public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);

本文地址:https://blog.csdn.net/weixin_43907800/article/details/107360033