欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java并行计算框架Fork/Join

程序员文章站 2022-06-30 20:41:46
...


Java并行计算框架Fork/Join


0.本文目录

1.开篇明志

这两天在做阿里中间件的比赛,在看并发的一些内容, 本文将总结一下自己看的Java中Fork/Join计算框架。Fork/Join框架被设计成可以容易地将算法并行化、分治化。在实际情况中,很多时候我们都需要面对经典的“分治”问题。要解决这类问题,主要任务通常被分解为多个任务块(分解阶段),其后每一小块任务被独立并行计算。一旦计算任务完成,每一快的结果会被合并或者解决(解决阶段)。

2.什么是Fork/Join计算框架?

Fork/Join框架是Java7提供了的一个用于并行执行任务的框架, 是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

通过Fork和Join这两个单词来理解下Fork/Join框架,Fork就是把一个大任务切分为若干子任务并行的执行,Join就是合并这些子任务的执行结果,最后得到这个大任务的结果。比如计算1+2+。。+10000,可以分割成10个子任务,每个子任务分别对1000个数进行求和,最终汇总这10个子任务的结果。


ForkJoinTask有两个主要的方法:

  • fork () – 这个方法决定了ForkJoinTask的异步执行,凭借这个方法可以创建新的任务。
  • join () – 该方法负责在计算完成侯返回结果,因此允许一个任务等待另一任务执行完成。

Java并行计算框架Fork/Join

3.工作窃取(Work stealing)

Fork/Join框架在java.util.concurrent包中加入了两个主要的类:

  • ForkJoinPool
  • ForkJoinTask

ForkJoinPool类是ForkJoinTask实例的执行者,ForkJoinPool的主要任务就是”工作窃取”,其线程尝试发现和执行其他任务创建的子任务。

4.工作原理

  • 第一、分割任务,只要任务的粒度超过阀值,就不停地将任务分拆为小任务;
  • 第二、执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里。
  • 第三、再启动一个线程合并结果队列的值。

Fork-Join框架涉及的主要类如下:

Java并行计算框架Fork/Join

Fork/Join使用两个类来完成上面三步:

  • ForkJoinTask:我们要使用ForkJoin框架,必须首先创建一个ForkJoin任务。它提供在任务中执行fork()和join()操作的机制,通常情况下我们不需要直接继承ForkJoinTask类,而只需要继承它的子类,Fork/Join框架提供了以下两个子类:

    • RecursiveAction:用于没有返回结果的任务。
    • RecursiveTask :用于有返回结果的任务,通过泛型参数设置计算的返回值类型。
  • ForkJoinPool :ForkJoinTask需要通过ForkJoinPool来执行,任务分割出的子任务会添加到当前工作线程所维护的双端队列中,进入队列的头部。当一个工作线程的队列里暂时没有任务时,它会随机从其他工作线程的队列的尾部获取一个任务。(需要实现compute方法)。


5.使用Fork/Join

下面实现一个这样的任务:计算1+2+…+10000 累加任务。

Java并行计算框架Fork/Join

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;
import java.util.concurrent.RecursiveTask;

public class Main extends RecursiveTask<Integer> {
    private static final int THRESHOLD = 100;
    private int start;
    private int end;

    public Main(int start, int end){
        this.start = start;
        this.end = end;
    }

    protected Integer compute(){
        int sum = 0;
        boolean canComputer = (end - start) <= THRESHOLD;
        //如果任务规模小于 阈值 THRESHOLD 就直接计算
        if(canComputer){
            for(int i = start; i <= end; i++){
                sum += i;
            }
        }else{
            //如果任务大于阈值就裂变为两个子任务
            int middle = (start + end) / 2;
            Main leftTask = new Main(start, middle);
            Main rightTask = new Main(middle + 1, end);
            //执行子任务
            leftTask.fork();
            rightTask.fork();
            int leftResult = leftTask.join();
            int rightResult = rightTask.join();
            //合并子任务
            sum = leftResult + rightResult;
        }
        return sum;
    }

    public static void main(String[] args){
        ForkJoinPool pool = new ForkJoinPool();
        Main task = new Main(1, 10000);
        Future<Integer> result = pool.submit(task);
        try{
            System.out.println("result is:" + result.get());
        }catch(InterruptedException e){
        }catch(ExecutionException e){
        }finally{
            pool.shutdown();
        }
    }
}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
Java并行计算框架Fork/Join
Java并行计算框架Fork/Join
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52

Java并行计算框架Fork/Join

Java并行计算框架Fork/Join