欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Java高并发编程中ForkJoinPool的使用及详细介绍-刘宇

程序员文章站 2022-05-05 22:45:40
...

作者:刘宇
CSDN博客地址:https://blog.csdn.net/liuyu973971883
有部分资料参考,如有侵权,请联系删除。如有不正确的地方,烦请指正,谢谢。

一、什么是ForkJoinPool?

通常在计算机中,每个任务都是交由每个线程来处理的,当一个非常耗时的任务交由一个线程来完成,而其他线程处于空闲状态时就显得不太合理。ForkJoinPool又叫分而治之,通俗来讲就是帮我们把一个任务分成许多小任务给不同的线程执行,然后通过join将多个线程处理的结果进行汇总返回。

1、ForkJoinPool内部中将Task分为两种

  • SubmissionTask:本地线程调用submit方法提交了任务

  • WorkerTask:框架内部fork出来的子任务

这两种任务都是保存在WorkQueue数组中的,内部通过哈希算法将任务与线程关联起来。他们的存放与WorkQueue位置有些特点,SubmissionTask存放于数组中的偶数索引位置,WorkerTask存放于奇数索引位置。

2、提交任务的两种方式

我们在提交任务时,一般不会直接继承ForkJoinTask,只要继承它的子类即可:

  • RecursiveAction:用于没有返回结果的任务(类似Runnable)
  • RecursiveTask:用于有返回结果的任务(类似Callable)

二、ForkJoinPool的运行图

Java高并发编程中ForkJoinPool的使用及详细介绍-刘宇

三、案例

1、提交有返回值的任务

package com.brycen.concurrency03.lockutils;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
import java.util.stream.IntStream;

public class ForkJoinRecurisiveTask {
	//最大计算数
	private static final int MAX_THRESHOLD = 5;
	public static void main(String[] args) {
		//创建ForkJoinPool
		ForkJoinPool pool = new ForkJoinPool();
		//异步提交RecursiveTask任务
		ForkJoinTask<Integer> forkJoinTask = pool.submit(new CalculatedRecurisiveTask(0,10));
		try {
			//根据返回类型获取返回值
			Integer result = forkJoinTask.get();
			System.out.println("结果为:"+result);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}
	
	private static class CalculatedRecurisiveTask extends RecursiveTask<Integer>{
		private int start;
		private int end;
		public CalculatedRecurisiveTask(int start, int end) {
			this.start = start;
			this.end = end;
		}
		@Override
		protected Integer compute() {
			//判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
			if ((end-start)<=5) {
				return IntStream.rangeClosed(start, end).sum();
			}else {
				//任务分割
				int middle = (end+start)/2;
				CalculatedRecurisiveTask task1 = new CalculatedRecurisiveTask(start,middle);
				CalculatedRecurisiveTask task2 = new CalculatedRecurisiveTask(middle+1,end);
				//执行
				task1.fork();
				task2.fork();
				//等待返回结果
				return task1.join()+task2.join();
			}
		}
	}
}

运行结果:

结果为:55

2、提交无返回值的任务

package com.brycen.concurrency03.lockutils;

import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;

public class ForkJoinRecurisiveAction {
	//最大计算数
	private static final int MAX_THRESHOLD = 5;
	private static final AtomicInteger SUM = new AtomicInteger(0);
	public static void main(String[] args) throws InterruptedException {
		//创建ForkJoinPool
		ForkJoinPool pool = new ForkJoinPool();
		//异步提交RecursiveAction任务
		pool.submit(new CalculatedRecurisiveTask(0,10));
		//等待3秒后输出结果,因为计算需要时间
		pool.awaitTermination(3, TimeUnit.SECONDS);
		System.out.println("结果为:"+SUM);
	}
	
	private static class CalculatedRecurisiveTask extends RecursiveAction{
		private int start;
		private int end;
		public CalculatedRecurisiveTask(int start, int end) {
			this.start = start;
			this.end = end;
		}
		@Override
		protected void compute() {
			//判断计算范围,如果小于等于5,那么一个线程计算就够了,否则进行分割
			if ((end-start)<=5) {
				//因为没有返回值,所有这里如果我们要获取结果,需要存入公共的变量中
				SUM.addAndGet(IntStream.rangeClosed(start, end).sum());
			}else {
				//任务分割
				int middle = (end+start)/2;
				CalculatedRecurisiveTask task1 = new CalculatedRecurisiveTask(start,middle);
				CalculatedRecurisiveTask task2 = new CalculatedRecurisiveTask(middle+1,end);
				//执行
				task1.fork();
				task2.fork();
			}
		}
	}
}

运行结果:

结果为:55
相关标签: Java