小白趟坑---慎用 parallelStream().forEach()
小白趟坑—慎用 parallelStream().forEach()
ps: 本系列第一贴,也可能是最后一贴。
循环遍历集合,是日常开发中常用的功能,自从java8引入lambda之后,除了常规的foreach之外,可以将遍历转为流操作,然后进行遍历,提升效率。
parallelStream()
和 stream()
,都可以讲集合转化为流,通过字面意思,可以看出 parallelStream()
是并行的操作,在大数据量下会优于 stream()
,于是就有了以下代码片段。
/**
* 保存操作记录详情
* @param changeFieldList 一次操作中,变化的字段及其对象的List,其中每个Map中均包含 operateFiled、operateAfter、operateBefore
* operateFiled中记录操作字段的名称、operateBefore记录操作前的值、operateAfter记录操作后的值
* @param sfModifyRecord 操作记录,记录了本次操作的时间、操作人
* @return 返回BasicResponse,记录保存结果
*/
public BasicResponse savaModifyDetail(List<Map<String ,Object>> changeFieldList, SFModifyRecord sfModifyRecord) {
List<SFModifyDetail> detailList = new ArrayList<>();
try {
if (!CollectionUtil.isNullOrEmpty(changeFieldList)) {
changeFieldList.parallelStream().forEach(changeField->{
SFModifyDetail sfModifyDetails = new SFModifyDetail();
sfModifyDetails.setOperateFiled(String.valueOf(changeField.get("operateFiled")));
sfModifyDetails.setOperateBefore(String.valueOf(changeField.get("operateBefore")));
sfModifyDetails.setOperateAfter(String.valueOf(changeField.get("operateAfter")));
sfModifyDetails.setModifyId(sfModifyRecord.getId());
detailList.add(sfModifyDetails);
});
}
modifyDetailRepository.saveAll(detailList);
return BasicResponse.ok();
} catch (Exception e) {
return BasicResponse.error("100", "保存历史记录详情数据出错:" + e);
}
}
下面是 SFModifyRecord
和 SFModifyDetail
的定义:
public class SFModifyRecord {
/* 操作ID */
private Long id;
/* 操作对象ID */
private Long operateObjectId;
private String createBy;
private Date createTime;
private Date lastmodifiedTime;
private String lastmodifiedBy;
/* 对象名称 */
private String objectName;
/* 流程实例id */
private String processId;
/* 操作类型(0:新增;1:编辑;2:删除) */
private String operateType;
/* 对象类型 */
private String objectType;
}
public class SFModifyDetail {
/* 操作记录ID */
private Long id;
/* 历史记录表ID */
private Long modifyId;
/* 操作字段 */
private String operateFiled;
/* 操作前的值 */
private String operateBefore;
/* 操作后的值 */
private String operateAfter;
}
savaModifyDetail
函数接受差异字段,转化为 SFModifyDetail
,并通过 modifyDetailRepository.saveAll
方法,存储到数据库中。
由于某些数据结构,变更的字段量会很大,考虑到有可能会有性能问题,本 小(cai)白(ji) 使用了看起来就很快的 parallelStream()
,将其转化为并行流后,加入到 ArrayList
中,批量进行数据库存储操作。
测试发现,在实际需求中,经常会出现操作记录少记录某个字段的问题,而且是随机出现,丢失的字段也不确定。有的记录,操作字段保存完整,有的明明修改了20个字段,只记录的19字段,而且数据库中的数据确实已经更新了,只是丢了一个字段的记录。还有一次,只有操作的记录,没有操作记录的详情。
经过各种猜想和假设(甚至一度怀疑JPA框架出问题了),最终发现了问题的所在,罪魁祸首就是 parallelStream()
。
parallelStream()
是并行操作的,在加入 ArrayList
的时候,会出问题。下面是三种方式的对比:
public class ParallelStreamTest {
public static void main(String[] args) {
/* 创建数组,用来构建不同大小的List */
Integer[] intArr = new Integer[200];
List<Integer> intList = Arrays.asList(intArr);
List<Integer> forList = new ArrayList<>();
List<Integer> streamForList = new ArrayList<>();
List<Integer> parallelStreamForList = new ArrayList<>();
intList.forEach(l -> forList.add(l));
intList.stream().forEach(l -> streamForList.add(l));
intList.parallelStream().forEach(l -> parallelStreamForList.add(l));
System.out.println(forList.size() + "---普通for循环数组");
System.out.println(streamForList.size() + "---stream流for循环数组");
System.out.println(parallelStreamForList.size() + "---parallelStream流for循环数组");
}
}
输出结果:
200—普通for循环数组
200—stream流for循环数组
197—parallelStream流for循环数组
从结果中可以看到,普通循环和 stream
循环均符合预期, parallelStream
的循环,数组中的数据量会少于预期。
操作记录中丢某一条字段的原因找到了,那丢失整组操作记录字段的原因,也是这个么?将 intArr
的大小改为 500,出现了如下报错:
Exception in thread “main” java.lang.ArrayIndexOutOfBoundsException
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.util.concurrent.ForkJoinTask.getThrowableException(ForkJoinTask.java:598)
at java.util.concurrent.ForkJoinTask.reportException(ForkJoinTask.java:677)
at java.util.concurrent.ForkJoinTask.invoke(ForkJoinTask.java:735)
at java.util.stream.ForEachOps$ForEachOp.evaluateParallel(ForEachOps.java:159)
at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateParallel(ForEachOps.java:173)
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:233)
at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:485)
at java.util.stream.ReferencePipeline$Head.forEach(ReferencePipeline.java:650)
at practice1.ParallelStreamTest.main(ParallelStreamTest.java:24)
Caused by: java.lang.ArrayIndexOutOfBoundsException: 1234
at java.util.ArrayList.add(ArrayList.java:465)
at practice1.ParallelStreamTest.lambda$main$2(ParallelStreamTest.java:24)
at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:183)
at java.util.Spliterators$ArraySpliterator.forEachRemaining(Spliterators.java:948)
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482)
at java.util.stream.ForEachOps$ForEachTask.compute(ForEachOps.java:290)
at java.util.concurrent.CountedCompleter.exec(CountedCompleter.java:731)
at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
at java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
at java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
at java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:175)
然后就从网上搜索了 parallelStream
的详解,觉得以下这篇写的还不错:
https://www.cnblogs.com/pengzhizhong/p/10191842.html
总结趟坑经验:
1、一定要了解后在使用,不要被函数名所迷惑。
2、不要轻易质疑成熟框架。
本文地址:https://blog.csdn.net/u011119942/article/details/109649066
上一篇: ssm框架整合演示
下一篇: Java中与日期时间相关的类和方法