Java并行计算框架Fork/Join
0.本文目录
1.开篇明志
这两天在做阿里中间件的比赛,在看并发的一些内容, 本文将总结一下自己看的Java中Fork/Join计算框架。Fork/Join框架被设计成可以容易地将算法并行化、分治化。在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。
2.什么是Fork/Join计算框架?
Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。
ForkJoinTask
有两个主要的方法:
- fork () – 这个方法决定了
ForkJoinTask
的异步执行,凭借这个方法可以创建新的任务。 - join () – 该方法负责在计算完成侯返回结果,因此允许一个任务等待另一任务执行完成。
3.工作窃取(Work stealing)
Fork/Join
框架在java.util.concurrent
包中加入了两个主要的类:
- ForkJoinPool
- ForkJoinTask
ForkJoinPool
类是ForkJoinTask实例的执行者,ForkJoinPool
的主要任务就是”工作窃取”,其线程尝试发现和执行其他任务创建的子任务。
4.工作原理
- 第一、分割任务,只要任务的粒度超过阀值,就不停地将任务分拆为小任务;
- 第二、执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里。
- 第三、再启动一个线程合并结果队列的值。
Fork-Join框架涉及的主要类如下:
Fork/Join使用两个类来完成上面三步:
-
ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask :用于有返回结果的任务,通过泛型参数设置计算的返回值类型。
-
ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。(需要实现compute方法)。
5.使用Fork/Join
下面实现一个这样的任务:计算1+2+…+10000 累加任务。
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;
public class Main extends RecursiveTask<Integer> {
private static final int THRESHOLD = 100;
private int start;
private int end;
public Main(int start, int end){
this.start = start;
this.end = end;
}
protected Integer compute(){
int sum = 0;
boolean canComputer = (end - start) <= THRESHOLD;
//如果任务规模小于 阈值 THRESHOLD 就直接计算
if(canComputer){
for(int i = start; i <= end; i++){
sum += i;
}
}else{
//如果任务大于阈值就裂变为两个子任务
int middle = (start + end) / 2;
Main leftTask = new Main(start, middle);
Main rightTask = new Main(middle + 1, end);
//执行子任务
leftTask.fork();
rightTask.fork();
int leftResult = leftTask.join();
int rightResult = rightTask.join();
//合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args){
ForkJoinPool pool = new ForkJoinPool();
Main task = new Main(1, 10000);
Future<Integer> result = pool.submit(task);
try{
System.out.println("result is:" + result.get());
}catch(InterruptedException e){
}catch(ExecutionException e){
}finally{
pool.shutdown();
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52