SQL Bulk Data Import: An Example Analysis
Scheme 1: multi-insert (one INSERT statement carrying many value tuples)
insert into test(name,age) values('zhangsan',12),('lisi', 15);
Scheme 2: batch-insert (a parameterized INSERT executed through the JDBC batch API)
insert into test(name,age) values(?,?);
Test results
Environment: single-node MySQL (the comparison should hold for other databases as well); data volume: 100,000 rows.
Scheme 1 (multi-insert): 4,125 ms
Scheme 2 (batch-insert): 33,963 ms
In this test, Scheme 1 was therefore roughly 8x faster than Scheme 2. A likely reason is that, without special configuration, MySQL Connector/J sends each batched row to the server as a separate statement, while the multi-value INSERT ships thousands of rows per round trip.
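If you want to keep the PreparedStatement batch API, the gap can usually be closed by letting the driver rewrite batches into multi-value inserts. A minimal sketch of the connection URL with that flag enabled (rewriteBatchedStatements is specific to MySQL Connector/J and was not part of the original test):

gp.jdbcUrl=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&rewriteBatchedStatements=true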
Partial sample code
Configuration: conf/config.properties
# MySQL connection pool configuration
gp.user=root
gp.password=MTIzNDU2PQ==
gp.jdbcUrl=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8
gp.driverClass=com.mysql.jdbc.Driver
gp.initialPoolSize=1
gp.minPoolSize=1
gp.maxPoolSize=2
gp.maxIdleTime=60
gp.acquireIncrement=1
gp.acquireRetryAttempts=30
# INSERT SQL statements
batch.insert.sql=insert into test.ods_core_loan_operation_value_df_bak(stat_dt,modified,created,id,transaction_id,source_type,customer_id,operation_type,schedule_no,installment_made_no,before_value_int,after_value_int,before_value_string,after_value_string) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?);
multi.insert.sql=insert into test.ods_core_loan_operation_value_df_bak(stat_dt,modified,created,id,transaction_id,source_type,customer_id,operation_type,schedule_no,installment_made_no,before_value_int,after_value_int,before_value_string,after_value_string) values
Main logic
package com.dxm.service;

import com.dxm.common.database.PostgreSqlConnectPool;
import com.dxm.common.property.PropertyUtil;
import org.apache.log4j.Logger;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * @Project: KafkaToGP
 * @File: com.dxm.service.InsertDataGP
 * @Author: lijianjun
 * @Date: 2019/5/6
 * @Description: Inserts buffered rows into the target table, either via the
 *               JDBC batch API (batchInsertGp) or via multi-value INSERT
 *               statements built by string concatenation (multiInsertGp).
 */
public class InsertDataGP {

    private static final Logger logger = Logger.getLogger(InsertDataGP.class);

    // Rows buffered for insertion; each map holds one row keyed by column name.
    public static List<Map<String, String>> list = new ArrayList<>();
    // Scheme 2: parameterized INSERT executed through addBatch/executeBatch.
    public static synchronized void batchInsertGp() {
        String sql = PropertyUtil.getValue("batch.insert.sql");
        Connection conn = null;
        PreparedStatement ps = null;
        int batchSize = 10000;
        int count = 0;
        try {
            conn = PostgreSqlConnectPool.getConnection();
            ps = conn.prepareStatement(sql);
            conn.setAutoCommit(false);
            for (Map<String, String> map : list) {
                ps.setDate(1, new java.sql.Date(System.currentTimeMillis()));
                // MODIFIED/CREATED hold epoch seconds; appending "000" converts to milliseconds.
                ps.setTimestamp(2, new Timestamp(Long.parseLong(map.get("MODIFIED") + "000")));
                ps.setTimestamp(3, new Timestamp(Long.parseLong(map.get("CREATED") + "000")));
                ps.setLong(4, Long.parseLong(map.get("ID")));
                ps.setLong(5, Long.parseLong(map.get("TRANSACTION_ID")));
                ps.setInt(6, Integer.parseInt(map.get("SOURCE_TYPE")));
                ps.setLong(7, Long.parseLong(map.get("CUSTOMER_ID")));
                ps.setInt(8, Integer.parseInt(map.get("OPERATION_TYPE")));
                ps.setInt(9, Integer.parseInt(map.get("SCHEDULE_NO")));
                ps.setInt(10, Integer.parseInt(map.get("INSTALLMENT_MADE_NO")));
                ps.setLong(11, Long.parseLong(map.get("BEFORE_VALUE_INT")));
                ps.setLong(12, Long.parseLong(map.get("AFTER_VALUE_INT")));
                ps.setString(13, map.get("BEFORE_VALUE_STRING"));
                ps.setString(14, map.get("AFTER_VALUE_STRING"));
                ps.addBatch();
                count++;
                // Flush and commit every batchSize rows to bound memory usage.
                if (count % batchSize == 0) {
                    ps.executeBatch();
                    conn.commit();
                    count = 0;
                }
            }
            // Flush the final partial batch, if any.
            if (count % batchSize != 0) {
                ps.executeBatch();
                conn.commit();
            }
            list.clear();
        } catch (SQLException e) {
            logger.error(e + "\nThe cause : " + e.getNextException());
            System.exit(1);
        } finally {
            PostgreSqlConnectPool.release(conn, ps, null);
        }
    }
    // Scheme 1: build multi-value INSERT statements by string concatenation.
    // Note: string columns are interpolated without escaping, so values
    // containing quotes would break the statement; this is only safe for
    // trusted, controlled data.
    public static synchronized void multiInsertGp() {
        StringBuilder sql = new StringBuilder(PropertyUtil.getValue("multi.insert.sql"));
        Connection conn = null;
        Statement stat = null;
        int batchSize = 10000;
        int count = 0;
        try {
            conn = PostgreSqlConnectPool.getConnection();
            stat = conn.createStatement();
            conn.setAutoCommit(false);
            for (Map<String, String> map : list) {
                sql.append("('")
                   .append(new java.sql.Date(System.currentTimeMillis()))
                   .append("','")
                   .append(new Timestamp(Long.parseLong(map.get("MODIFIED") + "000")))
                   .append("','")
                   .append(new Timestamp(Long.parseLong(map.get("CREATED") + "000")))
                   .append("',")
                   .append(Long.parseLong(map.get("ID")))
                   .append(',')
                   .append(Long.parseLong(map.get("TRANSACTION_ID")))
                   .append(',')
                   .append(Integer.parseInt(map.get("SOURCE_TYPE")))
                   .append(',')
                   .append(Long.parseLong(map.get("CUSTOMER_ID")))
                   .append(',')
                   .append(Integer.parseInt(map.get("OPERATION_TYPE")))
                   .append(',')
                   .append(Integer.parseInt(map.get("SCHEDULE_NO")))
                   .append(',')
                   .append(Integer.parseInt(map.get("INSTALLMENT_MADE_NO")))
                   .append(',')
                   .append(Long.parseLong(map.get("BEFORE_VALUE_INT")))
                   .append(',')
                   .append(Long.parseLong(map.get("AFTER_VALUE_INT")))
                   .append(",'")
                   .append(map.get("BEFORE_VALUE_STRING"))
                   .append("','")
                   .append(map.get("AFTER_VALUE_STRING"))
                   .append("'),");
                count++;
                if (count % batchSize == 0) {
                    // Drop the trailing comma, execute this statement, start a new one.
                    stat.execute(sql.deleteCharAt(sql.length() - 1).toString());
                    conn.commit();
                    count = 0;
                    sql = new StringBuilder(PropertyUtil.getValue("multi.insert.sql"));
                }
            }
            // Execute the final partial statement, if any.
            if (count % batchSize != 0) {
                stat.execute(sql.deleteCharAt(sql.length() - 1).toString());
                conn.commit();
            }
            list.clear();
        } catch (SQLException e) {
            logger.error(e + "\nThe cause : " + e.getNextException());
            System.exit(1);
        } finally {
            PostgreSqlConnectPool.release(conn, stat, null);
        }
    }
    private static String getDate() {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        return simpleDateFormat.format(new Date(System.currentTimeMillis()));
    }

    // Formats an epoch-seconds string (seconds -> milliseconds via the "000" suffix).
    private static String getTime(String ts) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return simpleDateFormat.format(new Date(Long.valueOf(ts + "000")));
    }

    public static void main(String[] args) {
        System.out.println(getTime("1557127861"));
        System.out.println(new java.sql.Date(System.currentTimeMillis()));
    }
}
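The article does not include the driver that produced the timings above. The following is a minimal sketch of how the two methods could be timed, assuming rows are staged in InsertDataGP.list before each run; the syntheticRow/populate helpers and their field values are illustrative assumptions, not from the original:

package com.dxm.service;

import java.util.HashMap;
import java.util.Map;

public class BenchmarkMain {

    // Builds one synthetic row containing every key the insert methods read.
    private static Map<String, String> syntheticRow(long id) {
        Map<String, String> row = new HashMap<>();
        row.put("MODIFIED", "1557127861");
        row.put("CREATED", "1557127861");
        row.put("ID", String.valueOf(id));
        row.put("TRANSACTION_ID", String.valueOf(id));
        row.put("SOURCE_TYPE", "1");
        row.put("CUSTOMER_ID", "42");
        row.put("OPERATION_TYPE", "1");
        row.put("SCHEDULE_NO", "1");
        row.put("INSTALLMENT_MADE_NO", "1");
        row.put("BEFORE_VALUE_INT", "0");
        row.put("AFTER_VALUE_INT", "1");
        row.put("BEFORE_VALUE_STRING", "before");
        row.put("AFTER_VALUE_STRING", "after");
        return row;
    }

    private static void populate(int rows) {
        InsertDataGP.list.clear();
        for (long i = 0; i < rows; i++) {
            InsertDataGP.list.add(syntheticRow(i));
        }
    }

    public static void main(String[] args) {
        populate(100_000);
        long start = System.currentTimeMillis();
        InsertDataGP.multiInsertGp();   // Scheme 1
        System.out.println("multi-insert: " + (System.currentTimeMillis() - start) + " ms");

        populate(100_000);              // list was cleared by the previous call
        start = System.currentTimeMillis();
        InsertDataGP.batchInsertGp();   // Scheme 2
        System.out.println("batch-insert: " + (System.currentTimeMillis() - start) + " ms");
    }
}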
Connection pool
package com.dxm.common.database;

import com.dxm.common.property.PropertyUtil;
import com.dxm.common.security.BASE64Util;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.log4j.Logger;

import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * @Project: KafkaToGP
 * @File: com.dxm.common.database.PostgreSqlConnectPool
 * @Author: lijianjun
 * @Date: 2019/1/29
 * @Description: c3p0-backed connection pool configured from conf/config.properties.
 */
public final class PostgreSqlConnectPool {

    private static final Logger logger = Logger.getLogger(PostgreSqlConnectPool.class);
    private static ComboPooledDataSource dataSource;

    static {
        try {
            dataSource = new ComboPooledDataSource();
            dataSource.setUser(PropertyUtil.getValue("gp.user"));
            dataSource.setPassword(BASE64Util.getInstance().decode(PropertyUtil.getValue("gp.password")));
            dataSource.setJdbcUrl(PropertyUtil.getValue("gp.jdbcUrl"));
            dataSource.setDriverClass(PropertyUtil.getValue("gp.driverClass"));
            // Connections acquired at startup; should lie between minPoolSize and maxPoolSize. Default: 3
            dataSource.setInitialPoolSize(Integer.parseInt(PropertyUtil.getValue("gp.initialPoolSize")));
            // Minimum number of connections kept in the pool
            dataSource.setMinPoolSize(Integer.parseInt(PropertyUtil.getValue("gp.minPoolSize")));
            // Maximum number of connections kept in the pool. Default: 15
            dataSource.setMaxPoolSize(Integer.parseInt(PropertyUtil.getValue("gp.maxPoolSize")));
            // Maximum idle time: a connection unused for this many seconds is discarded; 0 means never. Default: 0
            dataSource.setMaxIdleTime(Integer.parseInt(PropertyUtil.getValue("gp.maxIdleTime")));
            // Connections c3p0 acquires at once when the pool is exhausted. Default: 3
            dataSource.setAcquireIncrement(Integer.parseInt(PropertyUtil.getValue("gp.acquireIncrement")));
            // Retry attempts after a failed acquisition from the database. Default: 30
            dataSource.setAcquireRetryAttempts(Integer.parseInt(PropertyUtil.getValue("gp.acquireRetryAttempts")));
        } catch (PropertyVetoException e) {
            logger.error(e);
        }
    }

    // Getting a connection needs no extra locking; just release it when done.
    public static Connection getConnection() {
        Connection conn = null;
        try {
            conn = dataSource.getConnection();
        } catch (SQLException e) {
            logger.error(e);
            System.exit(1);
        }
        return conn;
    }

    public static void release(Connection conn, Statement st, ResultSet rs) {
        try {
            if (null != rs) {
                rs.close();
            }
            if (null != st) {
                st.close();
            }
            if (null != conn) {
                // Returns the Connection to the pool rather than closing it physically.
                conn.close();
            }
        } catch (SQLException e) {
            logger.error(e);
            System.exit(1);
        }
    }

    /**
     * Shuts down the pool.
     */
    public static void close() {
        if (null != dataSource) {
            dataSource.close();
        }
    }
}
Utility class
package com.dxm.common.property;

import org.apache.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.Properties;

/**
 * @Project: KafkaToGP
 * @File: com.dxm.common.property.PropertyUtil
 * @Author: lijianjun
 * @Date: 2019/1/29
 * @Description: Reads configuration values from conf/config.properties.
 */
public class PropertyUtil {

    private static final Logger logger = Logger.getLogger(PropertyUtil.class);

    // Note: the file is re-read on every lookup; caching the loaded Properties
    // would avoid repeated I/O if values are fetched frequently.
    public static String getValue(String key) {
        InputStream inputStream = null;
        String value = null;
        try {
            inputStream = new BufferedInputStream(new FileInputStream("conf/config.properties"));
            Properties prop = new Properties();
            prop.load(new InputStreamReader(inputStream, "UTF-8"));
            value = prop.getProperty(key);
        } catch (IOException e) {
            // Covers FileNotFoundException and UnsupportedEncodingException as well.
            logger.error(e.toString());
            System.exit(1);
        } finally {
            if (inputStream != null) {
                try {
                    inputStream.close();
                } catch (IOException e) {
                    logger.error(e);
                }
            }
        }
        return value;
    }
}
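BASE64Util (and Log4jUtil) are referenced above but not included in the article; gp.password in the config is stored Base64-encoded and decoded by the pool at startup. A minimal sketch of BASE64Util, assuming it simply wraps java.util.Base64; the singleton shape matches the call site BASE64Util.getInstance().decode(...), and the rest is a hypothetical reconstruction:

package com.dxm.common.security;

import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical reconstruction: the original class is not shown in the article.
public final class BASE64Util {

    private static final BASE64Util INSTANCE = new BASE64Util();

    private BASE64Util() {
    }

    public static BASE64Util getInstance() {
        return INSTANCE;
    }

    // Decodes a Base64 string back to UTF-8 text, e.g. the pooled DB password.
    public String decode(String encoded) {
        return new String(Base64.getDecoder().decode(encoded), StandardCharsets.UTF_8);
    }
}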