欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

C# 大批量插入数据库优化(MySQL、SQL Server、Oracle)

程序员文章站 2022-05-10 22:56:27
...

前言:以下组件皆是从内存直接写入数据库,比起循环建立连接进行Insert效率提升非常明显
1、MySQL,利用MySqlBulkLoader

#region 批量插入数据
///// <summary>
///// 批量插入收集库件级文书档案信息实体(批量)
///// </summary>
///// <param name="dataTable">数据表</param>
///// <returns></returns>
public int BulkCopy(DataTable table)
{
    int insertCount = 0;
    try
    {
        table.TableName = "DataTable";//数据库中的表名

        string connectionString = db.Database.Connection.ConnectionString;

        if (string.IsNullOrEmpty(table.TableName)) throw new Exception("请给DataTable的TableName属性附上表名称");

        if (table.Rows.Count == 0) return 0;

        string tmpPath = Directory.GetCurrentDirectory() + "\\UpTemp";
        if (!Directory.Exists(tmpPath))
            Directory.CreateDirectory(tmpPath);
        tmpPath = Path.Combine(tmpPath, "Temp.csv");//csv文件临时目录

        string csv = DataTableToCsv(table);
        File.WriteAllText(tmpPath, csv);

        var columns = table.Columns.Cast<DataColumn>().Select(_columns => _columns.ColumnName).ToList();

        using (MySqlConnection conn = new MySqlConnection(connectionString))
        {
            try
            {
                Stopwatch stopwatch = new Stopwatch();
                stopwatch.Start();
                conn.Open();
                MySqlBulkLoader bulk = new MySqlBulkLoader(conn)
                {
                    FieldTerminator = ",",
                    FieldQuotationCharacter = '"',
                    EscapeCharacter = '"',
                    LineTerminator = "\r\n",
                    FileName = tmpPath,
                    NumberOfLinesToSkip = 0,
                    TableName = table.TableName,

                };
                bulk.Columns.AddRange(columns);//根据标题列对应插入
                insertCount = bulk.Load();
                stopwatch.Stop();
                //Console.WriteLine("耗时:{0}", stopwatch.ElapsedMilliseconds);
            }
            catch (MySqlException ex)
            {
                throw ex;
            }
        }
        File.Delete(tmpPath);
    }
    catch (Exception ex)
    {
        OnLogError("批量插入收集库件级文书档案信息实体(批量)时异常。", ex);
    }

    
    return insertCount;
}


///将DataTable转换为标准的CSV  
/// </summary>  
/// <param name="table">数据表</param>  
/// <returns>返回标准的CSV</returns>  
private static string DataTableToCsv(DataTable table)
 {
     //以半角逗号(即,)作分隔符,列为空也要表达其存在。  
     //列内容如存在半角逗号(即,)则用半角引号(即"")将该字段值包含起来。  
     //列内容如存在半角引号(即")则应替换成半角双引号("")转义,并用半角引号(即"")将该字段值包含起来。  
     StringBuilder sb = new StringBuilder();
     DataColumn colum;
     foreach (DataRow row in table.Rows)
     {
         for (int i = 0; i<table.Columns.Count; i++)
         {
             colum = table.Columns[i];
             if (i != 0) sb.Append(",");
             if (colum.DataType == typeof(string) && row[colum].ToString().Contains(","))
             {
                 sb.Append("\"" + row[colum].ToString().Replace("\"", "\"\"") + "\"");
             }
             else sb.Append(row[colum].ToString());
         }
         sb.AppendLine();
     }
     return sb.ToString();
 }
#endregion

2、SQL Server,利用SqlBulkCopy

/// <summary>
/// 批量插入
/// </summary>
/// <param name="ConnectStr">数据库连接字符串</param>
/// <param name="dt"></param>
privatestatic void BulkCopy<T>(string ConnectStr, DataTable dt, Action<int> act = null)
{   
    using (SqlConnection conn = new SqlConnection(ConnectStr))
    {
        if (conn.State == ConnectionState.Closed)
            conn.Open();
        using (var sqlbulkcopy = new SqlBulkCopy((SqlConnection)conn, SqlBulkCopyOptions.KeepIdentity, null))
        {
            sqlbulkcopy.BulkCopyTimeout = 60 * 60 * 24;
            sqlbulkcopy.NotifyAfter = 1000;
            sqlbulkcopy.SqlRowsCopied += new SqlRowsCopiedEventHandler((object sender, SqlRowsCopiedEventArgs e) =>
            {
                act?.Invoke(sqlbulkcopy.NotifyAfter);
            });
            sqlbulkcopy.DestinationTableName = dt.TableName;
            for (var i = 0; i < dt.Columns.Count; i++)
            {
                sqlbulkcopy.ColumnMappings.Add(dt.Columns[i].ColumnName, dt.Columns[i].ColumnName);
            }
            sqlbulkcopy.WriteToServer(dt);
        }
    }
}

3、Oracle, 利用OracleBulkCopy

 /// <summary>
 /// 批量插入数据
 /// </summary>
 /// <param name="table">数据表</param>
 /// <param name="targetTableName">数据库目标表名</param>
 /// <returns></returns>
 public static bool BulkCopy(DataTable table, string targetTableName)
 {
     bool result = false;
     using (Oracle.DataAccess.Client.OracleConnection conn = new Oracle.DataAccess.Client.OracleConnection(connStr))
     {
         using (Oracle.DataAccess.Client.OracleBulkCopy bulkCopy = new Oracle.DataAccess.Client.OracleBulkCopy(connStr, Oracle.DataAccess.Client.OracleBulkCopyOptions.Default))
         {
             if (table != null && table.Rows.Count > 0)
             {
                 bulkCopy.DestinationTableName = targetTableName;
                 for (int i = 0; i < table.Columns.Count; i++)
                 {
                     string col = table.Columns[i].ColumnName;
                     bulkCopy.ColumnMappings.Add(col, col);
                 }
                 conn.Open();
                 bulkCopy.WriteToServer(table);
                 result = true;
             }
         }
     }
     return result;
 }
相关标签: 算法