欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

就是这篇了!Fork/Join以及FutureTask的原理分析(含RecursiveAction、RecursiveTask )

程序员文章站 2024-03-04 12:34:29
...

FutureTask

首先讲一下FutureTask,它表示的是一种,异步操作的典范。我提交了任务,在未来我要拿到结果。

  考虑一种简单的场景,甲问乙一个问题,乙一时回答不了,乙要去考虑一段时间(查一下资料),等到有结果了,再告诉甲。

  这时,我们需要类甲,类乙。

它的使用原理可以博主之前的篇文章:https://blog.csdn.net/qq_41864967/article/details/100737036(需要实现Callable接口或者Runable)

Future类就是对于具体的Runnable或者Callable任务的执行结果进行取消、查询是否完成、获取结果。

必要时,通过get方法获取执行结果,该方法会阻塞直到任务返回结果。

就是这篇了!Fork/Join以及FutureTask的原理分析(含RecursiveAction、RecursiveTask )

FutureTask实现了RunnableFuture,所以它既可以作为Runnable被线程执行,又可以作为Future得到Callback的返回值。

 public interface RunnableFuture<V> extends Runnable, Future<V> {  
        void run();  
    } 

分析:FutureTask除了实现了Future接口外还实现了Runnable接口(即可以通过Runnable接口实现线程,也可以通过Future取得线程执行完后的结果),因此FutureTask也可以直接提交给Executor执行。

 

Fork/Join

Fork/Join 框架是Java7提供的一个用于并行执行任务的框架。

是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。

就是这篇了!Fork/Join以及FutureTask的原理分析(含RecursiveAction、RecursiveTask )

工作窃取算法
         工作窃取(work-stealing)算法是指某个线程从其他队列里窃取任务来执行。
        
        假如需要做一个比较大的任务,我们可以把这个任务分割为若干互不依赖的子任务,为了减少线程间的竞争,于是把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应,比如A线程负责处理A队列里的任务。但是有的线程会先把自己队列里的任务干完,而其他线程对应的队列里还有任务等待处理。干完活的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列,被窃取任务线程永远从双端队列的头部拿任务执行,而窃取任务的线程永远从双端队列的尾部拿任务执行。

fork/join框架的核心是ForkJoinPool类,该类继承了AbstractExecutorService类。ForkJoinPool实现了工作窃取算法并且能够执行 ForkJoinTask任务。

公共抽象类ForkJoinTask


这个类的就是fork/join的基石类了。实现了Future接口。可以看成是一个轻量级的Future。

就是这篇了!Fork/Join以及FutureTask的原理分析(含RecursiveAction、RecursiveTask )

两个重要方法:

//安排在当前任务运行的池中异步执行此任务,有自定义线程池的话,就在自定义线程池里面,没有,就在公共线程池里面。

public final ForkJoinTask<V> fork() {
        Thread t;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
            ((ForkJoinWorkerThread)t).workQueue.push(this);
        else
            ForkJoinPool.common.externalPush(this);
        return this;
    }

 

当isDone()返回true时候,返回结果。
/**
     * Returns the result of the computation when it {@link #isDone is
     * done}.  This method differs from {@link #get()} in that
     * abnormal completion results in {@code RuntimeException} or
     * {@code Error}, not {@code ExecutionException}, and that
     * interrupts of the calling thread do <em>not</em> cause the
     * method to abruptly return by throwing {@code
     * InterruptedException}.
     *
     * @return the computed result
     */
    public final V join() {
        int s;
        if ((s = doJoin() & DONE_MASK) != NORMAL)
            reportException(s);
        return getRawResult();
    }

  join()和future接口的get()方法作用很相似区别是 join()方法不会抛出异常,get()会抛出异常。这两个都会阻塞等待结果。

 

公共类ForkJoinPool

fork/join专用线程池,和其他线程池最大的区别是,这个线程池里面的线程,会尝试查找 任务执行。其他线程池则需要自己提交。实现了work-stealing(工作窃取)算法。

构造方法:

就是这篇了!Fork/Join以及FutureTask的原理分析(含RecursiveAction、RecursiveTask )

可以由 boolean asyncMode 决定是否异步。

一般的,我们调用ForkJoinPool .commonPool()返回一个实例。这是系统帮我们定义好的,可以使用公共的线程池。

提交任务的几种方法:

void execute(ForkJoinTask<?> task) 异步提交执行,调用其fork方法在多个线程之间拆分工作,然后通过task的get(),或者join()得到结果。
<T> ForkJoinTask<T> submit​(ForkJoinTask<T> task) 提交ForkJoinTask以供执行,完成时返回一个future对象用于检查状态以及运行结果。然后通过返回的ForkJoinTask的get(),或者join()得到结果。
<T> T invoke(ForkJoinTask<T> task) 执行给定任务,完成后返回结果,会等待获得结果。
List < Future > invokeAll(Collection <?extends Callable > tasks)执行给定的任务,返回完成所有状态和结果的Futures列表。

 fork/join 就是特殊的线程池(ForkJoinPool)和特殊的future(ForkJoinTask)的配合。

官方写了两个具体的ForkJoinTask子类,我们平时使用时候,只需要extend 子类就行了。

 

ForkJoinTask子

  1. RecursiveAction 用于大多数不返回结果的计算
  2. RecursiveTask 会返回最终结果

我们继承上面的类的时候,重写compute()方法就行。 

使用例子如下:

RecursiveTask 例子:

public class Fibonacci extends RecursiveTask<Integer> {

    final int[] n;
    final int start;
    final int end;

    /**
     * 任务分段 判断值
     */
    final int part = 1000_0;

    /**
     * @param n     数组
     * @param start 计算区间 开始
     * @param end   计算区间 结束
     */
    public Fibonacci(int[] n, int start, int end) {
        this.n = n;
        this.start = start;
        this.end = end;
        if (start > end || start < 0 || end > n.length) {
            throw new IllegalArgumentException();
        }
    }

    @Override
    protected Integer compute() {
        //小于计算空间 直接求和
        if ((end - start) <= part) {
            //可变参数本质就是先创建了一个数组,该数组的大小就是可变参数的个数
            return sum(start, end, n);
        }
        //进行任务切割 划分
        Fibonacci f1 = new Fibonacci(n, start, start + part);
        //安排在当前任务运行的池中异步执行此任务(如果适用)
        f1.fork();//异步执行 也就是用另一个线程进行运算
        Fibonacci f2 = new Fibonacci(n, start + part, end);
        //join 返回计算结果,这个类似于get, 这个不会抛出异常
        return f2.compute() + f1.join();
    }


    private int sum(int start, int end, int... ints) {
        int temp = 0;
        for (int i = start; i < end; i++) {
            temp += ints[i];
        }
        return temp;
    }

}

test类:

public class Test {
    public static void main(String[] args) throws ExecutionException, InterruptedException {

        int[] longs = new int[1000_000_0];
        for (int i = 0; i < 1000_000_0; i++) {
            longs[i] = i;
        }
        Fibonacci fibonacci = new Fibonacci(longs, 0, longs.length);
        long before = System.currentTimeMillis();

        TimerTask timerTask = new TimerTask() {
            @Override
            public void run() {
                System.out.println(">>>>>>>>>>>>>>>>>>>>>>>" + fibonacci.isDone());
            }
        };
        Timer timer = new Timer();
        timer.schedule(timerTask, 0, 1000);
        //可以自定义 forkjoin线程池
        ForkJoinPool forkJoinPool = new ForkJoinPool(100);
        //使用公共线程池提交任务
        int result = ForkJoinPool.commonPool().invoke(fibonacci);
        long time = System.currentTimeMillis() - before;
        long before1 = System.currentTimeMillis();
        int result1 = sum(longs);
        long time1 = System.currentTimeMillis() - before1;
        System.out.println(result + " " + time + "   " + result1 + "  " + time1);

    }

    public static int sum(int[] ints) {
        int temp = 0;
        for (int i = 0; i < ints.length; i++) {
            temp += ints[i];
        }
        return temp;
    }
}

就是这篇了!Fork/Join以及FutureTask的原理分析(含RecursiveAction、RecursiveTask )

RecursiveAction 例子:

public class PrintTask extends RecursiveAction {
    private final int Max = 50;
    private int start;
    private int end;
    public PrintTask(int start,int end){
        this.start = start;
        this.end = end;
    }
    @Override
    protected void compute() {
        if((end - start)<Max){
            for(int i=start;i<end;i++){
                System.out.println("当前线程:"+Thread.currentThread().getName()+" i :"+i);
            }
        }else{
            int middle = (start+end)/2;
            PrintTask left = new PrintTask(start, middle);
            PrintTask right = new PrintTask(middle, end);
            left.fork();
            right.fork();
        }
    }

}

test类:

public class Test {
    public static void main(String[] args) throws InterruptedException {
        ForkJoinPool forkJoin = new ForkJoinPool();
        forkJoin.submit(new PrintTask(0, 200));
        forkJoin.awaitTermination(2, TimeUnit.SECONDS);
        forkJoin.shutdown();
    }
}

结果:

就是这篇了!Fork/Join以及FutureTask的原理分析(含RecursiveAction、RecursiveTask )

相关标签: Java相关