抽象了解下ForkJoinTask的拆分思想
demo代码如下:
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.atomic.AtomicInteger;
public class TestForkJoinPool {
static class ForkJoinSumCalculate extends RecursiveTask<Long> {
//计算开始值
private long start;
//计算结束值
private long end;
//拆分临界值
private static final long THURHOLD = 5L;
//拆分的次数
private static AtomicInteger number = new AtomicInteger(0);
//构造函数
public ForkJoinSumCalculate(long start, long end) {
this.start = start;
this.end = end;
}
@Override
protected Long compute() {
//获取菜单拆分的极限值
long length = end - start;
//判断拆分的极限值是否小于阈值,小于的话就可以合并了,不然就继续拆分
if (length <= THURHOLD) {
long sum = 0L;
for (long i = start; i <= end; i++) {
sum += i;
}
System.out.println("现在计算:【" + start + "到" + end + "】结果=" + sum + "-" + Thread.currentThread().getName());
return sum;
} else {
long middle = (start + end) / 2;
System.out.println(Thread.currentThread().getName() + ":get【" + start + "和" + end + "】的中位数=" + middle);
ForkJoinSumCalculate left = new ForkJoinSumCalculate(start, middle);
System.out.println(Thread.currentThread().getName() + ":push【" + start + "和" + middle + "】入队列");
//进行拆分,然后压入线程队列
left.fork();
ForkJoinSumCalculate right = new ForkJoinSumCalculate(middle + 1, end);
System.out.println(Thread.currentThread().getName() + ":push【" + (middle + 1) + "和" + end + "】入队列");
//进行拆分,然后压入线程队列
right.fork();
System.out.println(Thread.currentThread().getName() + "正在等待【" + start + "到" + end + "】的结果");
Long result1 = left.join();
Long result2 = right.join();
Long result = result1 + result2;
System.out.println(Thread.currentThread().getName() + "得到了【" + start + "到" + end + "】=" + result);
return result;
}
}
}
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask forkJoinTask = new ForkJoinSumCalculate(1, 16);
Object result = forkJoinPool.invoke(forkJoinTask);
System.out.println(result);
}
}
运行结果:
先从运行结果中抽出第一步get中位数,如下:
ForkJoinPool-1-worker-1:get【1和16】的中位数=8
ForkJoinPool-1-worker-3:get【9和16】的中位数=12
ForkJoinPool-1-worker-2:get【1和8】的中位数=4
发现执行了3次取中位数的操作,保证首尾的差值不会大于临界值5。
再看看第二步push操作,这里添加配合上get中位数的操作会更清晰,如下:
ForkJoinPool-1-worker-1:get【1和16】的中位数=8
ForkJoinPool-1-worker-1:push【1和8】入队列
ForkJoinPool-1-worker-2:get【1和8】的中位数=4
ForkJoinPool-1-worker-2:push【1和4】入队列
ForkJoinPool-1-worker-2:push【5和8】入队列
ForkJoinPool-1-worker-1:push【9和16】入队列
ForkJoinPool-1-worker-3:get【9和16】的中位数=12
ForkJoinPool-1-worker-3:push【9和12】入队列
ForkJoinPool-1-worker-3:push【13和16】入队列
经过排序和匹配,对应顺序应该如上显示的一样,线程worker-1通过中位数拆分, 线程worker-1 push【1和8】入队列,线程worker-2再对【1和8】做拆分,分为【1和4】以及【5和8】任务入队列,线程worker-1 push【9和16】入队列,线程worker-3再对【9和16】做拆分,分为【9和12】以及【13和16】任务入队列。
接着看看哪些线程正在等待:
ForkJoinPool-1-worker-1正在等待【1到16】的结果
ForkJoinPool-1-worker-2正在等待【1到8】的结果
ForkJoinPool-1-worker-3正在等待【9到16】的结果
会发现worker-1正在等待【1到16】的结果,再结合上述的push操作,可以知道【1到16】其实被拆分为了【1和8】以及【9和16】,然后worker-2被分配到了【1到8】这段,所以worker-2只会对【1到8】做再次拆分,以及等待【1到8】的结果,而worker-3被分配到了
【9到16】这段,所以worker-3只会对【9到16】做再次拆分,以及等待【9到16】的结果。
继续接着看,到底执行了几次计算:
现在计算:【1到4】结果=10
现在计算:【5到8】结果=26
现在计算:【9到12】结果=42
现在计算:【13到16】结果=58
可以发现,实际执行了4次计算,而每次计算的区间都小于5。
最后看看计算出的结果:
ForkJoinPool-1-worker-3得到了【9到16】=100
ForkJoinPool-1-worker-2得到了【1到8】=36
ForkJoinPool-1-worker-1得到了【1到16】=136
可以发现worker-3得到了【9到16】的结果为100,worker-2得到了【1到8】的结果为36,然后最后worker-1得到了【1到16】的最终结果为136。
总结
通过以上打印步骤和分析,可以简略抽象的理解下ForkJoinTask,Fork就是拆分,Join就是聚合,那么Fork体现在worker-1把【1和16】区间拆分为【1和8】以及【9和16】,worker-2再把【1到8】区间拆分为【1到4】以及【5到8】,worker-3把【9到16】区间拆分为【9到12】以及【13到16】,Join体现在worker-2等待着【1到4】以及【5到8】的结果,然后聚合二者的结果返回,所以结果为36,worker-2等待着【9到12】以及【13到16】的结果,然后聚合二者的结果返回,所以结果为100,最后worker-1等待着【1到8】以及【9到16】的结果,聚合二者的结果为136返回。整个拆分的思想可以简单的想像成,任务有一个最低界限,一个大任务进来,那么从大任务开始,只要任务没有达到最低界限,任务就会不断进行拆分,直到任务达到最低界限,才真正开始业务逻辑,然后从被拆分的最底层的小任务开始,往上层父任务汇报结果,上层父任务聚合结果再返回给自己的上层父任务,直到汇报到根任务,也就是一开始进来的大任务,大任务做最后一次结果聚合,然后返回。
本文地址:https://blog.csdn.net/weixin_38106322/article/details/107585501
上一篇: JAVASE基础模块十三(方法)
下一篇: Java中String字符串值的内存分配