并发编程:fork/join框架:创建一个fork/join框架(RecursiveAction)
程序员文章站
2022-06-28 17:14:27
fork/join框架简介,并创建了一个简单的RecursiveAction(fork/join)应用。...
目录
fork/join框架
该框架的设计目的是以分治思想来解决那些可以分解成较小任务的问题。(给定一个规模,若大于该规模则将其分割成较小的任务并使用框架来执行;若小于该规模则直接解决。
- fork操作:将任务分割为更小的任务并使用该框架执行他们。
- join操作:当一个任务等待它创建的所有子任务结束时,该操作用来组合这些任务的执行结果。
与Executor框架的区别
fork/join使用工作量测算法。当一个任务使用join操作等待它创建的子任务执行结束时,执行任务的线程(即工作线程)会寻找其他未执行的任务并执行它们。按照这种方式,线程可以最大程度地利用它们的执行时间,从而提升应用性能。
fork/join框架的限制
- 任务只能使用fork()和join()方法作为同步机制。如果在任务中使用了其他同步机制,工作线程在同步操作中就不能再执行其他未执行的任务。
- 任务不应该执行任何I/O操作,如读/写文件中的数据。
- 任务不能抛出受检异常。任务必须包含必要的处理受检异常的代码。
fork/join框架的两个核心类
- ForkJoinPool:该类实现ExecutorService接口和工作量测算法,它管理工作线程并提供任务本身和任务执行过程信息。
- ForkJoinTask:该类是在ForkJoinPool内所执行任务的基类。它提供了在任务内执行fork()和join()操作的机制,以及获取任务状态的方法。
ForkJoinTask的3个子类
- RecursiveAction,任务不返回执行结果。
- RecursiveTask,任务返回一个执行结果
- CountedCompleter类,所有子任务执行完毕后启动一个完成方法。
案例说明
实现一个任务来更新一组产品的价格,任务使用分治法,如果产品数量超过10个,则对其进行拆分。如果小于10个,则进行价格更新。
一、主程序
package xyz.jangle.thread.test.n5_2.forkjoin;
import java.util.List;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
/**
* 5.2、创建一个fork/join池
* 实现一个任务来更新一组产品的价格,任务使用分治法,如果产品数量超过10个,则对其进行拆分。如果小于10个,则进行价格更新。
* @author jangle
* @email jangle@jangle.xyz
* @time 2020年8月26日 下午5:02:26
*
*/
public class M {
public static void main(String[] args) {
ProductListGenerator generator = new ProductListGenerator();
List<Product> products = generator.generate(10000);
Task task = new Task(products, 0, products.size(), 0.2);
ForkJoinPool pool = new ForkJoinPool();
// 提交任务
pool.execute(task);
// 每5秒显示池中信息
do {
System.out.println("Main:当前线程数量(高估)Thread Count:" + pool.getActiveThreadCount());
System.out.println("Main:线程窃取数量(低估)Thread Steal:" + pool.getStealCount());
System.out.println("Main:并行级别Parallelism:" + pool.getParallelism());
try {
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
e.printStackTrace();
}
} while (!task.isDone());
pool.shutdown();
if (task.isCompletedNormally()) {
System.out.println("Main:程序正常执行完毕");
}
// 检查是否所有的产品价格都进行了涨幅
for (int i = 0; i < products.size(); i++) {
Product product = products.get(i);
if (product.getPrice() != 12) {
System.out.println("Product " + product.getName() + ":" + product.getPrice());
}
}
/*
* // 检查是否所有的产品价格都进行了涨幅(使用stream框架) products.stream().filter(product -> {
* if(product.getPrice() != 12) {
* System.out.println("Product "+product.getName()+":"+product.getPrice()); }
* return false; }).count();
*/
System.out.println("Main: 程序执行结束");
}
}
二、产品类(model)
package xyz.jangle.thread.test.n5_2.forkjoin;
/**
* 产品类,存储产品名称和价格(model)
* @author jangle
* @email jangle@jangle.xyz
* @time 2020年8月26日 下午5:03:06
*
*/
public class Product {
private String name;
private double price;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public double getPrice() {
return price;
}
public void setPrice(double price) {
this.price = price;
}
}
三、产品工厂类
package xyz.jangle.thread.test.n5_2.forkjoin;
import java.util.ArrayList;
import java.util.List;
/**
* 产品工厂
* @author jangle
* @email jangle@jangle.xyz
* @time 2020年8月26日 下午5:04:28
*
*/
public class ProductListGenerator {
/**
* 创建一个产品列表
*
* @param size
* @return
*/
public List<Product> generate(int size) {
ArrayList<Product> list = new ArrayList<Product>();
for (int i = 0; i < size; i++) {
Product product = new Product();
product.setName("Product " + i);
product.setPrice(10);
list.add(product);
}
return list;
}
}
四、任务类
compute方法是核心执行逻辑(也是分治逻辑编写的地方)
package xyz.jangle.thread.test.n5_2.forkjoin;
import java.util.List;
import java.util.concurrent.RecursiveAction;
/**
* 核心任务类
* compute方法是核心执行逻辑(也是分治逻辑编写的地方)
* @author jangle
* @email jangle@jangle.xyz
* @time 2020年8月26日 下午5:08:15
*
*/
public class Task extends RecursiveAction {
private static final long serialVersionUID = 1L;
private List<Product> products;
private int first, last;
// 涨幅
private double increment;
public Task(List<Product> products, int first, int last, double increment) {
super();
this.products = products;
this.first = first;
this.last = last;
this.increment = increment;
}
@Override
protected void compute() {
if (last - first < 10) {
updatePrices();
} else {
int middle = (last + first) / 2;
System.out.println("Task:待执行的线程数量:" + getQueuedTaskCount());
Task task1 = new Task(products, first, middle + 1, increment);
Task task2 = new Task(products, middle + 1, last, increment);
// 该方法创建子任务,并进行同步等待(在等待期间,该工作线程会选取待执行的任务进行执行
// 该方法是ForkJoinTask的invokeAll,与ExecutorService的invokeAll稍有区别
invokeAll(task1, task2);
}
}
/**
* 更新产品价格(first - last之间的)
*
* @author jangle
* @time 2020年8月26日 下午5:10:14
*/
private void updatePrices() {
for (int i = first; i < last; i++) {
Product product = products.get(i);
product.setPrice(product.getPrice() * (1 + increment));
}
}
}
五、执行结果
Main:当前线程数量(高估)Thread Count:1
Main:线程窃取数量(低估)Thread Steal:0
Main:并行级别Parallelism:8
Task:待执行的线程数量:0
Task:待执行的线程数量:1
Task:待执行的线程数量:1
Task:待执行的线程数量:0
Task:待执行的线程数量:1
Task:待执行的线程数量:0
Task:待执行的线程数量:0
...
...
...
Task:待执行的线程数量:1
Task:待执行的线程数量:2
Task:待执行的线程数量:0
Task:待执行的线程数量:1
Main:程序正常执行完毕
Main: 程序执行结束
本文地址:https://blog.csdn.net/Bof_jangle/article/details/108245237