欢迎您访问程序员文章站,本站旨在为大家提供和分享程序员计算机编程知识!
您现在的位置是: 首页

ForkJoinPool

程序员文章站 2022-05-05 22:45:46
...

ForkJoinPool 是 ExecutorService(Executor 的子接口)的实现,它专为可以递归分解成小任务的工作而设计。
ForkJoinPool
意图梳理:
ForkJoinPool
ForkJoin 工作窃取的概念
ForkJoinPool
实现思路:
ForkJoinPool
并行地请求接口中的数据:

/**
 * Fork/join task that requests every URL in {@code urlList[start..end]} (both bounds
 * inclusive) and merges the JSON responses into a single {@code JSONObject}.
 *
 * <p>A range of one element is fetched directly; a larger range is split in half and the
 * halves are processed recursively. When both halves contain the same key, the value from
 * the upper half wins because it is merged last.
 */
public class HttpRequestBo extends RecursiveTask<JSONObject> {
    private final ArrayList<String> urlList;
    private final int start;
    private final int end;
    private final RestTemplate restTemplate;

    /**
     * @param urlList      URLs to request
     * @param start        index of the first URL handled by this task (inclusive)
     * @param end          index of the last URL handled by this task (inclusive)
     * @param restTemplate client used to issue the GET requests
     */
    public HttpRequestBo(ArrayList<String> urlList, int start, int end, RestTemplate restTemplate) {
        this.urlList = urlList;
        this.start = start;
        this.end = end;
        this.restTemplate = restTemplate;
    }

    /**
     * Fetches this task's range and returns the merged result.
     *
     * @return the response of the single URL when the range has one element (whatever
     *         {@code getForObject} returns, possibly {@code null}); an empty object for an
     *         empty range; otherwise a new object holding the entries of both halves
     */
    @Override
    protected JSONObject compute() {
        if (start > end) {
            // Empty range (e.g. an empty URL list passed as 0..-1). Without this guard the
            // split below never reaches the single-element case and recurses without bound.
            return new JSONObject();
        }
        if (start == end) {
            // Single URL: execute the request directly.
            System.out.println(Thread.currentThread() + "开始执行");
            String url = urlList.get(start);
            return restTemplate.getForObject(url, JSONObject.class);
        }

        // Overflow-safe midpoint; identical to (start + end) / 2 for valid list indices.
        int mid = start + (end - start) / 2;
        HttpRequestBo lower = new HttpRequestBo(urlList, start, mid, restTemplate);
        HttpRequestBo upper = new HttpRequestBo(urlList, mid + 1, end, restTemplate);

        // Fork one half and compute the other in the current worker, so this thread does
        // useful work instead of only waiting on two joins.
        lower.fork();
        JSONObject upperResult = upper.compute();
        JSONObject lowerResult = lower.join();

        JSONObject result = new JSONObject();
        // A leaf may yield null; skip it rather than fail the whole merge in putAll.
        if (lowerResult != null) {
            result.putAll(lowerResult);
        }
        if (upperResult != null) {
            result.putAll(upperResult);
        }
        return result;
    }
}
/**
 * Issues the HTTP requests concurrently by running an {@code HttpRequestBo} task on a
 * {@link ForkJoinPool}.
 *
 * @author jingliyuan
 * @date 2020/8/29
 */
@Service
public class ForkJoinService2 {
    @Autowired
    private RestTemplate  restTemplate;

    /**
     * Initializes the data of the "my" page by requesting the name and balance endpoints
     * and merging their responses.
     *
     * @return the merged response produced by the fork/join task
     * @throws ExecutionException   if the task fails while computing its result
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public JSONObject init() throws ExecutionException, InterruptedException {
        ArrayList<String> urlList = new ArrayList<>();
        urlList.add("http://localhost:8090/forkjoinpool/getName");
        urlList.add("http://localhost:8090/forkjoinpool/getBalance");

        ForkJoinPool forkJoinPool = new ForkJoinPool(5);
        try {
            ForkJoinTask<JSONObject> submit = forkJoinPool.submit(new HttpRequestBo(urlList, 0, urlList.size() - 1, restTemplate));
            JSONObject jsonObject = submit.get();
            System.out.println("输出的结果:" + jsonObject);
            return jsonObject;
        } finally {
            // The pool is created per call, so it must be released per call as well;
            // otherwise every invocation leaves its worker threads behind.
            forkJoinPool.shutdown();
        }
    }
}

测试下结果:

  @Autowired
    private ForkJoinService2 forkJoinService2;
    /**
     * Calls {@code forkJoinService2.init()} once and prints the elapsed wall-clock time.
     *
     * <p>Failures are propagated to the test runner instead of being printed and ignored,
     * so a failing request makes the test fail. The elapsed time is printed either way.
     *
     * @throws ExecutionException   if the fork/join task fails
     * @throws InterruptedException if the test thread is interrupted while waiting
     */
    @Test
    public void testForkJoinService2() throws ExecutionException, InterruptedException {
        long startTime = System.currentTimeMillis();
        try {
            forkJoinService2.init();
        } finally {
            System.out.println("执行耗时:"+ (System.currentTimeMillis() - startTime)+"毫秒");
        }
    }

ForkJoinPool

方式二:改成 CountDownLatch,并发地发起 HTTP 请求

/**
 * Issues the HTTP requests concurrently, one thread per URL, and uses a
 * {@link CountDownLatch} to wait until every request has finished.
 */
@Service
public class CountDownLatchService3 {
    @Autowired
    private RestTemplate  restTemplate;

    /**
     * Initializes the data of the "my" page by requesting the name and balance endpoints
     * in parallel.
     *
     * <p>The latch and the result list are created per call. A latch cannot be reset, so
     * one shared instance would stop waiting after the first call, and a shared list would
     * keep accumulating results from earlier calls.
     *
     * @return the responses in completion order; a request that throws contributes no
     *         entry (its exception ends that worker thread only)
     * @throws InterruptedException if the calling thread is interrupted while waiting
     */
    public ArrayList<JSONObject> init() throws  InterruptedException {
        String[] urls = {
            "http://localhost:8090/forkjoinpool/getName",
            "http://localhost:8090/forkjoinpool/getBalance"
        };

        CountDownLatch countDownLatch = new CountDownLatch(urls.length);
        // Result set for this call only.
        ArrayList<JSONObject> resultList = new ArrayList<>();

        for (String url : urls) {
            new Thread(() -> {
                try {
                    JSONObject result = getResult(url);
                    // ArrayList is not thread-safe; guard the concurrent adds.
                    synchronized (resultList) {
                        resultList.add(result);
                    }
                } finally {
                    // Always count down, even when the request throws, so await()
                    // below cannot block forever.
                    countDownLatch.countDown();
                }
            }).start();
        }

        countDownLatch.await();
        resultList.forEach(System.out::println);
        return resultList;
    }

    /**
     * Performs a GET request against {@code url}.
     *
     * @param url endpoint to request
     * @return the response body converted to a {@code JSONObject}
     */
    public JSONObject getResult(String url){
        System.out.println(Thread.currentThread() + "开始执行");
        return restTemplate.getForObject(url, JSONObject.class);
    }
}

ForkJoinPool
方式三:改成 FutureTask 获取异步执行的结果,并发地发起 HTTP 请求

/**
 * Issues the HTTP requests concurrently by wrapping each request in a {@link FutureTask}
 * that runs on its own thread, then collects the asynchronous results.
 */
@Service
public class FutureTaskService4 {
    @Autowired
    private RestTemplate  restTemplate;

    /**
     * Initializes the data of the "my" page: requests the name and balance endpoints in
     * parallel, waits for both and prints the two responses.
     *
     * @throws InterruptedException if the calling thread is interrupted while waiting
     * @throws ExecutionException   if one of the requests throws
     */
    public void init() throws InterruptedException, ExecutionException {

        // Parameterized futures: get() returns JSONObject directly, so no raw types or
        // unchecked casts are needed.
        FutureTask<JSONObject> futureTask1 = new FutureTask<JSONObject>(
                () -> getResult("http://localhost:8090/forkjoinpool/getName"));

        FutureTask<JSONObject> futureTask2 = new FutureTask<JSONObject>(
                () -> getResult("http://localhost:8090/forkjoinpool/getBalance"));

        // Start both requests before waiting on either one so that they overlap.
        new Thread(futureTask1).start();
        new Thread(futureTask2).start();

        JSONObject jsonObject1 = futureTask1.get();
        JSONObject jsonObject2 = futureTask2.get();
        System.out.println("futureTask1的结果是:"+jsonObject1);
        System.out.println("futureTask2的结果是:"+jsonObject2);
    }

    /**
     * Performs a GET request against {@code url}.
     *
     * @param url endpoint to request
     * @return the response body converted to a {@code JSONObject}
     */
    public JSONObject getResult(String url){
        System.out.println(Thread.currentThread() + "开始执行");
        return restTemplate.getForObject(url, JSONObject.class);
    }
}

ForkJoinPool

相关标签: java