导入大批量数据的案例
在导入数据量较大的excel文件到数据库中,常常遇上因为性能问题导致导入失败或响应失败的问题。这里涉及的知识点包含异步执行,线程导入分片和休眠的模块。笔者在最近和之前的工作中针对踩到的坑进行分析,对上面的知识点进行表述。
一般的网页请求是同步的,前端默认响应时间是30秒,最多为2分钟,这一点笔者在与前端联调时发现的。导入大量的数据到数据库中往往耗时超过2分钟,这一点超过了前端最长响应时间,前端一旦超时就容易报错(503错误)。故笔者采用异步的方式来进行导入,但是缺点是导入是否失败还不能直接响应给前端,此时笔者采用文件记录数据来记录文件导入是否失败。
异步导入之前,需要加入异步相关的配置,此配置必须在spring的扫描包下面,配置的文件如下:
@Configuration
@EnableAsync
public class ExecutorConfig {
@Value("${thread.maxPoolSize}")
private Integer maxPoolSize;
@Value("${thread.corePoolSize}")
private Integer corePoolSize;
@Value("${thread.keepAliveSeconds}")
private Integer keepAliveSeconds;
@Value("${thread.queueCapacity}")
private Integer queueCapacity;
@Bean
public ThreadPoolTaskExecutor asyncExecutor(){
ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
taskExecutor.setCorePoolSize(corePoolSize);//核心数量
taskExecutor.setMaxPoolSize(maxPoolSize);//最大数量
taskExecutor.setQueueCapacity(queueCapacity);//队列
taskExecutor.setKeepAliveSeconds(keepAliveSeconds);//存活时间
taskExecutor.setWaitForTasksToCompleteOnShutdown(true);//设置等待任务完成后线程池再关闭
taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//设置拒绝策略
taskExecutor.initialize();//初始化
return taskExecutor;
}
}
图1 spring config线程池文件(不加这个@Async注解失效)
#线程池
thread:
corePoolSize: 5
maxPoolSize: 10
queueCapacity: 100
keepAliveSeconds: 3000
图2 yml文件配置
加入相关的配置文件后,对应要异步执行的方法需要加入@Async注解来实现异步的。图3中的方法在前两幅图的配置基础上异步才有效,其中dtos参数是要导入数据库的数据集合,fileImportRecord参数为记录文件上传的数据库表对应的实体类,方法insertDtos就是把dtos数据导入到数据库中。
@Async
public void saveOrUpdateDtos(List<SecSimExcelDTO> dtos, FileImportRecord fileImportRecord) {
int maxDealNum = 5000;
try {
insertDtos(dtos, fileImportRecord, maxDealNum);
fileImportRecord.setImportStatus("03");
fileImportRecord.setErrorInfo(" ");
log.info("import end!");
} catch (Exception e) {
log.error("here is an exception", e);
fileImportRecord.setImportStatus("05");
fileImportRecord.setErrorInfo(e.getClass().getName() + "-" + e.getMessage().replace(":", " ").replace(";", " "));
}
fileImportRecord.setImportEndTime(LocalDateTime.now());
fileImportRecord.setRecTime(LocalDateTime.now());
fileImportRecordMapper.updateByPrimaryKeySelective(fileImportRecord);
}
图3 异步方法详细
针对方法insertDtos,如果一下全部导入到数据库里面,就容易出现内存溢出的错误。这里笔者采用了将集合分片进行导入操作,同时加入了线程休眠部分,有效防止数据的缺失。具体方法如图4所示。
@Transactional
public void insertDtos(List<SecSimExcelDTO> dtos, FileImportRecord fileImportRecord, int maxDealNum) {
int maxThreadNums = 5;
/*这里我分成多块导入,通过队列的出队来进行实现,防止了导入时因数据量过大造成内存溢出*/
int size = dtos.size();
int pageSize = size / maxThreadNums;
BlockingQueue<List<SecSimExcelDTO>> results =
new LinkedBlockingQueue<>(ListUtils.partition(dtos, pageSize > 0 ? pageSize : 1)); /*对数据集合进行分片*/
while (!results.isEmpty()) {
List<SecSimExcelDTO> subList = results.poll();
insertExcelDtos(subList, fileImportRecord, maxDealNum);
/*下面加入了线程休眠,防止因内存溢出导致数据缺失的问题*/
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void insertExcelDtos(List<SecSimExcelDTO> dtos, FileImportRecord fileImportRecord, int maxDealNum) {
int size = dtos.size();
List<SecSim> secSims = convertFromExcelDto(dtos, fileImportRecord); /*这里转换实体类*/
if (size > maxDealNum) {
/*这里在数据过多时我分成多块导入,通过队列的出队来进行实现,防止了导入时因数据量过大造成内存溢出*/
BlockingQueue<List<SecSim>> secSimQueue = new LinkedBlockingQueue<>(ListUtils.partition(secSims, maxDealNum));
while (!secSimQueue.isEmpty()) {
List<SecSim> keys = secSimQueue.poll();
saveOrUpdateBatch(keys, maxDealNum);
}
} else {
saveOrUpdateBatch(secSims, maxDealNum);
}
}
List<SecSim> convertFromExcelDto(List<SecSimExcelDTO> dtos, FileImportRecord fileImportRecord) {
return dtos.stream().map(dto -> {
SecSim secSim = BeanUtil.copy(dto, SecSim.class);
secSim.setBatchNo(fileImportRecord.getFileId());
secSim.setDataSrc("02");
secSim.setImporter(String.valueOf(fileImportRecord.getUploadUserId()));
secSim.setImportDate(LocalDateTime.now());
return secSim;
}).collect(Collectors.toList());
}
图4 分片导入数据到数据库方法
对于导入数据库需求有则更新无则插入这一点,hibernate里面有对应的saveOrUpdate方法,但是mybatis要在原来的基础上service层加入根据主键查询数据来进行判断,总之原理是一样的。而merge into语句的性能上很差。对于批量插入这里,需要加入flushStatement方法。具体的代码实现如图5所示。
public boolean saveOrUpdateBatch(Collection<SecSim> secSims, int batchSize) {
SqlSession session = sqlSessionBatch();
SecSimMapper secSimMapper = session.getMapper(SecSimMapper.class);
int i = 0;
for (SecSim secSim : secSims) {
SecSimKey secSimKey = BeanUtil.copy(secSim, SecSimKey.class);
SecSim sim = secSimMapper.selectByPrimaryKey(secSimKey);
if (sim != null) {
secSimMapper.updateByPrimaryKey(secSim);
} else {
secSimMapper.insert(secSim);
}
if (i >= 1 && i % batchSize == 0) {
session.flushStatements();
}
i++;
}
session.flushStatements();
return true;
}
图5 批量插入代码
总之,导入或初始化大批量数据需要异步进行的。埋点统计或记录登陆日志之类在主业务之外的东西也需要异步执行。笔者工作因为没有加入异步相关的配置和注解,从而出现503错误以及内存溢出的功能,经过几年的摸索,终于摸索到文中的方法来解决对应的问题。
上一篇: JavaScript 继承
下一篇: mysql游标处理大批量数据问题
推荐阅读
-
精妙的SQL和SQL SERVER 与ACCESS、EXCEL的数据导入导出转换
-
精妙的SQL和SQL SERVER 与ACCESS、EXCEL的数据导入导出转换
-
Excel2010如何获取外部数据比如导入来自网站中的数据
-
PHP大批量数据操作时临时调整内存与执行时间的方法
-
对大数据的批量导入MySQL数据库
-
PHP+MySQL实现海量数据导入导出的总结:is_numbric函数的坑
-
sqlserver导入数据到mysql的详细图解
-
t-sql/mssql用命令行导入数据脚本的SQL语句示例
-
MsSQL数据导入到Mongo的默认编码问题(正确导入Mongo的方法)
-
把excel表格里的数据导入sql数据库的两种方法