分而治之ForkJoinPool的使用
ForkJoinPool概述
ForkJoinPool类似于Executor,但是有一个区别,ForkJoinPool以递归方式执行,也就是把一个较大的任务拆分成多个小任务去执行,这些小任务也可以拆分成更多小小任务,这个过程叫fork,最后把这些小任务一层层合并,这个过程叫join。
并且ForkJoinPool引入了一个算法叫工作窃取,ForkJoinPool中执行任务的线程是ForkJoinWorkerThread,其中维护了一个双端任务列队WorkQueue,里面存放的是ForkJoinTask。当线程把自己的任务干玩时,他会从其他没干玩任务的线程中窃取任务,并且,要窃取任务的线程会从这个列队中的尾部拿,被窃取任务的线程会从列队头部拿。
这个思想我们应该在小学时经常用,在做假期作业时,联系几个朋友,朋友A做1-10页,朋友B做11到20页,以此类推,最后把朋友的拿来一抄,很快一本册子就写玩了。
fork
把自身分成多个子任务,每个子任务可以由不同的CPU或同一CPU上的不同线程执行,子任务如果嫌多,可以在接着分,比如朋友A嫌10页多,可以让自己的朋友在分担5页,最后拿过来抄。
join
也就是子任务向上合并。但是有些时候任务不需要返回值,因此不需要合并。
举个例子
程序需要读取10个txt,每个txt大小为9.5M,如图,最后把内容都存放到StringBuffer中。
在单线程下,可能要花费695毫秒。
private static void testSingleThread() throws IOException {
long l = System.currentTimeMillis();
StringBuffer stringBuffer = new StringBuffer();
for (int i = 0; i < 10; i++) {
File file = new File("/home/cookie", i + ".txt");
FileReader fileReader = new FileReader(file);
BufferedReader bufferedReader = new BufferedReader(fileReader);
while (true) {
String s = bufferedReader.readLine();
if (s == null) {
break;
}
stringBuffer.append(s);
}
}
System.out.println(System.currentTimeMillis() - l + "毫秒 " + stringBuffer.length());
}
这个时候就可以使用多线程来为此工作,每个线程处理N个文本,最后完成合并。
首先通过submit提交一个任务,使用RecursiveTask代表有返回值,也可以使用无返回值任务RecursiveAction ,当要读取的文件数小于2时直接读取,否则,通过把List一分为二,创建两个子任务,如果两个子任务分配的数量大于2,那就接着在一分为二,调用fork开启任务(向工作列队中添加任务),但是任务开启后,要等待他的返回值,也就是通过join()方法。
private static void testForkJoin() throws ExecutionException, InterruptedException {
long l = System.currentTimeMillis();
ForkJoinPool forkJoinPool = new ForkJoinPool();
List<File> files = new ArrayList<>();
for (int i = 0; i < 10; i++) {
files.add(new File("/home/cookie", i + ".txt"));
}
ForkJoinTask<StringBuffer> submit = forkJoinPool.submit(new MyTask(files));
System.out.println(System.currentTimeMillis() - l + "毫秒 " + submit.get().length(
}
static class MyTask extends RecursiveTask<StringBuffer> {
List<File> files;
public MyTask(List<File> files) {
this.files = files;
}
@Override
protected StringBuffer compute() {
if (files.size() <= 2) {
StringBuffer stringBuffer = new StringBuffer();
for (int i = 0; i < files.size(); i++) {
File file = files.get(i);
try {
FileReader fileReader = new FileReader(file);
BufferedReader bufferedReader = new BufferedReader(fileReader);
while (true) {
String s = bufferedReader.readLine();
if (s == null) {
break;
}
stringBuffer.append(s);
}
} catch (IOException e) {
e.printStackTrace();
}
}
return stringBuffer;
}
int pos1 = files.size() / 2;
int pos2 = pos1;
MyTask myTask1 = new MyTask(files.subList(0, pos1));
MyTask myTask2 = new MyTask(files.subList(pos2, files.size()));
myTask1.fork();
myTask2.fork();
return myTask1.join().append(myTask2.join());
}
}
运行上面程序,仅仅花费3毫秒。
上一篇: Java多线程中的死锁现象
下一篇: 多线程死锁现象