欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

并发工具类(四)

程序员文章站 2022-05-04 10:53:13
...

1.写在前面

前面的几篇博客介绍了并发工具类的其中的一部分,今天是最后并发工具类是最后一篇博客,后面的博客会介绍并发的模式。好了,废话不多说,直接上内容吧。

2.CompletableFuture:异步编程

异步化的编程,利用多线程优化性能这个核心方案得以实施的基础。

2.1CompletableFuture 的核心优势

我们再来看下上篇博客中讲的烧茶泡水的例子,具体的分工图如下:

并发工具类(四)

实现的代码如下:

// 任务 1:洗水壶 -> 烧开水
CompletableFuture<Void> f1 =
	CompletableFuture.runAsync(()->{
		System.out.println("T1: 洗水壶...");
		sleep(1, TimeUnit.SECONDS);
		System.out.println("T1: 烧开水...");
		sleep(15, TimeUnit.SECONDS);
	});
// 任务 2:洗茶壶 -> 洗茶杯 -> 拿茶叶
CompletableFuture<String> f2 =
	CompletableFuture.supplyAsync(()->{
		System.out.println("T2: 洗茶壶...");
		sleep(1, TimeUnit.SECONDS);
		System.out.println("T2: 洗茶杯...");
		sleep(2, TimeUnit.SECONDS);
		System.out.println("T2: 拿茶叶...");
		sleep(1, TimeUnit.SECONDS);
		return " 龙井 ";
	});
// 任务 3:任务 1 和任务 2 完成后执行:泡茶
CompletableFuture<String> f3 =
	f1.thenCombine(f2, (__, tf)->{
		System.out.println("T1: 拿到茶叶:" + tf);
		System.out.println("T1: 泡茶...");
		return " 上茶:" + tf;
	});
// 等待任务 3 执行结果
System.out.println(f3.join());
void sleep(int t, TimeUnit u) {
	try {
    	u.sleep(t);
	}catch(InterruptedException e){}
}
// 一次执行结果:
/*T1: 洗水壶...
T2: 洗茶壶...
T1: 烧开水...
T2: 洗茶杯...
T2: 拿茶叶...
T1: 拿到茶叶: 龙井
T1: 泡茶...
上茶: 龙井*/

优势:

  1. 无需手工维护线程,没有繁琐的手工维护线程的工作,给任务分配线程的工作也不需要我们的关注。
  2. 语义更清晰,例如 f3 = f1.thenCombine(f2, ()->{}) 能够清晰地表述“任务 3要等待任务 1 和任务 2 都完成后才能开始”;
  3. 代码更简练并且专注于业务逻辑,几乎所有代码都是业务逻辑相关的。

2.2创建CompletableFuture 对象

前面的例子我们已经使用了runAsync(Runnable runnable)和supplyAsync(Supplier supplier),它们之间的区别是:Runnable 接口的
run() 方法没有返回值,而 Supplier 接口的 get() 方法是有返回值的。前两个方法和后两个方法的区别在于:后两个方法可以指定线程池参数。

// 使用默认线程池
static CompletableFuture<Void> runAsync(Runnable runnable);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier);
// 可以指定线程池
static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor);

默认情况下 CompletableFuture 会使用公共的 ForkJoinPool 线程池。创建完 CompletableFuture 对象之后,会自动地异步执行 runnable.run() 方法或者
supplier.get() 方法,对于一个异步操作,你需要关注两个问题:一个是异步操作什么时候结束,另一个是如何获取异步操作的执行结果。因为 CompletableFuture 类实现了Future 接口,所以这两个问题你都可以通过 Future 接口来解决。另外,CompletableFuture 类还实现了 CompletionStage 接口

2.3如何理解CompletionStage接口

站在分工的角度,任务有时序的关系的,比如有串行关系,并行关系,汇聚关系。这个描述的可能抽象,我们还是看下烧水泡茶的例子。具体的如下:

并发工具类(四)

并发工具类(四)

CompletionStage接口可以清晰地描述任务之间的这种时序关系,烧水泡茶程序中的汇聚关系是一种 AND 聚合关系,这里的 AND 指的是所有依赖的任务(烧开水和拿茶叶)都完成后才开始执行当前任务(泡茶)。既然有 AND 聚合关系,那就一定还有 OR 聚合关系,所谓OR 指的是依赖的任务只要有一个完成就可以执行当前任务。

  1. 描述串行关系

    CompletionStage 接口里面描述串行关系,主要是 thenApply、thenAccept、thenRun和 thenCompose 这四个系列的接口。

    thenApply 系列函数里参数 fn 的类型是接口 Function<T, R>,这个接口里与CompletionStage 相关的方法是 R apply(T t),这个方法既能接收参数也支持返回
    值,所以 thenApply 系列方法返回的是CompletionStage。

    而 thenAccept 系列方法里参数 consumer 的类型是接口Consumer,这个接口里与CompletionStage 相关的方法是 void accept(T t),这个方法虽然支持参数,但却不支持回值,所以 thenAccept 系列方法返回的是CompletionStage。

    thenRun 系列方法里 action 的参数是 Runnable,所以 action 既不能接收参数也不支持返回值,所以 thenRun 系列方法返回的也CompletionStage。

    这些方法里面 Async 代表的是异步执行 fn、consumer 或者 action。其中,需要你注意的是 thenCompose 系列方法,这个系列的方法会新创建出一个子流程,最终结果和thenApply 系列是相同的。

    CompletionStage<R> thenApply(fn);
    CompletionStage<R> thenApplyAsync(fn);
    CompletionStage<Void> thenAccept(consumer);
    CompletionStage<Void> thenAcceptAsync(consumer);
    CompletionStage<Void> thenRun(action);
    CompletionStage<Void> thenRunAsync(action);
    CompletionStage<R> thenCompose(fn);
    CompletionStage<R> thenComposeAsync(fn);
    

    通过下面的示例代码,你可以看一下 thenApply() 方法是如何使用的。首先通过supplyAsync() 启动一个异步流程,之后是两个串行操作,整体看起来还是挺简单的。不过,虽然这是一个异步流程,但任务①②③却是串行执行的,②依赖①的执行结果,③依赖②的执行结果。

    CompletableFuture<String> f0 =
    	CompletableFuture.supplyAsync(
    		() -> "Hello World") //①
    		.thenApply(s -> s + " QQ") //②
    		.thenApply(String::toUpperCase);//③
    		System.out.println(f0.join());
    // 输出结果
    //HELLO WORLD QQ
    
  2. 描述AND汇聚关系

    CompletionStage 接口里面描述 AND 汇聚关系,主要是 thenCombine、thenAcceptBoth 和 runAfterBoth 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。它们的使用你可以参考上面烧水泡茶的实现程序,这里就不赘述了。

    CompletionStage<R> thenCombine(other, fn);
    CompletionStage<R> thenCombineAsync(other, fn);
    CompletionStage<Void> thenAcceptBoth(other, consumer);
    CompletionStage<Void> thenAcceptBothAsync(other, consumer);
    CompletionStage<Void> runAfterBoth(other, action);
    CompletionStage<Void> runAfterBothAsync(other, action);
    
  3. 描述OR汇聚关系

    CompletionStage 接口里面描述 OR 汇聚关系,主要是 applyToEither、acceptEither 和runAfterEither 系列的接口,这些接口的区别也是源自 fn、consumer、action 这三个核心参数不同。

    CompletionStage applyToEither(other, fn);
    CompletionStage applyToEitherAsync(other, fn);
    CompletionStage acceptEither(other, consumer);
    CompletionStage acceptEitherAsync(other, consumer);
    CompletionStage runAfterEither(other, action);
    CompletionStage runAfterEitherAsync(other, action);
    

    下面的示例代码展示了如何使用 applyToEither() 方法来描述一个 OR 汇聚关系。

    CompletableFuture<String> f1 =
    	CompletableFuture.supplyAsync(()->{
    		int t = getRandom(5, 10);
    		sleep(t, TimeUnit.SECONDS);
    		return String.valueOf(t);
    	});
    CompletableFuture<String> f2 =
    	CompletableFuture.supplyAsync(()->{
    		int t = getRandom(5, 10);
    		sleep(t, TimeUnit.SECONDS);
    		return String.valueOf(t);
    	});
    CompletableFuture<String> f3 =
    	f1.applyToEither(f2,s -> s);
    System.out.println(f3.join());
    
  4. 异常处理

    虽然上面我们提到的 fn、consumer、action 它们的核心方法都不允许抛出可检查异常,但是却无法限制它们抛出运行时异常,例如下面的代码,执行 7/0 就会出现除零错误这个运行时异常。非异步编程里面,我们可以使用 try{}catch{}来捕获并处理异常,那在异步编程里面,异常该如何处理呢?

    CompletableFuture<Integer> f0 = 
    CompletableFuture.supplyAsync(()->(7/0))
    .thenApply(r->r*10);
    System.out.println(f0.join());
    

    CompletionStage 接口给我们提供的方案非常简单,比 try{}catch{}还要简单,下面是相关的方法,使用这些方法进行异常处理和串行操作是一样的,都支持链式编程方式。

    CompletionStage exceptionally(fn);
    CompletionStage<R> whenComplete(consumer);
    CompletionStage<R> whenCompleteAsync(consumer);
    CompletionStage<R> handle(fn);
    CompletionStage<R> handleAsync(fn);
    

    下面的示例代码展示了如何使用 exceptionally() 方法来处理异常,exceptionally() 的使用非常类似于 try{}catch{}中的 catch{},但是由于支持链式编程方式,所以相对更简单。既然有 try{}catch{},那就一定还有 try{}finally{},whenComplete() 和 handle() 系列方法就类似于 try{}finally{}中的 finally{},无论是否发生异常都会执行 whenComplete() 中的回调函数 consumer 和 handle() 中的回调函数 fn。whenComplete() 和 handle() 的区别在于whenComplete() 不支持返回结果,而 handle() 是支持返回结果的。

    CompletableFuture<Integer>
    f0 = CompletableFuture
    .supplyAsync(()->7/0))
    .thenApply(r->r*10)
    .exceptionally(e->0);
    System.out.println(f0.join());
    

3.CompletionService:批量执行异步任务

不久前听说小明要做一个询价应用,这个应用需要从三个电商询价,然后保存在自己的数据库里。核心示例代码如下所示,由于是串行的,所以性能很慢,你来试着优化一下吧。

// 向电商 S1 询价,并保存
r1 = getPriceByS1();
save(r1);
// 向电商 S2 询价,并保存
r2 = getPriceByS2();
save(r2);
// 向电商 S3 询价,并保存
r3 = getPriceByS3();
save(r3);

如何优化一个询价应用的核心代码?如果采用“ThreadPoolExecutor+Future”的方案,你的优化结果很可能是下面示例代码这样:用三个线程异步执行询价,通过三次调用 Future 的 get() 方法获取询价结果,之后将询价结果保存在数据库中。

// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 异步向电商 S1 询价
Future<Integer> f1 =
executor.submit(
()->getPriceByS1());
// 异步向电商 S2 询价
Future<Integer> f2 =
executor.submit(
()->getPriceByS2());
// 异步向电商 S3 询价
Future<Integer> f3 =
executor.submit(
()->getPriceByS3());
// 获取电商 S1 报价并保存
r=f1.get();
executor.execute(()->save(r));
// 获取电商 S2 报价并保存
r=f2.get();
executor.execute(()->save(r));
// 获取电商 S3 报价并保存
r=f3.get();
executor.execute(()->save(r));

上面的这个方案本身没有太大问题,但是有个地方的处理需要你注意,那就是如果获取电商S1 报价的耗时很长,那么即便获取电商 S2 报价的耗时很短,也无法让保存 S2 报价的操作先执行,因为这个主线程都阻塞在了 f1.get() 操作上。这点小瑕疵你该如何解决呢?

估计你已经想到了,增加一个阻塞队列,获取到 S1、S2、S3 的报价都进入阻塞队列,然后在主线程中消费阻塞队列,这样就能保证先获取到的报价先保存到数据库了。下面的示例代码展示了如何利用阻塞队列实现先获取到的报价先保存到数据库。

// 创建阻塞队列
BlockingQueue<Integer> bq =
new LinkedBlockingQueue<>();
// 电商 S1 报价异步进入阻塞队列
executor.execute(()->
bq.put(f1.get()));
// 电商 S2 报价异步进入阻塞队列
executor.execute(()->
bq.put(f2.get()));
// 电商 S3 报价异步进入阻塞队列
executor.execute(()->
bq.put(f3.get()));
// 异步保存所有报价
for (int i=0; i<3; i++) {
Integer r = bq.take();
executor.execute(()->save(r));
}

3.1利用CompletionService实现询价系统

不过在实际项目中,并不建议你这样做,因为 Java SDK 并发包里已经提供了设计精良的CompletionService。利用 CompletionService 不但能帮你解决先获取到的报价先保存到数据库的问题,而且还能让代码更简练。

CompletionService 的实现原理也是内部维护了一个阻塞队列,当任务执行结束就把任务的执行结果加入到阻塞队列中,不同的是 CompletionService 是把任务执行结果的 Future对象加入到阻塞队列中,而上面的示例代码是把任务最终的执行结果放入了阻塞队列中。

如何创建CompletionService:

CompletionService 接口的实现类是 ExecutorCompletionService,这个实现类的构造方法有两个,分别是:

  1. ExecutorCompletionService(Executor executor);
  2. ExecutorCompletionService(Executor executor,BlockingQueue<Future> completionQueue)

这两个构造方法都需要传入一个线程池,如果不指定 completionQueue,那么默认会使用*的 LinkedBlockingQueue。任务执行结果的 Future 对象就是加入到
completionQueue 中。

下面的示例代码完整地展示了如何利用 CompletionService 来实现高性能的询价系统。其中,我们没有指定 completionQueue,因此默认使用*的LinkedBlockingQueue。之后通过 CompletionService 接口提供的 submit() 方法提交了三个询价操作,这三个询价操作将会被 CompletionService 异步执行。最后,我们通过 CompletionService 接口提供的 take() 方法获取一个 Future 对象(前面我们提到过,加入到阻塞队列中的是任务执行结果的 Future 对象),调用 Future 对象的 get() 方法就能返回询价操作的执行结果了。

// 创建线程池
ExecutorService executor =
Executors.newFixedThreadPool(3);
// 创建 CompletionService
CompletionService<Integer> cs = new
ExecutorCompletionService<>(executor);
// 异步向电商 S1 询价
cs.submit(()->getPriceByS1());
// 异步向电商 S2 询价
cs.submit(()->getPriceByS2());
// 异步向电商 S3 询价
cs.submit(()->getPriceByS3());
// 将询价结果异步保存到数据库
for (int i=0; i<3; i++) {
Integer r = cs.take().get();
executor.execute(()->save(r));
}

3.2CompletionService接口说明

CompletionService接口提供的方法有5个,这5个方法的签名如下所示。

其中,submit() 相关的方法有两个。一个方法参数是Callable task,前面利用CompletionService 实现询价系统的示例代码中,我们提交任务就是用的它。另外一个方法有两个参数,分别是Runnable task和V result,这个方法类似于ThreadPoolExecutor 的 Future submit(Runnable task, T result)

CompletionService 接口其余的 3 个方法,都是和阻塞队列相关的,take()、poll() 都是从阻塞队列中获取并移除一个元素;它们的区别在于如果阻塞队列是空的,那么调用 take()方法的线程会被阻塞,而 poll() 方法会返回 null 值。 poll(long timeout, TimeUnitunit) 方法支持以超时的方式获取并移除阻塞队列头部的一个元素,如果等待了 timeout unit 时间,阻塞队列还是空的,那么该方法会返回 null 值。

Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;

3.3总结

当需要批量提交异步任务的时候建议你使用 CompletionService。CompletionService 将线程池 Executor 和阻塞队列 BlockingQueue 的功能融合在了一起,能够让批量异步任务的管理更简单。除此之外,CompletionService 能够让异步任务的执行结果有序化,先执行完的先进入阻塞队列,利用这个特性,你可以轻松实现后续处理的有序性,避免无谓的等待,同时还可以快速实现诸如 Forking Cluster 这样的需求。
CompletionService 的实现类 ExecutorCompletionService,需要你自己创建线程池,虽看上去有些啰嗦,但好处是你可以让多个 ExecutorCompletionService 的线程池隔离,这种隔离性能避免几个特别耗时的任务拖垮整个应用的风险。

4.Fork/Join:单机版的MapReduce

对于前面的学的,你可以知道对于简单的并行任务,你可以通过“线程池 +Future”的方案来解决;如果任务之间有聚合关系,无论是 AND 聚合还是 OR 聚合,都可以通过 CompletableFuture 来解决;而批量的并行任务,则可以通过 CompletionService 来解决。

我们一直讲,并发编程可以分为三个层面的问题,分别是分工、协作和互斥,当你关注于任务的时候,你会发现你的视角已经从并发编程的细节中跳出来了,你应用的更多的是现实世界的思维模式,类比的往往是现实世界里的分工,所以我把线程池、Future、CompletableFuture 和 CompletionService 都列到了分工里面。

下面我用现实世界里的工作流程图描述了并发编程领域的简单并行任务、聚合任务和批量并行任务,辅以这些流程图,相信你一定能将你的思维模式转换到现实世界里来。

并发工具类(四)

上面提到的简单并行、聚合、批量并行这三种任务模型,基本上能够覆盖日常工作中的并发场景了,但还是不够全面,因为还有一种“分治”的任务模型没有覆盖到。分治,顾名思义,即分而治之,是一种解决复杂问题的思维方法和模式;具体来讲,指的是把一个复杂的问题分解成多个相似的子问题,然后再把子问题分解成更小的子问题,直到子问题简单到可以直接求解。理论上来讲,解决每一个问题都对应着一个任务,所以对于问题的分治,实际上就是对于任务的分治。

4.1分治任务模型

分治任务模型可分为两个阶段:一个阶段是任务分解,也就是将任务的迭代地分解为子任务,直至子任务可以直接计算出结果;另一个阶段就是结果合并,即逐层合并子任务的执行结果,直至获得最终结果。下图是一个简化的分治任务模型图,你可以对照着理解。

并发工具类(四)

在这个分治任务模型里,任务和分解后的子任务具有相似性,这种相似性往往体现在任务和子任务的算法是相同的,但是计算的数据规模是不同的。具备这种相似性的问题,我们往往都采用递归算法。

4.2Fork/Join的使用

Fork/Join 是一个并行计算的框架,主要就是用来支持分治任务模型的,这个计算框架里的Fork 对应的是分治任务模型里的任务分解,Join 对应的是结果合并。Fork/Join 计算框架主要包含两部分,一部分是分治任务的线程池 ForkJoinPool,另一部分是分治任务ForkJoinTask。这两部分的关系类似于 ThreadPoolExecutor 和 Runnable 的关系,都可以理解为提交任务到线程池,只不过分治任务有自己独特类型 ForkJoinTask。

ForkJoinTask 是一个抽象类,它的方法有很多,最核心的是 fork() 方法和 join() 方法,其中 fork() 方法会异步地执行一个子任务,而 join() 方法则会阻塞当前线程来等待子任务的执行结果。ForkJoinTask 有两个子类——RecursiveAction 和 RecursiveTask,通过名字你就应该能知道,它们都是用递归的方式来处理分治任务的。这两个子类都定义了抽象方法compute(),不过区别是 RecursiveAction 定义的 compute() 没有返回值,而RecursiveTask 定义的 compute() 方法是有返回值的。这两个子类也是抽象类,在使用的时候,需要你定义子类去扩展。

接下来我们就来实现一下,看看如何用 Fork/Join 这个并行计算框架计算斐波那契数列(下面的代码源自 Java 官方示例)。首先我们需要创建一个分治任务线程池以及计算斐波那契数列的分治任务,之后通过调用分治任务线程池的 invoke() 方法来启动分治任务。由于计算斐波那契数列需要有返回值,所以 Fibonacci 继承自 RecursiveTask。分治任务Fibonacci 需要实现 compute() 方法,这个方法里面的逻辑和普通计算斐波那契数列非常类似,区别之处在于计算 Fibonacci(n - 1) 使用了异步子任务,这是通过 f1.fork()这条语句实现的。

static void main(String[] args){
	// 创建分治任务线程池
	ForkJoinPool fjp = new ForkJoinPool(4);
	// 创建分治任务
	Fibonacci fib = new Fibonacci(30);
	// 启动分治任务
	Integer result = fjp.invoke(fib);
	// 输出结果
	System.out.println(result);
}
// 递归任务
static class Fibonacci extends RecursiveTask<Integer>{
	final int n;
	Fibonacci(int n){this.n = n;}
	protected Integer compute(){
		if (n <= 1)
		return n;
		Fibonacci f1 = new Fibonacci(n - 1);
		// 创建子任务
		f1.fork();
		Fibonacci f2 = new Fibonacci(n - 2);
    	// 等待子任务结果,并合并结果
		return f2.compute() + f1.join();
	}
}

4.3ForkJoinPool工作原理

Fork/Join 并行计算的核心组件是 ForkJoinPool,所以下面我们就来简单介绍一下ForkJoinPool 的工作原理。

你应该已经知道 ThreadPoolExecutor 本质上是一个生产者 -消费者模式的实现,内部有一个任务队列,这个任务队列是生产者和消费者通信的媒介;
ThreadPoolExecutor 可以有多个工作线程,但是这些工作线程都共享一个任务队列。

ForkJoinPool 本质上也是一个生产者 - 消费者的实现,但是更加智能,你可以参考下面的ForkJoinPool 工作原理图来理解其原理。ThreadPoolExecutor 内部只有一个任务队列,而 ForkJoinPool 内部有多个任务队列,当我们通过 ForkJoinPool 的 invoke() 或者submit() 方法提交任务时,ForkJoinPool 根据一定的路由规则把任务提交到一个任务队列中,如果任务在执行过程中会创建出子任务,那么子任务会提交到工作线程对应的任务队列中。

如果工作线程对应的任务队列空了,是不是就没活儿干了呢?不是的,ForkJoinPool 支持一种叫做“任务窃取”的机制,如果工作线程空闲了,那它可以“窃取”其他工作任务队列里的任务,例如下图中,线程 T2 对应的任务队列已经空了,它可以“窃取”线程 T1 对应的任务队列的任务。如此一来,所有的工作线程都不会闲下来了。

ForkJoinPool 中的任务队列采用的是双端队列,工作线程正常获取任务和“窃取任务”分别是从任务队列不同的端消费,这样能避免很多不必要的数据竞争。我们这里介绍的仅仅是简化后的原理,ForkJoinPool 的实现远比我们这里介绍的复杂,如果你感兴趣,建议去看它的源码。

并发工具类(四)

4.4模拟MapReduce统计单次数量

我们可以先用二分法递归地将一个文件拆分成更小的文件,直到文件里只有一行数据,然后统计这一行数据里单词的数量,最后再逐级汇总结果,你可以对照前面的简版分治任务模型图来理解这个过程。 具体的代码如下:

static void main(String[] args){
	String[] fc = {"hello world",
		"hello me",
		"hello fork",
		"hello join",
		"fork join in world"};
	// 创建 ForkJoin 线程池
	ForkJoinPool fjp = new ForkJoinPool(3);
	// 创建任务
	MR mr = new MR(fc, 0, fc.length);
	// 启动任务
	Map<String, Long> result = fjp.invoke(mr);
	// 输出结果
	result.forEach((k, v)->
		System.out.println(k+":"+v));
}
//MR 模拟类
static class MR extends RecursiveTask<Map<String, Long>> {
	private String[] fc;
	private int start, end;
	// 构造函数
	MR(String[] fc, int fr, int to){
		this.fc = fc;
		this.start = fr;
		this.end = to;
	}
	@Override 
    protected Map<String, Long> compute(){
		if (end - start == 1) {
			return calc(fc[start]);
		} else {
			int mid = (start+end)/2;
			MR mr1 = new MR(fc, start, mid);
			mr1.fork();
			MR mr2 = new MR(fc, mid, end);
			// 计算子任务,并返回合并的结果
			return merge(mr2.compute(),mr1.join());
        }
	}
	// 合并结果
	private Map<String, Long> merge(Map<String, Long> r1,Map<String, Long> r2) {
		Map<String, Long> result = new HashMap<>();
		result.putAll(r1);
		// 合并结果
		r2.forEach((k, v) -> {
			Long c = result.get(k);
			if (c != null)
    			result.put(k, c+v);
			else
				result.put(k, v);
		});
		return result;
	}
	// 统计单词数量
	private Map<String, Long> calc(String line) {
		Map<String, Long> result = new HashMap<>();
		// 分割单词
		String [] words = line.split("\\s+");
		// 统计单词数量
		for (String w : words) {
			Long v = result.get(w);
			if (v != null)
				result.put(w, v+1);
			else
				result.put(w, 1L);
		}
		return result;
	}
}

4.5总结

Fork/Join 并行计算框架主要解决的是分治任务。分治的核心思想是“分而治之”:将一个大的任务拆分成小的子任务去解决,然后再把子任务的结果聚合起来从而得到最终结果。这个过程非常类似于大数据处理中的 MapReduce,所以你可以把 Fork/Join 看作单机版的MapReduce。

Fork/Join 并行计算框架的核心组件是 ForkJoinPool。ForkJoinPool 支持任务窃取机制,能够让所有线程的工作量基本均衡,不会出现有的线程很忙,而有的线程很闲的状况,所以性能很好。Java 1.8 提供的 Stream API 里面并行流也是以 ForkJoinPool 为基础的。不过需要你注意的是,默认情况下所有的并行流计算都共享一个 ForkJoinPool,这个共享的ForkJoinPool 默认的线程数是 CPU 的核数;如果所有的并行流计算都是 CPU 密集型计算的话,完全没有问题,但是如果存在 I/O 密集型的并行流计算,那么很可能会因为一个很慢的 I/O 计算而拖慢整个系统的性能。所以建议用不同的 ForkJoinPool 执行不同类型的计算任务。

5.写在最后

整个并发的工具类都已经讲完了,到此就解说了,后面的博客,笔者会讲一些并发中的经典的模式。

相关标签: 并发 java