Java高并发编程中ForkJoinPool的使用及详细介绍-刘宇
程序员文章站
2022-05-05 22:45:40
...
Java高并发编程中ForkJoinPool的使用及详细介绍-刘宇
作者:刘宇
CSDN博客地址:https://blog.csdn.net/liuyu973971883
有部分资料参考,如有侵权,请联系删除。如有不正确的地方,烦请指正,谢谢。
一、什么是ForkJoinPool?
通常在计算机中,每个任务都是交由每个线程来处理的,当一个非常耗时的任务交由一个线程来完成,而其他线程处于空闲状态时就显得不太合理。ForkJoinPool又叫分而治之,通俗来讲就是帮我们把一个任务分成许多小任务给不同的线程执行,然后通过join将多个线程处理的结果进行汇总返回。
1、ForkJoinPool内部中将Task分为两种
-
SubmissionTask:本地线程调用submit方法提交了任务
-
WorkerTask:框架内部fork出来的子任务
这两种任务都是保存在WorkQueue数组中的,内部通过哈希算法将任务与线程关联起来。他们的存放与WorkQueue位置有些特点,SubmissionTask存放于数组中的偶数索引位置,WorkerTask存放于奇数索引位置。
2、提交任务的两种方式
我们在提交任务时,一般不会直接继承ForkJoinTask,只要继承它的子类即可:
- RecursiveAction:用于没有返回结果的任务(类似Runnable)
- RecursiveTask:用于有返回结果的任务(类似Callable)
二、ForkJoinPool的运行图
三、案例
1、提交有返回值的任务
package com.brycen.concurrency03.lockutils;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;
public class ForkJoinRecurisiveTask {
//最大计算数
private static final int MAX_THRESHOLD = 5;
public static void main(String[] args) {
//创建ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
//异步提交RecursiveTask任务
ForkJoinTask<Integer> forkJoinTask = pool.submit(new CalculatedRecurisiveTask(0,10));
try {
//根据返回类型获取返回值
Integer result = forkJoinTask.get();
System.out.println("结果为:"+result);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
private static class CalculatedRecurisiveTask extends RecursiveTask<Integer>{
private int start;
private int end;
public CalculatedRecurisiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
//判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
if ((end-start)<=5) {
return IntStream.rangeClosed(start, end).sum();
}else {
//任务分割
int middle = (end+start)/2;
CalculatedRecurisiveTask task1 = new CalculatedRecurisiveTask(start,middle);
CalculatedRecurisiveTask task2 = new CalculatedRecurisiveTask(middle+1,end);
//执行
task1.fork();
task2.fork();
//等待返回结果
return task1.join()+task2.join();
}
}
}
}
运行结果:
结果为:55
2、提交无返回值的任务
package com.brycen.concurrency03.lockutils;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
public class ForkJoinRecurisiveAction {
//最大计算数
private static final int MAX_THRESHOLD = 5;
private static final AtomicInteger SUM = new AtomicInteger(0);
public static void main(String[] args) throws InterruptedException {
//创建ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
//异步提交RecursiveAction任务
pool.submit(new CalculatedRecurisiveTask(0,10));
//等待3秒后输出结果,因为计算需要时间
pool.awaitTermination(3, TimeUnit.SECONDS);
System.out.println("结果为:"+SUM);
}
private static class CalculatedRecurisiveTask extends RecursiveAction{
private int start;
private int end;
public CalculatedRecurisiveTask(int start, int end) {
this.start = start;
this.end = end;
}
@Override
protected void compute() {
//判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
if ((end-start)<=5) {
//因为没有返回值,所有这里如果我们要获取结果,需要存入公共的变量中
SUM.addAndGet(IntStream.rangeClosed(start, end).sum());
}else {
//任务分割
int middle = (end+start)/2;
CalculatedRecurisiveTask task1 = new CalculatedRecurisiveTask(start,middle);
CalculatedRecurisiveTask task2 = new CalculatedRecurisiveTask(middle+1,end);
//执行
task1.fork();
task2.fork();
}
}
}
}
运行结果:
结果为:55