欢迎您访问程序员文章站,本站旨在为大家分享程序员计算机编程知识!
您现在的位置是: 首页

SQL大批量导入数据实例分析

程序员文章站 2022-06-11 13:27:57
...

方案1 multi-insert

insert into test(name,age) values('zhangsan',12),('lisi', 15);

方案2 batch-insert

insert into test(name,age) values(?,?);

测试结果

经测试方案1比方案2快8倍左右(见下方用时数据)
环境单机mysql(同样应该也适用于其他数据库),数据量10万
方案一结果用时:4,125ms
方案二结果用时:33,963ms

部分示例代码

配置信息conf/config.properties
#mysql连接池配置
gp.user=root
gp.password=MTIzNDU2PQ==
gp.jdbcUrl=jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8
gp.driverClass=com.mysql.jdbc.Driver
gp.initialPoolSize=1
gp.minPoolSize=1
gp.maxPoolSize=2
gp.maxIdleTime=60
gp.acquireIncrement=1
gp.acquireRetryAttempts=30

#insert sql
batch.insert.sql=insert into test.ods_core_loan_operation_value_df_bak(stat_dt,modified,created,id,transaction_id,source_type,customer_id,operation_type,schedule_no,installment_made_no,before_value_int,after_value_int,before_value_string,after_value_string) values(?,?,?,?,?,?,?,?,?,?,?,?,?,?);
multi.insert.sql=insert into test.ods_core_loan_operation_value_df_bak(stat_dt,modified,created,id,transaction_id,source_type,customer_id,operation_type,schedule_no,installment_made_no,before_value_int,after_value_int,before_value_string,after_value_string) values
主要逻辑代码
package com.dxm.service;

import com.dxm.common.database.PostgreSqlConnectPool;
import com.dxm.common.property.PropertyUtil;
import org.apache.log4j.Logger;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;

/**
 * @Project: KafkaToGP
 * @File: com.dxm.service.InsertDataGP
 * @Author: lijianjun
 * @Date: 2019/5/6
 * @Description:
 */
public class InsertDataGP {
    private static final Logger logger = Logger.getLogger(InsertDataGP.class);

    /** Rows pending insertion; each map holds upper-case column name -> raw string value. */
    public static List<Map<String, String>> list = new ArrayList<>();

    /**
     * Scheme 2 (batch-insert): binds every pending row to a single PreparedStatement
     * and flushes with executeBatch()/commit() every 10000 rows. Clears {@code list}
     * once all rows are written. Exits the JVM on SQL failure (original contract).
     */
    public static synchronized void batchInsertGp() {
        // FIX: the config file defines "batch.insert.sql"; the previous key
        // "insert.sql" does not exist, so getValue() returned null and
        // prepareStatement(null) failed at runtime.
        String sql = PropertyUtil.getValue("batch.insert.sql");
        Connection conn = null;
        PreparedStatement ps = null;
        int batchSize = 10000;
        int count = 0;

        try {
            conn = PostgreSqlConnectPool.getConnection();
            ps = conn.prepareStatement(sql);
            conn.setAutoCommit(false);
            for (Map<String, String> map : list) {
                // NOTE(review): values are assumed to be non-null numeric strings; a
                // missing key yields "null000" and a NumberFormatException — confirm upstream.
                ps.setDate(1, new java.sql.Date(System.currentTimeMillis()));
                // Source timestamps are epoch seconds; appending "000" scales them to millis.
                ps.setTimestamp(2, new Timestamp(Long.parseLong(map.get("MODIFIED") + "000")));
                ps.setTimestamp(3, new Timestamp(Long.parseLong(map.get("CREATED") + "000")));
                ps.setLong(4, Long.parseLong(map.get("ID")));
                ps.setLong(5, Long.parseLong(map.get("TRANSACTION_ID")));
                ps.setInt(6, Integer.parseInt(map.get("SOURCE_TYPE")));
                ps.setLong(7, Long.parseLong(map.get("CUSTOMER_ID")));
                ps.setInt(8, Integer.parseInt(map.get("OPERATION_TYPE")));
                ps.setInt(9, Integer.parseInt(map.get("SCHEDULE_NO")));
                ps.setInt(10, Integer.parseInt(map.get("INSTALLMENT_MADE_NO")));
                ps.setLong(11, Long.parseLong(map.get("BEFORE_VALUE_INT")));
                ps.setLong(12, Long.parseLong(map.get("AFTER_VALUE_INT")));
                ps.setString(13, map.get("BEFORE_VALUE_STRING"));
                ps.setString(14, map.get("AFTER_VALUE_STRING"));
                ps.addBatch();
                count++;
                if (count == batchSize) {
                    ps.executeBatch();
                    conn.commit();
                    count = 0;
                }
            }
            // Flush the final partial batch; count is always < batchSize here.
            if (count > 0) {
                ps.executeBatch();
                conn.commit();
            }
            list.clear();
        } catch (SQLException e) {
            // getNextException() surfaces the driver's chained cause (may be null).
            logger.error(e + "\nThe cause : " + e.getNextException());
            System.exit(1);
        } finally {
            PostgreSqlConnectPool.release(conn, ps, null);
        }
    }

    /**
     * Scheme 1 (multi-insert): builds one
     * "INSERT ... VALUES (...),(...),..." statement per 10000 rows and executes it.
     * Values are inlined into the SQL text; single quotes inside string columns are
     * doubled so an embedded quote cannot break (or inject into) the statement.
     */
    public static synchronized void multiInsertGp() {
        // StringBuilder suffices: the buffer is method-local and the method is synchronized.
        StringBuilder sql = new StringBuilder(PropertyUtil.getValue("multi.insert.sql"));
        Connection conn = null;
        Statement stat = null;
        int batchSize = 10000;
        int count = 0;

        try {
            conn = PostgreSqlConnectPool.getConnection();
            stat = conn.createStatement();
            conn.setAutoCommit(false);
            for (Map<String, String> map : list) {
                sql.append("('")
                   .append(new java.sql.Date(System.currentTimeMillis()))
                   .append("','")
                   .append(new Timestamp(Long.parseLong(map.get("MODIFIED") + "000")))
                   .append("','")
                   .append(new Timestamp(Long.parseLong(map.get("CREATED") + "000")))
                   .append("',")
                   .append(Long.parseLong(map.get("ID")))
                   .append(',')
                   .append(Long.parseLong(map.get("TRANSACTION_ID")))
                   .append(',')
                   .append(Integer.parseInt(map.get("SOURCE_TYPE")))
                   .append(',')
                   .append(Long.parseLong(map.get("CUSTOMER_ID")))
                   .append(',')
                   .append(Integer.parseInt(map.get("OPERATION_TYPE")))
                   .append(',')
                   .append(Integer.parseInt(map.get("SCHEDULE_NO")))
                   .append(',')
                   .append(Integer.parseInt(map.get("INSTALLMENT_MADE_NO")))
                   .append(',')
                   .append(Long.parseLong(map.get("BEFORE_VALUE_INT")))
                   .append(',')
                   .append(Long.parseLong(map.get("AFTER_VALUE_INT")))
                   .append(",'")
                   .append(escapeQuotes(map.get("BEFORE_VALUE_STRING")))
                   .append("','")
                   .append(escapeQuotes(map.get("AFTER_VALUE_STRING")))
                   .append("'),");
                count++;
                if (count == batchSize) {
                    // deleteCharAt drops the trailing comma after the last value tuple.
                    stat.execute(sql.deleteCharAt(sql.length() - 1).toString());
                    conn.commit();
                    count = 0;
                    sql = new StringBuilder(PropertyUtil.getValue("multi.insert.sql"));
                }
            }
            // Flush the final partial statement; count is always < batchSize here.
            if (count > 0) {
                stat.execute(sql.deleteCharAt(sql.length() - 1).toString());
                conn.commit();
            }
            list.clear();
        } catch (SQLException e) {
            logger.error(e + "\nThe cause : " + e.getNextException());
            System.exit(1);
        } finally {
            PostgreSqlConnectPool.release(conn, stat, null);
        }
    }

    /**
     * FIX: doubles single quotes so a raw value can be inlined inside a SQL string
     * literal; previously a value containing {@code '} produced a malformed (and
     * injectable) statement. A null value renders as the text "null", matching the
     * original behavior.
     */
    private static String escapeQuotes(String value) {
        return value == null ? null : value.replace("'", "''");
    }

    /** Current date formatted as yyyy-MM-dd. SimpleDateFormat is created per call because it is not thread-safe. */
    private static String getDate() {
        long time = System.currentTimeMillis();
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd");
        return simpleDateFormat.format(new Date(time));
    }

    /**
     * Formats an epoch-seconds string as "yyyy-MM-dd HH:mm:ss" (appending "000"
     * scales seconds to milliseconds).
     */
    private static String getTime(String ts) {
        SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        return simpleDateFormat.format(new Date(Long.valueOf(ts + "000")));
    }

    /** Ad-hoc smoke test for the date helpers. */
    public static void main(String[] args) {
        // FIX: removed Long.parseLong("") — it unconditionally threw
        // NumberFormatException and crashed main.
        System.out.println(getTime("1557127861"));
        System.out.println(new java.sql.Date(System.currentTimeMillis()));
        System.out.println(getDate());
    }
}

连接池
package com.dxm.common.database;

import com.dxm.common.property.PropertyUtil;
import com.dxm.common.security.BASE64Util;
import com.mchange.v2.c3p0.ComboPooledDataSource;
import org.apache.log4j.Logger;
import java.beans.PropertyVetoException;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;

/**
 * @Project: KafkaToGP
 * @File: com.dxm.common.database.PostgreSqlConnectPool
 * @Author: lijianjun
 * @Date: 2019/1/29
 * @Description:
 */
/**
 * c3p0-backed connection pool configured from conf/config.properties ("gp.*" keys).
 * The password is stored Base64-encoded and decoded at startup.
 */
public final class PostgreSqlConnectPool {
    private static final Logger logger = Logger.getLogger(PostgreSqlConnectPool.class);
    private static ComboPooledDataSource dataSource;

    static {
        try {
            dataSource = new ComboPooledDataSource();
            dataSource.setUser(PropertyUtil.getValue("gp.user"));
            dataSource.setPassword(BASE64Util.getInstance().decode(PropertyUtil.getValue("gp.password")));
            dataSource.setJdbcUrl(PropertyUtil.getValue("gp.jdbcUrl"));
            dataSource.setDriverClass(PropertyUtil.getValue("gp.driverClass"));
            // Connections created at startup; must lie in [minPoolSize, maxPoolSize]. Default: 3
            dataSource.setInitialPoolSize(Integer.parseInt(PropertyUtil.getValue("gp.initialPoolSize")));
            // Minimum number of connections kept in the pool.
            dataSource.setMinPoolSize(Integer.parseInt(PropertyUtil.getValue("gp.minPoolSize")));
            // Maximum number of connections kept in the pool. Default: 15
            dataSource.setMaxPoolSize(Integer.parseInt(PropertyUtil.getValue("gp.maxPoolSize")));
            // Idle timeout in seconds; 0 means never discard. Default: 0
            dataSource.setMaxIdleTime(Integer.parseInt(PropertyUtil.getValue("gp.maxIdleTime")));
            // Connections acquired at once when the pool is exhausted. Default: 3
            dataSource.setAcquireIncrement(Integer.parseInt(PropertyUtil.getValue("gp.acquireIncrement")));
            // Retry attempts after a failed acquisition from the database. Default: 30
            dataSource.setAcquireRetryAttempts(Integer.parseInt(PropertyUtil.getValue("gp.acquireRetryAttempts")));
        } catch (PropertyVetoException e) {
            // Thrown by setDriverClass; NOTE(review): the pool continues without a
            // driver class, so the first getConnection() will fail and exit below.
            logger.error(e);
        }
    }

    /**
     * Borrows a connection from the pool. No locking is needed here; callers must
     * hand the connection back via {@link #release}. Exits the JVM when the pool
     * cannot supply a connection (original contract).
     */
    public static Connection getConnection() {
        Connection conn = null;
        try {
            conn = dataSource.getConnection();
        } catch (SQLException e) {
            logger.error(e);
            System.exit(1);
        }
        return conn;
    }

    /**
     * Returns JDBC resources; any argument may be null. FIX: each resource is now
     * closed independently — previously a failure closing the ResultSet leaked the
     * Statement and the Connection (all three closes shared one try block). The
     * original exit-on-failure contract is preserved.
     *
     * @param conn connection to return to the pool (c3p0 reclaims it on close())
     * @param st   statement to close
     * @param rs   result set to close
     */
    public static void release(Connection conn, Statement st, ResultSet rs) {
        boolean failed = false;
        if (null != rs) {
            try {
                rs.close();
            } catch (SQLException e) {
                logger.error(e);
                failed = true;
            }
        }
        if (null != st) {
            try {
                st.close();
            } catch (SQLException e) {
                logger.error(e);
                failed = true;
            }
        }
        if (null != conn) {
            try {
                // Hands the connection back to the pool rather than closing the socket.
                conn.close();
            } catch (SQLException e) {
                logger.error(e);
                failed = true;
            }
        }
        if (failed) {
            // Keep the original behavior of aborting the process on a close failure.
            System.exit(1);
        }
    }

    /**
     * Shuts the pool down, closing all pooled connections.
     */
    public static void close() {
        if (null != dataSource) {
            dataSource.close();
        }
    }
}

工具类
package com.dxm.common.property;

import com.dxm.common.log.Log4jUtil;
import org.apache.log4j.Logger;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UnsupportedEncodingException;
import java.util.Properties;

/**
 * @Project: KafkaToGP
 * @File: com.dxm.com.dxm.common.property.PropertyUtil.java
 * @Author: lijianjun
 * @Date: 2019/1/29
 * @Description:
 */
public class PropertyUtil {

    private static final Logger logger = Logger.getLogger(PropertyUtil.class);

    /**
     * Reads a value from conf/config.properties (UTF-8). The file is re-read on
     * every call, so edits take effect without a restart — at the cost of one
     * file parse per lookup. Exits the JVM if the file cannot be read.
     *
     * @param key property name to look up
     * @return the property value, or null when the key is not defined
     */
    public static String getValue(String key) {
        // try-with-resources replaces the manual finally/close dance and also
        // closes the stream when Properties.load() itself throws.
        try (InputStream inputStream = new BufferedInputStream(
                     new FileInputStream(new File("conf/config.properties")));
             InputStreamReader reader = new InputStreamReader(inputStream, "UTF-8")) {
            Properties prop = new Properties();
            prop.load(reader);
            return prop.getProperty(key);
        } catch (IOException e) {
            // FileNotFoundException and UnsupportedEncodingException are IOException
            // subclasses; the three previous catch blocks were byte-for-byte identical.
            logger.error(e.toString());
            System.exit(1);
            return null; // unreachable: System.exit does not return
        }
    }
}