ForkJoinPool
程序员文章站
2022-05-05 22:45:46
...
ForkJoinPool 是Executor接口的实现,它专为可以递归分解成小块的工作而设计。
意图梳理:
ForkJoin 工作窃取的概念
实现思路:
并行的请求接口中数据:
public class HttpRequestBo extends RecursiveTask<JSONObject> {
private ArrayList<String> urlList;
private int start;
private int end;
private RestTemplate restTemplate;
public HttpRequestBo(ArrayList<String> urlList, int start, int end, RestTemplate restTemplate) {
this.urlList = urlList;
this.start = start;
this.end = end;
this.restTemplate = restTemplate;
}
@Override
protected JSONObject compute() {
if (end - start == 0) {
//直接执行
System.out.println(Thread.currentThread() + "开始执行");
String url = urlList.get(start);
return restTemplate.getForObject(url, JSONObject.class);
}
JSONObject result = new JSONObject();
int x = (start + end) / 2;
HttpRequestBo httpRequestBo1 = new HttpRequestBo(urlList, start, x, restTemplate);
ForkJoinTask<JSONObject> fork1 = httpRequestBo1.fork();
HttpRequestBo httpRequestBo2 = new HttpRequestBo(urlList, x + 1, end, restTemplate);
ForkJoinTask<JSONObject> fork2 = httpRequestBo2.fork();
JSONObject jsonObject1 = fork1.join();
JSONObject jsonObject2 = fork2.join();
result.putAll(jsonObject1);
result.putAll(jsonObject2);
return result;
}
}
/**
* @author jingliyuan
* @date 2020/8/29
* 改成ForkJoinPool,并发的http请求
*/
@Service
public class ForkJoinService2 {
@Autowired
private RestTemplate restTemplate;
/**
* 初始化“我的”页面数据
*/
public JSONObject init() throws ExecutionException, InterruptedException {
ArrayList<String> urlList = new ArrayList<>();
urlList.add("http://localhost:8090/forkjoinpool/getName");
urlList.add("http://localhost:8090/forkjoinpool/getBalance");
ForkJoinPool forkJoinPool = new ForkJoinPool(5);
ForkJoinTask<JSONObject> submit = forkJoinPool.submit(new HttpRequestBo(urlList, 0, urlList.size() - 1, restTemplate));
JSONObject jsonObject = submit.get();
System.out.println("输出的结果:" + jsonObject);
return jsonObject;
}
}
测试下结果:
@Autowired
private ForkJoinService2 forkJoinService2;
@Test
public void testForkJoinService2(){
long startTime = System.currentTimeMillis();
try {
forkJoinService2.init();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("执行耗时:"+ (System.currentTimeMillis() - startTime)+"毫秒");
}
方式二:改成CountDownLatch,并发的http请求
@Service
public class CountDownLatchService3 {
@Autowired
private RestTemplate restTemplate;
private CountDownLatch countDownLatch = new CountDownLatch(2);
//结果集
private ArrayList<JSONObject> resultList = new ArrayList<>();
/**
* 初始化“我的”页面数据
*/
public ArrayList<JSONObject> init() throws InterruptedException {
new Thread(() ->{
JSONObject result = getResult("http://localhost:8090/forkjoinpool/getName");
resultList.add(result);
countDownLatch.countDown();
}).start();
new Thread(() ->{
JSONObject result = getResult("http://localhost:8090/forkjoinpool/getBalance");
resultList.add(result);
countDownLatch.countDown();
}).start();
countDownLatch.await();
resultList.forEach(System.out::println);
return resultList;
}
public JSONObject getResult(String url){
System.out.println(Thread.currentThread() + "开始执行");
return restTemplate.getForObject(url, JSONObject.class);
}
}
方式三:改成FutureTask 异步执行的结果,并发的http请求
@Service
public class FutureTaskService4 {
@Autowired
private RestTemplate restTemplate;
//结果集
private ArrayList<JSONObject> resultList = new ArrayList<>();
/**
* 初始化“我的”页面数据
*/
public void init() throws InterruptedException, ExecutionException {
FutureTask futureTask1 = new FutureTask<JSONObject>(() ->{
JSONObject result = getResult("http://localhost:8090/forkjoinpool/getName");
return result;
});
FutureTask futureTask2 = new FutureTask<JSONObject>(() ->{
JSONObject result = getResult("http://localhost:8090/forkjoinpool/getBalance");
return result;
});
new Thread(futureTask1).start();
new Thread(futureTask2).start();
JSONObject jsonObject1 = (JSONObject) futureTask1.get();
JSONObject jsonObject2 = (JSONObject) futureTask2.get();
System.out.println("futureTask1的结果是:"+jsonObject1);
System.out.println("futureTask2的结果是:"+jsonObject2);
}
public JSONObject getResult(String url){
System.out.println(Thread.currentThread() + "开始执行");
return restTemplate.getForObject(url, JSONObject.class);
}
}
推荐阅读