欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

java8-CompleableFuture的使用1

程序员文章站 2022-05-10 09:09:20
背景 1. 硬件的极速发展,多核心CPU司空见惯;分布式的软件架构司空见惯; 2. 功能API大多采用混聚的方式把基础服务的内容链接在一起,方便用户生活。 抛出了两个问题: 1. 如何发挥多核能力; 2. 切分大型任务,让每个子任务并行运行; 并发和并行的区别 |项目|区别1|实现技术| | | | ......

背景

  1. 硬件的极速发展,多核心cpu司空见惯;分布式的软件架构司空见惯;
  2. 功能api大多采用混聚的方式把基础服务的内容链接在一起,方便用户生活。

抛出了两个问题:

  1. 如何发挥多核能力;
  2. 切分大型任务,让每个子任务并行运行;

并发和并行的区别

项目 区别1 实现技术
并行 每个任务跑在单独的cpu核心上 分支合并框架,并行流
并发 不同任务共享cpu核心,基于时间片调度 completablefuture

future接口

java5开始引入。将来某个时刻发生的事情进行建模。
进行一个异步计算,返回一个执行运算的结果引用,当运算结束后,这个引用可以返回给调用方。
可以使用future把哪些潜在耗时的任务放到异步线程中,让主线程继续执行其他有价值的工作,不在白白等待。

下面是一个例子:使用future,可以让两个任务并发的运行,然后汇聚结果;

package com.test.completable;

import com.google.common.base.stopwatch;

import java.util.concurrent.executionexception;
import java.util.concurrent.executorservice;
import java.util.concurrent.executors;
import java.util.concurrent.future;
import java.util.concurrent.timeunit;
import java.util.concurrent.timeoutexception;

/**
 * 说明:future应用实例
 * @author carter
 * 创建时间: 2019年11月18日 10:53
 **/

public class futuretest {

    static final executorservice pool = executors.newfixedthreadpool(2);


    public static void main(string[] args) {
        stopwatch stopwatch = stopwatch.createstarted();

        future<long> longfuture = pool.submit(() -> dosomethinglongtime());

        dosomething2();
        try {
            final long longvalue = longfuture.get(3, timeunit.seconds);
            system.out.println(thread.currentthread().getname() + " future return value :" + longvalue + " : " + stopwatch.stop());
        } catch (interruptedexception e) {
            e.printstacktrace();
        } catch (executionexception e) {
            e.printstacktrace();
        } catch (timeoutexception e) {
            e.printstacktrace();
        }
        pool.shutdown();
    }

    private static void dosomething2() {
        stopwatch stopwatch = stopwatch.createstarted();
        try {
            timeunit.seconds.sleep(3);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }

        system.out.println(thread.currentthread().getname() + " dosomething2 :" + stopwatch.stop());
    }

    private static long dosomethinglongtime() {
        stopwatch stopwatch = stopwatch.createstarted();
        try {
            timeunit.seconds.sleep(3);
        } catch (interruptedexception e) {
            e.printstacktrace();
        }

        system.out.println(thread.currentthread().getname() + " dosomethinglongtime : " + stopwatch.stop());
        return 1000l;
    }


}

没法编写简介的并发代码。描叙能力不够;比如如下场景:

  1. 将两个异步计算的结果合并为一个,这两个异步计算之间互相独立,但是第二个有依赖第一个结果。
  2. 等待future中所有的任务都完成;
  3. 仅等待future集合中最快结束的任务完成,并返回它的结果;
  4. 通过编程的方式完成一个future任务的执行;
  5. 响应future的完成事件。

基于这个缺陷,java8中引入了completablefuture 类;

实现异步api

技能点:

  1. 提供异步api;
  2. 修改同步的api为异步的api,如何使用流水线把两个任务合并为一个异步计算操作;
  3. 响应式的方式处理异步操作的完成事件;
类型 区别 是否堵塞
同步api 调用方在被调用运行的过程中等待,被调用方运行结束后返回,调用方取得返回值后继续运行 堵塞
异步api 调用方和被调用方是异步的,调用方不用等待被调用方返回结果 非堵塞
package com.test.completable;

import com.google.common.base.stopwatch;
import com.google.common.base.ticker;

import java.util.concurrent.executionexception;
import java.util.concurrent.future;
import java.util.concurrent.timeunit;

/**
 * 说明:异步调用计算价格的方法
 * @author carter
 * 创建时间: 2019年11月18日 13:32
 **/

public class test {

    public static void main(string[] args) {
        shop shop = new shop("bestshop");

        stopwatch stopwatch = stopwatch.createstarted();
        stopwatch stopwatch2 = stopwatch.createstarted();

        future<double> doublefuture = shop.getpricefuture("pizza");

        system.out.println("getpricefuture return after: " + stopwatch.stop());

        dosomethingelse();
        try{
            final double price = doublefuture.get();
            system.out.println("price is " + price + " return after: " + stopwatch2.stop());
        } catch (interruptedexception e) {
            e.printstacktrace();
        } catch (executionexception e) {
            e.printstacktrace();
        }
    }

    private static void dosomethingelse() {
        stopwatch stopwatch = stopwatch.createstarted();
        delayutil.delay();
        system.out.println("dosomethingelse " + stopwatch.stop());

    }


}

错误处理

如果计算价格的方法产生了错误,提示错误的异常会被现在在试图计算商品价格的当前线程的范围内,最终计算的异步线程会被杀死,这会导致get方法返回结果的客户端永久的被等待。

如何避免异常被掩盖, completeexceptionally会把completablefuture内发生的问题抛出去。

    private static void test2() {
        shop shop = new shop("bestshop");

        stopwatch stopwatch = stopwatch.createstarted();
        stopwatch stopwatch2 = stopwatch.createstarted();

        future<double> doublefuture = shop.getpricefutureexception("pizza");

        system.out.println("getpricefuture return after: " + stopwatch.stop());

        dosomethingelse();
        try{
            final double price = doublefuture.get();
            system.out.println("price is " + price + " return after: " + stopwatch2.stop());
        } catch (interruptedexception e) {
            e.printstacktrace();
        } catch (executionexception e) {
            e.printstacktrace();
        }
    }

方法改造:

//异步方式查询产品价格,异常抛出去
    public future<double> getpricefutureexception(string product){


        final completablefuture<double> doublecompletablefuture = new completablefuture<>();

        new thread(()->{try {
            doublecompletablefuture.complete(alculatepriceexception(product));
        }catch (exception ex){
            doublecompletablefuture.completeexceptionally(ex);
        }
        }).start();

        return doublecompletablefuture;
    }

无堵塞

即让多个线程去异步并行或者并发的执行任务,计算完之后汇聚结果;

    private static void test3(string productname) {
        stopwatch stopwatch = stopwatch.createstarted();
        final list<string> stringlist = stream.of(new shop("华强北"), new shop("益田假日广场"), new shop("香港九龙城"), new shop("京东商城"))
                .map(item -> string.format("商店:%s的商品:%s 售价是:%s", item.getname(), productname, item.getprice(productname)))
                .collect(collectors.tolist());

        system.out.println(stringlist);
        system.out.println("test3 done in  " + stopwatch.stop());


    }

    private static void test3_parallelstream(string productname) {
        stopwatch stopwatch = stopwatch.createstarted();
        final list<string> stringlist = stream.of(new shop("华强北"), new shop("益田假日广场"), new shop("香港九龙城"), new shop("京东商城"))
                .parallel()
                .map(item -> string.format("商店:%s的商品:%s 售价是:%s", item.getname(), productname, item.getprice(productname)))
                .collect(collectors.tolist());

        system.out.println(stringlist);
        system.out.println("test3_parallelstream done in  " + stopwatch.stop());


    }


    private static void test3_completablefuture(string productname) {
        stopwatch stopwatch = stopwatch.createstarted();
        final list<string> stringlist = stream.of(new shop("华强北"), new shop("益田假日广场"), new shop("香港九龙城"), new shop("京东商城"))
                .map(item ->completablefuture.supplyasync(()-> string.format("商店:%s的商品:%s 售价是:%s", item.getname(), productname, item.getprice(productname))))
                .collect(collectors.tolist())
                .stream()
                .map(completablefuture::join)
                .collect(collectors.tolist());

        system.out.println(stringlist);
        system.out.println("test3_completablefuture done in  " + stopwatch.stop());


    }



    private static void test3_completablefuture_pool(string productname) {
        stopwatch stopwatch = stopwatch.createstarted();
        final list<string> stringlist = stream.of(new shop("华强北"), new shop("益田假日广场"), new shop("香港九龙城"), new shop("京东商城"))
                .map(item ->completablefuture.supplyasync(()-> string.format("商店:%s的商品:%s 售价是:%s", item.getname(), productname, item.getprice(productname)),pool))
                .collect(collectors.tolist())
                .stream()
                .map(completablefuture::join)
                .collect(collectors.tolist());

        system.out.println(stringlist);
        system.out.println("test3_completablefuture done in  " + stopwatch.stop());


    }

代码中有一个简单的计算场景,我想查询4家商店的iphone11售价;

华强北,益田苹果店,香港九龙城,京东商城;

每一家的查询大概耗时1s;

任务处理方式 耗时 优缺点说明
顺序执行 4秒多 简单,好理解
并行流 1秒多 无法定制流内置的线程池,使用简单,改造简单
completablefuture 默认线程池 2秒多 默认线程池
completablefuture 指定线程池 1秒多 指定了线程池,可定制性更好,相比于并行流

多个异步任务的流水线操作

场景: 先计算价格,在拿到折扣,最后计算折扣价格;

    
    private static void test4(string productname) {

        stopwatch stopwatch = stopwatch.createstarted();
        final list<string> stringlist = stream.of(new shop("华强北"), new shop("益田假日广场"), new shop("香港九龙城"), new shop("京东商城"))
                .map(shop->shop.getprice_discount(productname))
                .map(quote::parse)
                .map(discount::applydiscount)
                .collect(collectors.tolist());

        system.out.println(stringlist);
        system.out.println("test4 done in  " + stopwatch.stop());


    }

    private static void test4_completablefuture(string productname) {

        stopwatch stopwatch = stopwatch.createstarted();
        final list<string> stringlist = stream.of(new shop("华强北"), new shop("益田假日广场"), new shop("香港九龙城"), new shop("京东商城"))
                .map(shop->completablefuture.supplyasync(()->shop.getprice_discount(productname),pool))
                .map(future->future.thenapply( quote::parse))
                .map(future->future.thencompose(quote -> completablefuture.supplyasync(()->discount.applydiscount(quote),pool)))
                .collect(collectors.tolist())
                .stream()
                .map(completablefuture::join)
                .collect(collectors.tolist());

        system.out.println(stringlist);
        system.out.println("test4_completablefuture done in  " + stopwatch.stop());


    }

以上是有依赖关系的两个任务的聚合,即任务2,依赖任务1的结果。使用的是thencompose方法;

接下来如果有两个任务可以异步执行,最后需要依赖着两个任务的结果计算得到最终结果,采用的是thencombine;

//两个不同的任务,最后需要汇聚结果,采用combine
    private static void test5(string productname) {

        stopwatch stopwatch = stopwatch.createstarted();


        shop shop = new shop("香港九龙");

      double pricefinal =  completablefuture.supplyasync(()->shop.getprice(productname))
                .thencombine(completablefuture.supplyasync(shop::getrate),(price, rate)->price * rate).join();


        system.out.println("test4 done in  " + stopwatch.stop());


    }

completion事件

让任务尽快结束,无需等待;
有多个服务来源,你请求多个,谁先返回,就先响应;

结果依次返回:

 //等待所有的任务执行完毕; completablefuture.allof()
    public void findpricestream(string productname){
        list<shop> shops = arrays.aslist(new shop("华强北"), new shop("益田假日广场"), new shop("香港九龙城"), new shop("京东商城"));
        final completablefuture[] completablefuturearray = shops.stream()
                .map(shop -> completablefuture.supplyasync(() -> shop.getprice_discount(productname), pool))
                .map(future -> future.thenapply(quote::parse))
                .map(future -> future.thencompose(quote -> completablefuture.supplyasync(() -> discount.applydiscount(quote), pool)))
                .map(f -> f.thenaccept(system.out::println))
                .toarray(size -> new completablefuture[size]);


        completablefuture.allof(completablefuturearray).join();

    }

多个来源获取最快的结果:

//有两个获取天气的途径,哪个快最后结果就取哪一个
    public static void getweather(){
        final object join = completablefuture.anyof(completablefuture.supplyasync(() -> a_weather()), completablefuture.supplyasync(() -> b_weather())).join();

        system.out.println(join);
    }

    private static string b_weather() {
        delayutil.delay(3);
        return "bweather";
    }

    private static string a_weather() {
        delayutil.delay(5);
        return "aweather";
    }

源码分析

可完备化的将来;completablefuture ;

先看签名:

public class completablefuture<t> implements future<t>, completionstage<t> {}

实现了futrue,completionstage接口;
这两个接口简单说明一下:

接口 关键特性
future 直接翻译为未来,标识把一个任务异步执行,需要的的时候,通过get方法获取,也可以取消cancel,此外还提供了状态查询方法,isdone, iscancled,实现类是futuretask
completionstage 直接翻译是完成的阶段,提供了函数式编程方法

可以分为如下几类方法

方法 说明
thenapply(function f) 当前阶段正常完成之后,返回一个新的阶段,新的阶段把当前阶段的结果作为参数输入;
thenconsume(consumer c), 当前阶段完成之后,结果作为参数输入,直接消费掉,得到不返回结果的完成阶段;
thenrun(runnable action), 不接受参数,只是继续执行任务,得到一个新的完成阶段;
thencombine(othercompletionstage,bifunction), 当两个完成阶段都完成的时候,执行bifunction,返回一个新的阶段;
thenacceptboth(othercompletionstage, biconsumer) 两个完成阶段都完成之后,对两个结果进行消费;
runafterboth(othercompletionstage,runable) 两个完成阶段都完成之后,执行一个动作;
applytoeither(othercompletionstage,function) 两个完成阶段的任何一个执行结束,进入函数操作,并返回一个新的阶段
accepteither(othercompletionstage,consumer) 两个完成阶段的任何一个执行结束,消费掉,返回一个空返回值的完成阶段
runaftereither(othercompletionstage,runable) 两个完成阶段的任何一个结束,执行一个动作,返回一个空返回值的完成阶段
thencompose(function) 当前阶段完成,返回值作为参数,进行函数运算,然后结果作为一个新的完成阶段
exceptionally(function) 无论当前阶段是否正常完成,消费掉异常,然后返回值作为一个新的完成阶段
whencomplete
handle 无论当前完成阶段是否正常结束,都执行一个bifunction的函数,并返回一个新结果作为一个新的完成阶段
tocompletablefuture 转换为complatablefuture

里面的实现细节后面单独成文章再讲。

小结

  1. 执行一些比较耗时的操作,尤其是依赖一个或者多个远程服务的操作,可以使用异步任务改善程序的性能,加快程序的响应速度;
  2. 使用completablefuture你可以轻松的实现异步api;
  3. completablefuture提供了异常管理机制,让主线程有机会接管子任务抛出的异常;
  4. 把同步api封装到completablefuture中,可以异步得到它的结果;
  5. 如果异步任务之间互相独立,而他们之间的某一些结果是另外一些的输入,可以把这些任务进行compose;
  6. 可以为completablefuture中的任务注册一个回调函数,当任务执行完毕之后再进行一些其它操作;
  7. 你可以决定什么时候结束程序的运行,是所有的completablefuture任务所有对象执行完毕,或者只要其中任何一个完成即可。

原创不易,转载请注明出处。