欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

抽象了解下ForkJoinTask的拆分思想

程序员文章站 2022-04-15 19:05:59
demo代码如下:import java.util.concurrent.ForkJoinPool;import java.util.concurrent.ForkJoinTask;import java.util.concurrent.RecursiveTask;import java.util.concurrent.atomic.AtomicInteger;public class TestForkJoinPool { static class ForkJoinSumCalculat...

demo代码如下:

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;

public class TestForkJoinPool {
    static class ForkJoinSumCalculate extends RecursiveTask<Long> {
        //计算开始值
        private long start;
        //计算结束值
        private long end;
        //拆分临界值
        private static final long THURHOLD = 5L;
        //拆分的次数
        private static AtomicInteger number = new AtomicInteger(0);

        //构造函数
        public ForkJoinSumCalculate(long start, long end) {
            this.start = start;
            this.end = end;
        }

        @Override
        protected Long compute() {
            //获取菜单拆分的极限值
            long length = end - start;
            //判断拆分的极限值是否小于阈值,小于的话就可以合并了,不然就继续拆分
            if (length <= THURHOLD) {
                long sum = 0L;
                for (long i = start; i <= end; i++) {
                    sum += i;
                }
                System.out.println("现在计算:【" + start + "到" + end + "】结果=" + sum + "-" + Thread.currentThread().getName());
                return sum;
            } else {
                long middle = (start + end) / 2;
                System.out.println(Thread.currentThread().getName() + ":get【" + start + "和" + end + "】的中位数=" + middle);
                ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
                System.out.println(Thread.currentThread().getName() + ":push【" + start + "和" + middle + "】入队列");
                //进行拆分,然后压入线程队列
                left.fork();
                ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
                System.out.println(Thread.currentThread().getName() + ":push【" + (middle + 1) + "和" + end + "】入队列");
                //进行拆分,然后压入线程队列
                right.fork();
                System.out.println(Thread.currentThread().getName() + "正在等待【" + start + "到" + end + "】的结果");
                Long result1 = left.join();
                Long result2 = right.join();
                Long result = result1 + result2;
                System.out.println(Thread.currentThread().getName() + "得到了【" + start + "到" + end + "】=" + result);
                return result;
            }
        }
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask forkJoinTask = new ForkJoinSumCalculate(1, 16);
        Object result = forkJoinPool.invoke(forkJoinTask);
        System.out.println(result);
    }
}

运行结果:
抽象了解下ForkJoinTask的拆分思想

先从运行结果中抽出第一步get中位数,如下:

ForkJoinPool-1-worker-1:get【116】的中位数=8
ForkJoinPool-1-worker-3:get【916】的中位数=12
ForkJoinPool-1-worker-2:get【18】的中位数=4

发现执行了3次取中位数的操作,保证首尾的差值不会大于临界值5。

再看看第二步push操作,这里添加配合上get中位数的操作会更清晰,如下:

ForkJoinPool-1-worker-1:get【116】的中位数=8

ForkJoinPool-1-worker-1:push【18】入队列
ForkJoinPool-1-worker-2:get【18】的中位数=4
ForkJoinPool-1-worker-2:push【14】入队列
ForkJoinPool-1-worker-2:push【58】入队列


ForkJoinPool-1-worker-1:push【916】入队列
ForkJoinPool-1-worker-3:get【916】的中位数=12
ForkJoinPool-1-worker-3:push【912】入队列
ForkJoinPool-1-worker-3:push【1316】入队列

经过排序和匹配,对应顺序应该如上显示的一样,线程worker-1通过中位数拆分, 线程worker-1 push【1和8】入队列,线程worker-2再对【1和8】做拆分,分为【1和4】以及【5和8】任务入队列,线程worker-1 push【9和16】入队列,线程worker-3再对【9和16】做拆分,分为【9和12】以及【13和16】任务入队列。

接着看看哪些线程正在等待:

ForkJoinPool-1-worker-1正在等待【116】的结果
ForkJoinPool-1-worker-2正在等待【18】的结果
ForkJoinPool-1-worker-3正在等待【916】的结果

会发现worker-1正在等待【1到16】的结果,再结合上述的push操作,可以知道【1到16】其实被拆分为了【1和8】以及【9和16】,然后worker-2被分配到了【1到8】这段,所以worker-2只会对【1到8】做再次拆分,以及等待【1到8】的结果,而worker-3被分配到了
【9到16】这段,所以worker-3只会对【9到16】做再次拆分,以及等待【9到16】的结果。

继续接着看,到底执行了几次计算:

现在计算:【14】结果=10
现在计算:【58】结果=26
现在计算:【912】结果=42
现在计算:【1316】结果=58

可以发现,实际执行了4次计算,而每次计算的区间都小于5。

最后看看计算出的结果:

ForkJoinPool-1-worker-3得到了【916=100
ForkJoinPool-1-worker-2得到了【18=36
ForkJoinPool-1-worker-1得到了【116=136

可以发现worker-3得到了【9到16】的结果为100,worker-2得到了【1到8】的结果为36,然后最后worker-1得到了【1到16】的最终结果为136。

总结

通过以上打印步骤和分析,可以简略抽象的理解下ForkJoinTask,Fork就是拆分,Join就是聚合,那么Fork体现在worker-1把【1和16】区间拆分为【1和8】以及【9和16】,worker-2再把【1到8】区间拆分为【1到4】以及【5到8】,worker-3把【9到16】区间拆分为【9到12】以及【13到16】,Join体现在worker-2等待着【1到4】以及【5到8】的结果,然后聚合二者的结果返回,所以结果为36,worker-2等待着【9到12】以及【13到16】的结果,然后聚合二者的结果返回,所以结果为100,最后worker-1等待着【1到8】以及【9到16】的结果,聚合二者的结果为136返回。整个拆分的思想可以简单的想像成,任务有一个最低界限,一个大任务进来,那么从大任务开始,只要任务没有达到最低界限,任务就会不断进行拆分,直到任务达到最低界限,才真正开始业务逻辑,然后从被拆分的最底层的小任务开始,往上层父任务汇报结果,上层父任务聚合结果再返回给自己的上层父任务,直到汇报到根任务,也就是一开始进来的大任务,大任务做最后一次结果聚合,然后返回。

本文地址:https://blog.csdn.net/weixin_38106322/article/details/107585501

相关标签: Java并发编程