mysql大批量数据插入优化
程序员文章站
2022-06-15 13:39:43
...
目前在项目中发现一张700万的表,插入10万条数据通过Mybatis的批量插入大概需要3分钟左右,耗时太长。现在通过mysql的LOAD DATA LOCAL INFILE 命令进行优化插入。最后测试10万条数据批量插入大概在3秒左右的时间。
定义命令执行语句
public interface FastBatchInsertSqlConstants {
String VERIFY_ORDER_POOL_HIS_SQL = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE into TABLE t_verify_order_pool_his (mch_order_no,store_no,total_money,refund_money,pay_time,trade_type, bill_date, create_date,md5String)";
String ORDER_POOL_SQL = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE into TABLE t_order_pool (out_trade_no,classdate,oli_station_code,ttc,amount,total_fee, takedate, pay_time,pay_platform,create_date)";
String MODIFY_AND_DELETE_SQL = "LOAD DATA LOCAL INFILE 'sql.csv' IGNORE into TABLE t_modify_and_delete (table_pk,table_name,data,type,create_date,update_date)";
}
把对象转换成对应格式的文件流,并通过com.mysql.jdbc.PreparedStatement去执行命令
public class FastBatchInsertUtils {
public static int bulkLoadFromInputStream(String loadDataSql, InputStream dataStream, Connection conn)
throws SQLException {
if (dataStream == null) {
return 0;
}
PreparedStatement statement = conn.prepareStatement(loadDataSql);
int result = 0;
if (statement.isWrapperFor(com.mysql.jdbc.Statement.class)) {
com.mysql.jdbc.PreparedStatement mysqlStatement = statement.unwrap(com.mysql.jdbc.PreparedStatement.class);
mysqlStatement.setLocalInfileInputStream(dataStream);
result = mysqlStatement.executeUpdate();
}
return result;
}
public static InputStream getDataInputStreamForVerifyOrderPool(List<VerifyOrderPool> verifyOrderPoolList) {
StringBuilder builder = new StringBuilder();
for (VerifyOrderPool verifyOrderPool : verifyOrderPoolList) {
StringBuilder md5sb = new StringBuilder();
builder.append(verifyOrderPool.getMchOrderNo());
md5sb.append(verifyOrderPool.getMchOrderNo());
builder.append("\t");
builder.append(verifyOrderPool.getStoreNo());
md5sb.append(verifyOrderPool.getStoreNo());
builder.append("\t");
builder.append(verifyOrderPool.getTotalMoney());
md5sb.append(verifyOrderPool.getTotalMoney() != null ? verifyOrderPool.getTotalMoney() : 0);
builder.append("\t");
builder.append(verifyOrderPool.getRefundMoney());
md5sb.append(verifyOrderPool.getRefundMoney() != null ? verifyOrderPool.getRefundMoney() : 0);
builder.append("\t");
builder.append(DateUtils.formatDate(
verifyOrderPool.getPayTime() != null ? new Date() : verifyOrderPool.getPayTime(),
"yyyy-MM-dd HH:mm:ss"));
builder.append("\t");
builder.append(verifyOrderPool.getTradeType());
md5sb.append(verifyOrderPool.getTradeType());
builder.append("\t");
builder.append(verifyOrderPool.getBillDate());
md5sb.append(verifyOrderPool.getBillDate());
builder.append("\t");
builder.append(DateUtils.formatDate(
verifyOrderPool.getCreateDate() == null ? new Date() : verifyOrderPool.getCreateDate(),
"yyyy-MM-dd HH:mm:ss"));
builder.append("\t");
builder.append(MD5Util.md5s(md5sb.toString())).append("\n");
}
byte[] bytes = builder.toString().getBytes();
InputStream is = new ByteArrayInputStream(bytes);
return is;
}
public static InputStream getDataInputStreamForOrderPool(List<OrderPool> orderPoolList) {
StringBuilder builder = new StringBuilder();
for (OrderPool orderPool : orderPoolList) {
builder.append(orderPool.getOutTradeNo());
builder.append("\t");
builder.append(orderPool.getClassdate());
builder.append("\t");
builder.append(orderPool.getOliStationCode());
builder.append("\t");
builder.append(orderPool.getTtc());
builder.append("\t");
builder.append(orderPool.getAmount());
builder.append("\t");
builder.append(orderPool.getTotalFee());
builder.append("\t");
builder.append(DateUtils.formatDate(orderPool.getTakedate() == null ? new Date() : orderPool.getTakedate(),
"yyyy-MM-dd HH:mm:ss"));
builder.append("\t");
builder.append(DateUtils.formatDate(orderPool.getPayTime() == null ? new Date() : orderPool.getPayTime(),
"yyyy-MM-dd HH:mm:ss"));
builder.append("\t");
builder.append(orderPool.getPayPlatform());
builder.append("\t");
builder.append(DateUtils.formatDate(
orderPool.getCreateDate() == null ? new Date() : orderPool.getCreateDate(), "yyyy-MM-dd HH:mm:ss"));
builder.append("\t");
builder.append("\n");
}
byte[] bytes = builder.toString().getBytes();
InputStream is = new ByteArrayInputStream(bytes);
return is;
}
public static InputStream getDataInputStreamForModifyAndDelete(List<ModifyAndDelete> modifyAndDeleteList) {
StringBuilder builder = new StringBuilder();
for (ModifyAndDelete modifyAndDelete : modifyAndDeleteList) {
builder.append(modifyAndDelete.getTablePk());
builder.append("\t");
builder.append(modifyAndDelete.getTableName());
builder.append("\t");
builder.append(modifyAndDelete.getData());
builder.append("\t");
builder.append(modifyAndDelete.getType());
builder.append("\t");
builder.append(DateUtils.formatDate(
modifyAndDelete.getCreateDate() == null ? new Date() : modifyAndDelete.getCreateDate(), "yyyy-MM-dd HH:mm:ss"));
builder.append("\t");
builder.append(DateUtils.formatDate(
modifyAndDelete.getUpdateDate() == null ? new Date() : modifyAndDelete.getUpdateDate(), "yyyy-MM-dd HH:mm:ss"));
builder.append("\t");
builder.append("\n");
}
byte[] bytes = builder.toString().getBytes();
InputStream is = new ByteArrayInputStream(bytes);
return is;
}
在项目里进行调用,注意,如果要和其他更新数据库的操作在同一事务中,应该使用带事务的spring上下文中的数据库连接
@Transactional
public void newInsertBillData(WftDownRespVo result) throws Exception{
VerifyOrderTotal total = result.getTotal();
List<VerifyOrderPool> verifyOrderPoolList = result.getVerifyOrderPoolList();
Map<String,List<VerifyOrderPool>> map = verifyOrderPoolList.stream().collect(Collectors.groupingBy(d -> fetchCodeGroupKey(d)));
Date now = new Date();
List<VerifyOrderTotalCode> totalCodeList = new ArrayList<VerifyOrderTotalCode>();
for(Map.Entry<String, List<VerifyOrderPool>> entry : map.entrySet()) {
String key = entry.getKey();
String[] props = key.split(":");
VerifyOrderTotalCode totalCode = new VerifyOrderTotalCode();
totalCode.setBillDate(props[0]);
totalCode.setStoreNo(props[1]);
totalCode.setTradeType(props[2]);
totalCode.setTradeNum(entry.getValue().size());
totalCode.setTurnover(sumTotalFee(entry.getValue()));
totalCode.setCreateDate(now);
totalCode.setUpdateDate(now);
totalCodeList.add(totalCode);
}
DataSourceTransactionManager transactionManager = SpringContextHolder
.getBeanByClass(DataSourceTransactionManager.class);
ConnectionHolder connectionHolder = (ConnectionHolder) TransactionSynchronizationManager
.getResource(transactionManager.getDataSource());// 返回事务对象
// 从spring中获取事务连接,让verifyOrderTotalMapper.insertSelective(total)与下面的LOAD DATA LOCAL INFILE操作是在同一个事务中
Connection connection = connectionHolder.getConnection();
// 注意不能用下面的获取Connection的方法,因为此方法获取到的连接是新的连接,并不是线程上下文中的连接
// 如果用此连接的话那和verifyOrderTotalMapper.insertSelective(total)的连接就不在同一个事务中,那事务管理就不是spring管理
// 需要自己手动进行事务管理
// Connection connection= transactionManager.getDataSource().getConnection();
// connection.setAutoCommit(false);
try {
// 统计数据入库
VerifyOrderTotal totalFromDB = verifyOrderTotalMapper.qryByBillDateLimitOne(total.getBillDate());
if(totalFromDB != null) {
totalFromDB.setTradeNum(totalFromDB.getTradeNum() + total.getTradeNum());
totalFromDB.setTurnover(totalFromDB.getTurnover() + total.getTurnover());
totalFromDB.setRefundMoney(totalFromDB.getRefundMoney() + total.getRefundMoney());
totalFromDB.setRedBagsRefundMoney(totalFromDB.getRedBagsRefundMoney() + total.getRedBagsRefundMoney());
totalFromDB.setPoundage(totalFromDB.getPoundage() + total.getPoundage());
totalFromDB.setRealityMoney(totalFromDB.getRealityMoney() + total.getRealityMoney());
verifyOrderTotalMapper.updateByPrimaryKeySelective(totalFromDB);
}else {
verifyOrderTotalMapper.insertSelective(total);
}
if(CollectionUtils.isNotEmpty(totalCodeList)) {
for (VerifyOrderTotalCode verifyOrderTotalCode : totalCodeList) {
VerifyOrderTotalCode totalCodeFromDB = verifyOrderTotalCodeMapper.findByDateCodeTradeType(verifyOrderTotalCode.getBillDate(),
verifyOrderTotalCode.getStoreNo(), verifyOrderTotalCode.getTradeType());
if(totalCodeFromDB != null) {
totalCodeFromDB.setTradeNum(totalCodeFromDB.getTradeNum() + verifyOrderTotalCode.getTradeNum());
totalCodeFromDB.setTurnover(totalCodeFromDB.getTurnover() + verifyOrderTotalCode.getTurnover());
verifyOrderTotalCodeMapper.updateByPrimaryKeySelective(totalCodeFromDB);
}else {
verifyOrderTotalCodeMapper.insertSelective(verifyOrderTotalCode);
}
}
}
long start = System.currentTimeMillis();
// 对账池入库
long min = System.currentTimeMillis();
if(CollectionUtils.isNotEmpty(verifyOrderPoolList)) {
InputStream dataInputStream = FastBatchInsertUtils.getDataInputStreamForVerifyOrderPool(verifyOrderPoolList);
FastBatchInsertUtils.bulkLoadFromInputStream(FastBatchInsertSqlConstants.VERIFY_ORDER_POOL_SQL,dataInputStream,connection);
dataInputStream = FastBatchInsertUtils.getDataInputStreamForVerifyOrderPool(verifyOrderPoolList);
FastBatchInsertUtils.bulkLoadFromInputStream(FastBatchInsertSqlConstants.VERIFY_ORDER_POOL_HIS_SQL,dataInputStream,connection);
}
long end = System.currentTimeMillis();
System.out.println("order耗时:"+(min-start)+"ms"+" pool耗时:"+(end-min)+"ms");
//connection.commit();
} catch (Exception e) {
logger.info("对账单下载数据错误,日期:" + total != null ? total.getBillDate() : "", e);
// connection.rollback();
throw e;
}
}