欢迎您访问程序员文章站!本站旨在为大家提供并分享程序员计算机编程知识。
您现在的位置是: 首页

mysql大批量数据插入优化

程序员文章站 2022-06-15 13:39:43
...

目前在项目中发现:向一张约700万行数据的表中通过 MyBatis 批量插入10万条数据,大约需要3分钟,耗时太长。现在改用 MySQL 的 LOAD DATA LOCAL INFILE 命令对插入进行优化,最终测试10万条数据的批量插入大约只需3秒。

定义命令执行语句

/**
 * {@code LOAD DATA LOCAL INFILE} statements used for fast bulk inserts.
 *
 * <p>None of the statements declares {@code FIELDS}/{@code LINES} clauses, so MySQL's
 * defaults apply: fields are separated by a tab and rows by a newline. The stream
 * builders in {@code FastBatchInsertUtils} produce exactly that layout, with fields in
 * the same order as the column list of the matching statement.
 *
 * <p>The file name {@code 'sql.csv'} is only a placeholder: the rows are supplied through
 * {@code setLocalInfileInputStream(...)}, not read from a file on disk.
 *
 * <p>{@code IGNORE} makes MySQL skip rows that would violate a unique key instead of
 * aborting the whole load.
 */
public interface FastBatchInsertSqlConstants {

	/** Bulk load into {@code t_verify_order_pool_his}; 9 columns, the last one being the MD5 digest. */
	String VERIFY_ORDER_POOL_HIS_SQL = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE into TABLE t_verify_order_pool_his (mch_order_no,store_no,total_money,refund_money,pay_time,trade_type, bill_date, create_date,md5String)";

	/**
	 * Bulk load into the live verify-order pool table.
	 *
	 * <p>This constant is referenced by {@code newInsertBillData} but was missing from the
	 * interface. It takes the same column list as {@link #VERIFY_ORDER_POOL_HIS_SQL} because
	 * both loads are fed by the same stream builder.
	 *
	 * <p>NOTE(review): the table name {@code t_verify_order_pool} is inferred from the
	 * {@code _his} table name - confirm against the schema before use.
	 */
	String VERIFY_ORDER_POOL_SQL = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE into TABLE t_verify_order_pool (mch_order_no,store_no,total_money,refund_money,pay_time,trade_type, bill_date, create_date,md5String)";

	/** Bulk load into {@code t_order_pool}; 10 columns. */
	String ORDER_POOL_SQL = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE into TABLE t_order_pool (out_trade_no,classdate,oli_station_code,ttc,amount,total_fee, takedate, pay_time,pay_platform,create_date)";

	/** Bulk load into {@code t_modify_and_delete}; 6 columns. */
	String MODIFY_AND_DELETE_SQL = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE into TABLE t_modify_and_delete (table_pk,table_name,data,type,create_date,update_date)";
}

把对象转换成对应格式的文件流,并通过com.mysql.jdbc.PreparedStatement去执行命令

public class FastBatchInsertUtils {
	/**
	 * Executes a {@code LOAD DATA LOCAL INFILE} statement, taking the rows from
	 * {@code dataStream} instead of a file on disk.
	 *
	 * @param loadDataSql the {@code LOAD DATA LOCAL INFILE} statement to run
	 * @param dataStream  the row data; when {@code null} nothing is executed. The stream is
	 *                    not closed by this method.
	 * @param conn        the connection to run on; it is not closed by this method
	 * @return the update count reported by the driver, or {@code 0} when {@code dataStream}
	 *         is {@code null} or the statement is not backed by the MySQL driver
	 * @throws SQLException if preparing or executing the statement fails
	 */
	public static int bulkLoadFromInputStream(String loadDataSql, InputStream dataStream, Connection conn)
			throws SQLException {
		if (dataStream == null) {
			return 0;
		}
		// try-with-resources: the statement was previously never closed, leaking a
		// statement handle on every call.
		try (PreparedStatement statement = conn.prepareStatement(loadDataSql)) {
			// Test for the same type that is unwrapped below, so a passing check can
			// never be followed by a failing unwrap.
			if (!statement.isWrapperFor(com.mysql.jdbc.PreparedStatement.class)) {
				return 0;
			}
			com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
			// The driver sends this stream to the server in place of the file named in the SQL.
			mysqlStatement.setLocalInfileInputStream(dataStream);
			return mysqlStatement.executeUpdate();
		}
	}

	/**
	 * Serialises verify-order-pool records into the tab-separated, newline-terminated
	 * layout consumed by the verify-order-pool {@code LOAD DATA} statements.
	 *
	 * <p>Each row holds, in order: mchOrderNo, storeNo, totalMoney, refundMoney, payTime,
	 * tradeType, billDate, createDate and an MD5 digest. The digest is computed over the
	 * concatenation of mchOrderNo, storeNo, totalMoney, refundMoney, tradeType and billDate,
	 * with a {@code null} totalMoney/refundMoney counted as {@code 0}.
	 *
	 * <p>A {@code null} payTime or createDate is replaced by the current time.
	 *
	 * <p>NOTE(review): values are written verbatim - a {@code null} non-date field becomes the
	 * text "null", and tabs/newlines inside a value are not escaped. Bytes are produced with
	 * the platform default charset; confirm it matches what the server expects.
	 *
	 * @param verifyOrderPoolList the records to serialise; must not be {@code null}
	 * @return an in-memory stream over the serialised rows
	 */
	public static InputStream getDataInputStreamForVerifyOrderPool(List<VerifyOrderPool> verifyOrderPoolList) {
		StringBuilder builder = new StringBuilder();
		for (VerifyOrderPool verifyOrderPool : verifyOrderPoolList) {
			StringBuilder md5sb = new StringBuilder();
			builder.append(verifyOrderPool.getMchOrderNo());
			md5sb.append(verifyOrderPool.getMchOrderNo());
			builder.append("\t");
			builder.append(verifyOrderPool.getStoreNo());
			md5sb.append(verifyOrderPool.getStoreNo());
			builder.append("\t");
			builder.append(verifyOrderPool.getTotalMoney());
			md5sb.append(verifyOrderPool.getTotalMoney() != null ? verifyOrderPool.getTotalMoney() : 0);
			builder.append("\t");
			builder.append(verifyOrderPool.getRefundMoney());
			md5sb.append(verifyOrderPool.getRefundMoney() != null ? verifyOrderPool.getRefundMoney() : 0);
			builder.append("\t");
			// Fall back to "now" only when payTime is missing. The check used to be inverted,
			// which replaced every real pay time with the current time and passed null to the
			// formatter for missing ones.
			builder.append(DateUtils.formatDate(
					verifyOrderPool.getPayTime() == null ? new Date() : verifyOrderPool.getPayTime(),
					"yyyy-MM-dd HH:mm:ss"));
			builder.append("\t");
			builder.append(verifyOrderPool.getTradeType());
			md5sb.append(verifyOrderPool.getTradeType());
			builder.append("\t");
			builder.append(verifyOrderPool.getBillDate());
			md5sb.append(verifyOrderPool.getBillDate());
			builder.append("\t");
			builder.append(DateUtils.formatDate(
					verifyOrderPool.getCreateDate() == null ? new Date() : verifyOrderPool.getCreateDate(),
					"yyyy-MM-dd HH:mm:ss"));
			builder.append("\t");

			// Last column: digest of the business fields collected above.
			builder.append(MD5Util.md5s(md5sb.toString())).append("\n");
		}
		return new ByteArrayInputStream(builder.toString().getBytes());
	}

	/**
	 * Serialises order-pool records into tab-separated rows, one record per line.
	 *
	 * <p>Field order per row: outTradeNo, classdate, oliStationCode, ttc, amount, totalFee,
	 * takedate, payTime, payPlatform, createDate. Every field, including the last one, is
	 * followed by a tab before the terminating newline. A {@code null} takedate, payTime or
	 * createDate is written as the current time.
	 *
	 * @param orderPoolList the records to serialise
	 * @return an in-memory stream over the serialised rows
	 */
	public static InputStream getDataInputStreamForOrderPool(List<OrderPool> orderPoolList) {
		final String tab = "\t";
		final String dateTimePattern = "yyyy-MM-dd HH:mm:ss";

		StringBuilder rows = new StringBuilder();
		for (OrderPool row : orderPoolList) {
			// Plain columns, written as-is.
			rows.append(row.getOutTradeNo()).append(tab);
			rows.append(row.getClassdate()).append(tab);
			rows.append(row.getOliStationCode()).append(tab);
			rows.append(row.getTtc()).append(tab);
			rows.append(row.getAmount()).append(tab);
			rows.append(row.getTotalFee()).append(tab);

			// Date columns default to "now" when absent.
			rows.append(DateUtils.formatDate(
					row.getTakedate() != null ? row.getTakedate() : new Date(), dateTimePattern))
					.append(tab);
			rows.append(DateUtils.formatDate(
					row.getPayTime() != null ? row.getPayTime() : new Date(), dateTimePattern))
					.append(tab);

			rows.append(row.getPayPlatform()).append(tab);

			rows.append(DateUtils.formatDate(
					row.getCreateDate() != null ? row.getCreateDate() : new Date(), dateTimePattern))
					.append(tab);

			rows.append("\n");
		}
		return new ByteArrayInputStream(rows.toString().getBytes());
	}

	/**
	 * Serialises modify-and-delete records into tab-separated rows, one record per line.
	 *
	 * <p>Field order per row: tablePk, tableName, data, type, createDate, updateDate. Every
	 * field, including the last one, is followed by a tab before the terminating newline.
	 * A {@code null} createDate or updateDate is written as the current time.
	 *
	 * @param modifyAndDeleteList the records to serialise
	 * @return an in-memory stream over the serialised rows
	 */
	public static InputStream getDataInputStreamForModifyAndDelete(List<ModifyAndDelete> modifyAndDeleteList) {
		final String tab = "\t";
		final String dateTimePattern = "yyyy-MM-dd HH:mm:ss";

		StringBuilder rows = new StringBuilder();
		for (ModifyAndDelete row : modifyAndDeleteList) {
			rows.append(row.getTablePk()).append(tab);
			rows.append(row.getTableName()).append(tab);
			rows.append(row.getData()).append(tab);
			rows.append(row.getType()).append(tab);

			// Date columns default to "now" when absent.
			rows.append(DateUtils.formatDate(
					row.getCreateDate() != null ? row.getCreateDate() : new Date(), dateTimePattern))
					.append(tab);
			rows.append(DateUtils.formatDate(
					row.getUpdateDate() != null ? row.getUpdateDate() : new Date(), dateTimePattern))
					.append(tab);

			rows.append("\n");
		}
		return new ByteArrayInputStream(rows.toString().getBytes());
	}

在项目中进行调用。注意:如果批量导入需要与其他更新数据库的操作处于同一事务中,就必须使用 Spring 事务上下文中绑定到当前线程的数据库连接,而不能从数据源重新获取一个新的连接。

/**
 * Stores one downloaded bill: upserts the overall total, upserts the per-group totals, and
 * bulk-loads the detail rows into the pool table and its history table.
 *
 * <p>The bulk load runs on the connection bound to the current Spring transaction, so it
 * commits or rolls back together with the mapper calls.
 *
 * @param result the downloaded bill, providing the overall total and the detail rows
 * @throws IllegalStateException if no transaction-bound connection is available
 * @throws Exception             any failure from the mappers or the bulk load, rethrown
 *                               after being logged
 */
@Transactional
	public void newInsertBillData(WftDownRespVo result) throws Exception{
		VerifyOrderTotal total = result.getTotal();
		List<VerifyOrderPool> verifyOrderPoolList = result.getVerifyOrderPoolList();

		// Build one subtotal per group. The group key is split on ':' into
		// billDate, storeNo and tradeType, in that order.
		Map<String,List<VerifyOrderPool>> map = verifyOrderPoolList.stream().collect(Collectors.groupingBy(d -> fetchCodeGroupKey(d)));
		Date now = new Date();
		List<VerifyOrderTotalCode> totalCodeList = new ArrayList<VerifyOrderTotalCode>();
		for(Map.Entry<String, List<VerifyOrderPool>> entry : map.entrySet()) {
			String key = entry.getKey();
			String[] props = key.split(":");
			VerifyOrderTotalCode totalCode = new VerifyOrderTotalCode();
			totalCode.setBillDate(props[0]);
			totalCode.setStoreNo(props[1]);
			totalCode.setTradeType(props[2]);
			totalCode.setTradeNum(entry.getValue().size());
			totalCode.setTurnover(sumTotalFee(entry.getValue()));
			totalCode.setCreateDate(now);
			totalCode.setUpdateDate(now);
			totalCodeList.add(totalCode);
		}

		// Use the connection Spring has bound to the current transaction. A connection taken
		// straight from the data source would be a new one outside this transaction, and its
		// commit/rollback would have to be handled by hand.
		DataSourceTransactionManager transactionManager = SpringContextHolder
				.getBeanByClass(DataSourceTransactionManager.class);
		ConnectionHolder connectionHolder = (ConnectionHolder) TransactionSynchronizationManager
				.getResource(transactionManager.getDataSource());
		if (connectionHolder == null) {
			// No resource is bound when the method runs outside a Spring transaction,
			// e.g. when it is invoked without going through the transactional proxy.
			throw new IllegalStateException(
					"No transaction-bound connection found; newInsertBillData must run inside a Spring-managed transaction");
		}
		Connection connection = connectionHolder.getConnection();

		try {
			// Taken before the aggregate upserts so the first reported duration really covers them.
			long start = System.currentTimeMillis();

			// Overall total: add to the existing row for this bill date, or insert a new one.
			VerifyOrderTotal totalFromDB = verifyOrderTotalMapper.qryByBillDateLimitOne(total.getBillDate());
			if(totalFromDB != null) {
				totalFromDB.setTradeNum(totalFromDB.getTradeNum() + total.getTradeNum());
				totalFromDB.setTurnover(totalFromDB.getTurnover() + total.getTurnover());
				totalFromDB.setRefundMoney(totalFromDB.getRefundMoney() + total.getRefundMoney());
				totalFromDB.setRedBagsRefundMoney(totalFromDB.getRedBagsRefundMoney() + total.getRedBagsRefundMoney());
				totalFromDB.setPoundage(totalFromDB.getPoundage() + total.getPoundage());
				totalFromDB.setRealityMoney(totalFromDB.getRealityMoney() + total.getRealityMoney());
				verifyOrderTotalMapper.updateByPrimaryKeySelective(totalFromDB);
			}else {
				verifyOrderTotalMapper.insertSelective(total);
			}

			// Per-group totals: same add-or-insert rule.
			for (VerifyOrderTotalCode verifyOrderTotalCode : totalCodeList) {
				VerifyOrderTotalCode totalCodeFromDB = verifyOrderTotalCodeMapper.findByDateCodeTradeType(verifyOrderTotalCode.getBillDate(),
						verifyOrderTotalCode.getStoreNo(), verifyOrderTotalCode.getTradeType());
				if(totalCodeFromDB != null) {
					totalCodeFromDB.setTradeNum(totalCodeFromDB.getTradeNum() + verifyOrderTotalCode.getTradeNum());
					totalCodeFromDB.setTurnover(totalCodeFromDB.getTurnover() + verifyOrderTotalCode.getTurnover());
					verifyOrderTotalCodeMapper.updateByPrimaryKeySelective(totalCodeFromDB);
				}else {
					verifyOrderTotalCodeMapper.insertSelective(verifyOrderTotalCode);
				}
			}

			// Detail rows: bulk-load into the pool table and the history table. The stream is
			// consumed by the first load, so a fresh one is built for the second.
			long min = System.currentTimeMillis();
			if(CollectionUtils.isNotEmpty(verifyOrderPoolList)) {
				InputStream dataInputStream = FastBatchInsertUtils.getDataInputStreamForVerifyOrderPool(verifyOrderPoolList);
				FastBatchInsertUtils.bulkLoadFromInputStream(FastBatchInsertSqlConstants.VERIFY_ORDER_POOL_SQL,dataInputStream,connection);
				dataInputStream = FastBatchInsertUtils.getDataInputStreamForVerifyOrderPool(verifyOrderPoolList);
				FastBatchInsertUtils.bulkLoadFromInputStream(FastBatchInsertSqlConstants.VERIFY_ORDER_POOL_HIS_SQL,dataInputStream,connection);
			}
			long end =  System.currentTimeMillis();
			logger.info("order耗时:"+(min-start)+"ms"+" pool耗时:"+(end-min)+"ms");
		} catch (Exception e) {
			// The ternary must be parenthesised: without it the null test applied to the
			// concatenated string, so the message prefix was lost and a null total would
			// have thrown from inside this handler.
			logger.error("对账单下载数据错误,日期:" + (total != null ? total.getBillDate() : ""), e);
			// Rethrow so the failure reaches the caller and the transaction infrastructure.
			throw e;
		}
	}

 

相关标签: 遇到的问题