就是这篇了!Fork/Join以及FutureTask的原理分析(含RecursiveAction、RecursiveTask )
FutureTask
首先讲一下FutureTask,它表示的是一种,异步操作的典范。我提交了任务,在未来我要拿到结果。
考虑一种简单的场景,甲问乙一个问题,乙一时回答不了,乙要去考虑一段时间(查一下资料),等到有结果了,再告诉甲。
这时,我们需要类甲,类乙。
它的使用原理可以博主之前的篇文章:https://blog.csdn.net/qq_41864967/article/details/100737036(需要实现Callable接口或者Runable)
Future类就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。
必要时,通过get方法获取执行结果,该方法会阻塞直到任务返回结果。
FutureTask实现了RunnableFuture,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callback的返回值。
public interface RunnableFuture<V> extends Runnable, Future<V> {
void run();
}
分析:FutureTask除了实现了Future接口外还实现了Runnable接口(即可以通过Runnable接口实现线程,也可以通过Future取得线程执行完后的结果),因此FutureTask也可以直接提交给Executor执行。
Fork/Join
Fork/Join 框架是Java7提供的一个用于并行执行任务的框架。
是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
工作窃取算法
工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
假如需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。
fork/join框架的核心是ForkJoinPool
类,该类继承了AbstractExecutorService类。ForkJoinPool
实现了工作窃取算法并且能够执行 ForkJoinTask
任务。
公共抽象类ForkJoinTask
这个类的就是fork/join的基石类了。实现了Future接口。可以看成是一个轻量级的Future。
两个重要方法:
//安排在当前任务运行的池中异步执行此任务,有自定义线程池的话,就在自定义线程池里面,没有,就在公共线程池里面。
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
当isDone()返回true时候,返回结果。
/**
* Returns the result of the computation when it {@link #isDone is
* done}. This method differs from {@link #get()} in that
* abnormal completion results in {@code RuntimeException} or
* {@code Error}, not {@code ExecutionException}, and that
* interrupts of the calling thread do <em>not</em> cause the
* method to abruptly return by throwing {@code
* InterruptedException}.
*
* @return the computed result
*/
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
join()和future接口的get()方法作用很相似。区别是 join()方法不会抛出异常,get()会抛出异常。这两个都会阻塞等待结果。
公共类ForkJoinPool
fork/join专用线程池,和其他线程池最大的区别是,这个线程池里面的线程,会尝试查找 任务执行。其他线程池则需要自己提交。实现了work-stealing(工作窃取)算法。
构造方法:
可以由 boolean asyncMode 决定是否异步。
一般的,我们调用ForkJoinPool .commonPool()返回一个实例。这是系统帮我们定义好的,可以使用公共的线程池。
提交任务的几种方法:
void execute(ForkJoinTask<?> task) 异步提交执行,调用其fork方法在多个线程之间拆分工作,然后通过task的get(),或者join()得到结果。
<T> ForkJoinTask<T> submit(ForkJoinTask<T> task) 提交ForkJoinTask以供执行,完成时返回一个future对象用于检查状态以及运行结果。然后通过返回的ForkJoinTask的get(),或者join()得到结果。
<T> T invoke(ForkJoinTask<T> task) 执行给定任务,完成后返回结果,会等待获得结果。
List < Future > invokeAll(Collection <?extends Callable > tasks)执行给定的任务,返回完成所有状态和结果的Futures列表。
fork/join 就是特殊的线程池(ForkJoinPool)和特殊的future(ForkJoinTask)的配合。
官方写了两个具体的ForkJoinTask子类,我们平时使用时候,只需要extend 子类就行了。
ForkJoinTask子类
- RecursiveAction 用于大多数不返回结果的计算
- RecursiveTask 会返回最终结果
我们继承上面的类的时候,重写compute()方法就行。
使用例子如下:
RecursiveTask 例子:
public class Fibonacci extends RecursiveTask<Integer> {
final int[] n;
final int start;
final int end;
/**
* 任务分段 判断值
*/
final int part = 1000_0;
/**
* @param n 数组
* @param start 计算区间 开始
* @param end 计算区间 结束
*/
public Fibonacci(int[] n, int start, int end) {
this.n = n;
this.start = start;
this.end = end;
if (start > end || start < 0 || end > n.length) {
throw new IllegalArgumentException();
}
}
@Override
protected Integer compute() {
//小于计算空间 直接求和
if ((end - start) <= part) {
//可变参数本质就是先创建了一个数组,该数组的大小就是可变参数的个数
return sum(start, end, n);
}
//进行任务切割 划分
Fibonacci f1 = new Fibonacci(n, start, start + part);
//安排在当前任务运行的池中异步执行此任务(如果适用)
f1.fork();//异步执行 也就是用另一个线程进行运算
Fibonacci f2 = new Fibonacci(n, start + part, end);
//join 返回计算结果,这个类似于get, 这个不会抛出异常
return f2.compute() + f1.join();
}
private int sum(int start, int end, int... ints) {
int temp = 0;
for (int i = start; i < end; i++) {
temp += ints[i];
}
return temp;
}
}
test类:
public class Test {
public static void main(String[] args) throws ExecutionException, InterruptedException {
int[] longs = new int[1000_000_0];
for (int i = 0; i < 1000_000_0; i++) {
longs[i] = i;
}
Fibonacci fibonacci = new Fibonacci(longs, 0, longs.length);
long before = System.currentTimeMillis();
TimerTask timerTask = new TimerTask() {
@Override
public void run() {
System.out.println(">>>>>>>>>>>>>>>>>>>>>>>" + fibonacci.isDone());
}
};
Timer timer = new Timer();
timer.schedule(timerTask, 0, 1000);
//可以自定义 forkjoin线程池
ForkJoinPool forkJoinPool = new ForkJoinPool(100);
//使用公共线程池提交任务
int result = ForkJoinPool.commonPool().invoke(fibonacci);
long time = System.currentTimeMillis() - before;
long before1 = System.currentTimeMillis();
int result1 = sum(longs);
long time1 = System.currentTimeMillis() - before1;
System.out.println(result + " " + time + " " + result1 + " " + time1);
}
public static int sum(int[] ints) {
int temp = 0;
for (int i = 0; i < ints.length; i++) {
temp += ints[i];
}
return temp;
}
}
RecursiveAction 例子:
public class PrintTask extends RecursiveAction {
private final int Max = 50;
private int start;
private int end;
public PrintTask(int start,int end){
this.start = start;
this.end = end;
}
@Override
protected void compute() {
if((end - start)<Max){
for(int i=start;i<end;i++){
System.out.println("当前线程:"+Thread.currentThread().getName()+" i :"+i);
}
}else{
int middle = (start+end)/2;
PrintTask left = new PrintTask(start, middle);
PrintTask right = new PrintTask(middle, end);
left.fork();
right.fork();
}
}
}
test类:
public class Test {
public static void main(String[] args) throws InterruptedException {
ForkJoinPool forkJoin = new ForkJoinPool();
forkJoin.submit(new PrintTask(0, 200));
forkJoin.awaitTermination(2, TimeUnit.SECONDS);
forkJoin.shutdown();
}
}
结果: