欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

分而治之ForkJoinPool的使用

程序员文章站 2022-05-05 22:46:22
...

ForkJoinPool概述

ForkJoinPool类似于Executor,但是有一个区别,ForkJoinPool以递归方式执行,也就是把一个较大的任务拆分成多个小任务去执行,这些小任务也可以拆分成更多小小任务,这个过程叫fork,最后把这些小任务一层层合并,这个过程叫join。

并且ForkJoinPool引入了一个算法叫工作窃取,ForkJoinPool中执行任务的线程是ForkJoinWorkerThread,其中维护了一个双端任务列队WorkQueue,里面存放的是ForkJoinTask。当线程把自己的任务干玩时,他会从其他没干玩任务的线程中窃取任务,并且,要窃取任务的线程会从这个列队中的尾部拿,被窃取任务的线程会从列队头部拿。

这个思想我们应该在小学时经常用,在做假期作业时,联系几个朋友,朋友A做1-10页,朋友B做11到20页,以此类推,最后把朋友的拿来一抄,很快一本册子就写玩了。

fork

把自身分成多个子任务,每个子任务可以由不同的CPU或同一CPU上的不同线程执行,子任务如果嫌多,可以在接着分,比如朋友A嫌10页多,可以让自己的朋友在分担5页,最后拿过来抄。
分而治之ForkJoinPool的使用

join

也就是子任务向上合并。但是有些时候任务不需要返回值,因此不需要合并。
分而治之ForkJoinPool的使用

举个例子

程序需要读取10个txt,每个txt大小为9.5M,如图,最后把内容都存放到StringBuffer中。
分而治之ForkJoinPool的使用
在单线程下,可能要花费695毫秒。

private static void testSingleThread() throws IOException {
    long l = System.currentTimeMillis();
    StringBuffer stringBuffer = new StringBuffer();
    for (int i = 0; i < 10; i++) {
        File file = new File("/home/cookie", i + ".txt");
        FileReader fileReader = new FileReader(file);
        BufferedReader bufferedReader = new BufferedReader(fileReader);
        while (true) {
            String s = bufferedReader.readLine();
            if (s == null) {
                break;
            }
            stringBuffer.append(s);
        }
    }
    System.out.println(System.currentTimeMillis() - l + "毫秒 " + stringBuffer.length());
}

分而治之ForkJoinPool的使用
这个时候就可以使用多线程来为此工作,每个线程处理N个文本,最后完成合并。

首先通过submit提交一个任务,使用RecursiveTask代表有返回值,也可以使用无返回值任务RecursiveAction ,当要读取的文件数小于2时直接读取,否则,通过把List一分为二,创建两个子任务,如果两个子任务分配的数量大于2,那就接着在一分为二,调用fork开启任务(向工作列队中添加任务),但是任务开启后,要等待他的返回值,也就是通过join()方法。

 private static void testForkJoin() throws ExecutionException, InterruptedException {
     long l = System.currentTimeMillis();
     ForkJoinPool forkJoinPool = new ForkJoinPool();
     List<File> files = new ArrayList<>();
     for (int i = 0; i < 10; i++) {
         files.add(new File("/home/cookie", i + ".txt"));
     }
     ForkJoinTask<StringBuffer> submit = forkJoinPool.submit(new MyTask(files));
     System.out.println(System.currentTimeMillis() - l + "毫秒 " + submit.get().length(
 }
 static class MyTask extends RecursiveTask<StringBuffer> {
     List<File> files;
     public MyTask(List<File> files) {
         this.files = files;
     }
     @Override
     protected StringBuffer compute() {
         if (files.size() <= 2) {
             StringBuffer stringBuffer = new StringBuffer();
             for (int i = 0; i < files.size(); i++) {
                 File file = files.get(i);
                 try {
                     FileReader fileReader = new FileReader(file);
                     BufferedReader bufferedReader = new BufferedReader(fileReader);
                     while (true) {
                         String s = bufferedReader.readLine();
                         if (s == null) {
                             break;
                         }
                         stringBuffer.append(s);
                     }
                 } catch (IOException e) {
                     e.printStackTrace();
                 }
             }
             return stringBuffer;
         }
         int pos1 = files.size() / 2;
         int pos2 = pos1;
         MyTask myTask1 = new MyTask(files.subList(0, pos1));
         MyTask myTask2 = new MyTask(files.subList(pos2, files.size()));
         myTask1.fork();
         myTask2.fork();
         return myTask1.join().append(myTask2.join());
     }
 }

运行上面程序,仅仅花费3毫秒。
分而治之ForkJoinPool的使用