欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

导入大批量数据的案例

程序员文章站 2022-03-04 14:07:51
...

在导入数据量较大的excel文件到数据库中,常常遇上因为性能问题导致导入失败或响应失败的问题。这里涉及的知识点包含异步执行,线程导入分片和休眠的模块。笔者在最近和之前的工作中针对踩到的坑进行分析,对上面的知识点进行表述。
一般的网页请求是同步的,前端默认响应时间是30秒,最多为2分钟,这一点笔者在与前端联调时发现的。导入大量的数据到数据库中往往耗时超过2分钟,这一点超过了前端最长响应时间,前端一旦超时就容易报错(503错误)。故笔者采用异步的方式来进行导入,但是缺点是导入是否失败还不能直接响应给前端,此时笔者采用文件记录数据来记录文件导入是否失败。
异步导入之前,需要加入异步相关的配置,此配置必须在spring的扫描包下面,配置的文件如下:

@Configuration
@EnableAsync
public class ExecutorConfig {

   @Value("${thread.maxPoolSize}")
   private Integer maxPoolSize;
   @Value("${thread.corePoolSize}")
   private Integer corePoolSize;
   @Value("${thread.keepAliveSeconds}")
   private Integer keepAliveSeconds;
   @Value("${thread.queueCapacity}")
   private Integer queueCapacity;
   @Bean
   public ThreadPoolTaskExecutor asyncExecutor(){
      ThreadPoolTaskExecutor taskExecutor=new ThreadPoolTaskExecutor();
      taskExecutor.setCorePoolSize(corePoolSize);//核心数量
      taskExecutor.setMaxPoolSize(maxPoolSize);//最大数量
      taskExecutor.setQueueCapacity(queueCapacity);//队列
      taskExecutor.setKeepAliveSeconds(keepAliveSeconds);//存活时间
      taskExecutor.setWaitForTasksToCompleteOnShutdown(true);//设置等待任务完成后线程池再关闭
      taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());//设置拒绝策略
      taskExecutor.initialize();//初始化
      return taskExecutor;
   }
}

图1 spring config线程池文件(不加这个@Async注解失效)

#线程池
thread:
  corePoolSize: 5
  maxPoolSize: 10
  queueCapacity: 100
  keepAliveSeconds: 3000

图2 yml文件配置
加入相关的配置文件后,对应要异步执行的方法需要加入@Async注解来实现异步的。图3中的方法在前两幅图的配置基础上异步才有效,其中dtos参数是要导入数据库的数据集合,fileImportRecord参数为记录文件上传的数据库表对应的实体类,方法insertDtos就是把dtos数据导入到数据库中。

@Async
public void saveOrUpdateDtos(List<SecSimExcelDTO> dtos, FileImportRecord fileImportRecord) {

   int maxDealNum = 5000;
   try {

      insertDtos(dtos, fileImportRecord, maxDealNum);
      fileImportRecord.setImportStatus("03");
      fileImportRecord.setErrorInfo(" ");
      log.info("import end!");
   } catch (Exception e) {

      log.error("here is an exception", e);
      fileImportRecord.setImportStatus("05");
      fileImportRecord.setErrorInfo(e.getClass().getName() + "-" + e.getMessage().replace(":", " ").replace(";", " "));
   }
   fileImportRecord.setImportEndTime(LocalDateTime.now());
   fileImportRecord.setRecTime(LocalDateTime.now());
   fileImportRecordMapper.updateByPrimaryKeySelective(fileImportRecord);
}

图3 异步方法详细
针对方法insertDtos,如果一下全部导入到数据库里面,就容易出现内存溢出的错误。这里笔者采用了将集合分片进行导入操作,同时加入了线程休眠部分,有效防止数据的缺失。具体方法如图4所示。

@Transactional
public void insertDtos(List<SecSimExcelDTO> dtos, FileImportRecord fileImportRecord, int maxDealNum) {

   int maxThreadNums = 5;
   /*这里我分成多块导入,通过队列的出队来进行实现,防止了导入时因数据量过大造成内存溢出*/
   int size = dtos.size();
   int pageSize = size / maxThreadNums;
   BlockingQueue<List<SecSimExcelDTO>> results =
      new LinkedBlockingQueue<>(ListUtils.partition(dtos, pageSize > 0 ? pageSize : 1)); /*对数据集合进行分片*/

   while (!results.isEmpty()) {

      List<SecSimExcelDTO> subList = results.poll();
      insertExcelDtos(subList, fileImportRecord, maxDealNum);
      /*下面加入了线程休眠,防止因内存溢出导致数据缺失的问题*/
      try {
         Thread.sleep(1000);
      } catch (InterruptedException e) {
         e.printStackTrace();
      }
   }
}
private void insertExcelDtos(List<SecSimExcelDTO> dtos, FileImportRecord fileImportRecord, int maxDealNum) {
   int size = dtos.size();
   List<SecSim> secSims = convertFromExcelDto(dtos, fileImportRecord); /*这里转换实体类*/
   if (size > maxDealNum) {
      /*这里在数据过多时我分成多块导入,通过队列的出队来进行实现,防止了导入时因数据量过大造成内存溢出*/
      BlockingQueue<List<SecSim>> secSimQueue = new LinkedBlockingQueue<>(ListUtils.partition(secSims, maxDealNum));
      while (!secSimQueue.isEmpty()) {
         List<SecSim> keys = secSimQueue.poll();
         saveOrUpdateBatch(keys, maxDealNum);
      }
   } else {
      saveOrUpdateBatch(secSims, maxDealNum);
   }
}
List<SecSim> convertFromExcelDto(List<SecSimExcelDTO> dtos, FileImportRecord fileImportRecord) {

   return dtos.stream().map(dto -> {
      SecSim secSim = BeanUtil.copy(dto, SecSim.class);
      secSim.setBatchNo(fileImportRecord.getFileId());
      secSim.setDataSrc("02");
      secSim.setImporter(String.valueOf(fileImportRecord.getUploadUserId()));
      secSim.setImportDate(LocalDateTime.now());
      return secSim;
   }).collect(Collectors.toList());
}

图4 分片导入数据到数据库方法
对于导入数据库需求有则更新无则插入这一点,hibernate里面有对应的saveOrUpdate方法,但是mybatis要在原来的基础上service层加入根据主键查询数据来进行判断,总之原理是一样的。而merge into语句的性能上很差。对于批量插入这里,需要加入flushStatement方法。具体的代码实现如图5所示。

public boolean saveOrUpdateBatch(Collection<SecSim> secSims, int batchSize) {
   SqlSession session = sqlSessionBatch();
   SecSimMapper secSimMapper = session.getMapper(SecSimMapper.class);
   int i = 0;
   for (SecSim secSim : secSims) {
      SecSimKey secSimKey = BeanUtil.copy(secSim, SecSimKey.class);
      SecSim sim = secSimMapper.selectByPrimaryKey(secSimKey);
      if (sim != null) {
         secSimMapper.updateByPrimaryKey(secSim);
      } else {
         secSimMapper.insert(secSim);
      }
      if (i >= 1 && i % batchSize == 0) {
         session.flushStatements();
      }
      i++;
   }
   session.flushStatements();
   return true;
}

图5 批量插入代码
总之,导入或初始化大批量数据需要异步进行的。埋点统计或记录登陆日志之类在主业务之外的东西也需要异步执行。笔者工作因为没有加入异步相关的配置和注解,从而出现503错误以及内存溢出的功能,经过几年的摸索,终于摸索到文中的方法来解决对应的问题。