c#几种数据库的大数据批量插入(SqlServer、Oracle、SQLite和MySql)
在之前只知道sqlserver支持数据批量插入,殊不知道oracle、sqlite和mysql也是支持的,不过oracle需要使用orace.dataaccess驱动,今天就贴出几种数据库的批量插入解决方法。
首先说一下,iprovider里有一个用于实现批量插入的插件服务接口ibatcherprovider,此接口在前一篇文章中已经提到过了。
/// <summary> /// 提供数据批量处理的方法。 /// </summary> public interface ibatcherprovider : iproviderservice { /// <summary> /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> /// <param name="batchsize">每批次写入的数据量。</param> void insert(datatable datatable, int batchsize = 10000); }
一、sqlserver数据批量插入
sqlserver的批量插入很简单,使用sqlbulkcopy就可以,以下是该类的实现:
/// <summary> /// 为 system.data.sqlclient 提供的用于批量操作的方法。 /// </summary> public sealed class mssqlbatcher : ibatcherprovider { /// <summary> /// 获取或设置提供者服务的上下文。 /// </summary> public servicecontext servicecontext { get; set; } /// <summary> /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> /// <param name="batchsize">每批次写入的数据量。</param> public void insert(datatable datatable, int batchsize = 10000) { checker.argumentnull(datatable, "datatable"); if (datatable.rows.count == 0) { return; } using (var connection = (sqlconnection)servicecontext.database.createconnection()) { try { connection.tryopen(); //给表名加上前后导符 var tablename = dbutility.formatbyquote(servicecontext.database.provider.getservice<isyntaxprovider>(), datatable.tablename); using (var bulk = new sqlbulkcopy(connection, sqlbulkcopyoptions.keepidentity, null) { destinationtablename = tablename, batchsize = batchsize }) { //循环所有列,为bulk添加映射 datatable.eachcolumn(c => bulk.columnmappings.add(c.columnname, c.columnname), c => !c.autoincrement); bulk.writetoserver(datatable); bulk.close(); } } catch (exception exp) { throw new batcherexception(exp); } finally { connection.tryclose(); } } } }
以上没有使用事务,使用事务在性能上会有一定的影响,如果要使用事务,可以设置sqlbulkcopyoptions.useinternaltransaction。
二、oracle数据批量插入
system.data.oracleclient不支持批量插入,因此只能使用oracle.dataaccess组件来作为提供者。
/// <summary> /// oracle.data.access 组件提供的用于批量操作的方法。 /// </summary> public sealed class oracleaccessbatcher : ibatcherprovider { /// <summary> /// 获取或设置提供者服务的上下文。 /// </summary> public servicecontext servicecontext { get; set; } /// <summary> /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> /// <param name="batchsize">每批次写入的数据量。</param> public void insert(datatable datatable, int batchsize = 10000) { checker.argumentnull(datatable, "datatable"); if (datatable.rows.count == 0) { return; } using (var connection = servicecontext.database.createconnection()) { try { connection.tryopen(); using (var command = servicecontext.database.provider.dbproviderfactory.createcommand()) { if (command == null) { throw new batcherexception(new argumentexception("command")); } command.connection = connection; command.commandtext = generateinsersql(servicecontext.database, command, datatable); command.executenonquery(); } } catch (exception exp) { throw new batcherexception(exp); } finally { connection.tryclose(); } } } /// <summary> /// 生成插入数据的sql语句。 /// </summary> /// <param name="database"></param> /// <param name="command"></param> /// <param name="table"></param> /// <returns></returns> private string generateinsersql(idatabase database, dbcommand command, datatable table) { var names = new stringbuilder(); var values = new stringbuilder(); //将一个datatable的数据转换为数组的数组 var data = table.toarray(); //设置arraybindcount属性 command.gettype().getproperty("arraybindcount").setvalue(command, table.rows.count, null); var syntax = database.provider.getservice<isyntaxprovider>(); for (var i = 0; i < table.columns.count; i++) { var column = table.columns[i]; var parameter = database.provider.dbproviderfactory.createparameter(); if (parameter == null) { continue; } parameter.parametername = column.columnname; parameter.direction = parameterdirection.input; parameter.dbtype = column.datatype.getdbtype(); parameter.value = data[i]; if (names.length > 0) { names.append(","); values.append(","); } names.appendformat("{0}", dbutility.formatbyquote(syntax, column.columnname)); values.appendformat("{0}{1}", syntax.parameterprefix, column.columnname); command.parameters.add(parameter); } return string.format("insert into {0}({1}) values ({2})", dbutility.formatbyquote(syntax, table.tablename), names, values); } }
以上最重要的一步,就是将datatable转为数组的数组表示,即object[][],前数组的上标是列的个数,后数组是行的个数,因此循环columns将后数组作为parameter的值,也就是说,参数的值是一个数组。而insert语句与一般的插入语句没有什么不一样。
三、sqlite数据批量插入
sqlite的批量插入只需开启事务就可以了,这个具体的原理不得而知。
public sealed class sqlitebatcher : ibatcherprovider { /// <summary> /// 获取或设置提供者服务的上下文。 /// </summary> public servicecontext servicecontext { get; set; } /// <summary> /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> /// <param name="batchsize">每批次写入的数据量。</param> public void insert(datatable datatable, int batchsize = 10000) { checker.argumentnull(datatable, "datatable"); if (datatable.rows.count == 0) { return; } using (var connection = servicecontext.database.createconnection()) { dbtransaction transcation = null; try { connection.tryopen(); transcation = connection.begintransaction(); using (var command = servicecontext.database.provider.dbproviderfactory.createcommand()) { if (command == null) { throw new batcherexception(new argumentexception("command")); } command.connection = connection; command.commandtext = generateinsersql(servicecontext.database, datatable); if (command.commandtext == string.empty) { return; } var flag = new assertflag(); datatable.eachrow(row => { var first = flag.asserttrue(); processcommandparameters(datatable, command, row, first); command.executenonquery(); }); } transcation.commit(); } catch (exception exp) { if (transcation != null) { transcation.rollback(); } throw new batcherexception(exp); } finally { connection.tryclose(); } } } private void processcommandparameters(datatable datatable, dbcommand command, datarow row, bool first) { for (var c = 0; c < datatable.columns.count; c++) { dbparameter parameter; //首次创建参数,是为了使用缓存 if (first) { parameter = servicecontext.database.provider.dbproviderfactory.createparameter(); parameter.parametername = datatable.columns[c].columnname; command.parameters.add(parameter); } else { parameter = command.parameters[c]; } parameter.value = row[c]; } } /// <summary> /// 生成插入数据的sql语句。 /// </summary> /// <param name="database"></param> /// <param name="table"></param> /// <returns></returns> private string generateinsersql(idatabase database, datatable table) { var syntax = database.provider.getservice<isyntaxprovider>(); var names = new stringbuilder(); var values = new stringbuilder(); var flag = new assertflag(); table.eachcolumn(column => { if (!flag.asserttrue()) { names.append(","); values.append(","); } names.append(dbutility.formatbyquote(syntax, column.columnname)); values.appendformat("{0}{1}", syntax.parameterprefix, column.columnname); }); return string.format("insert into {0}({1}) values ({2})", dbutility.formatbyquote(syntax, table.tablename), names, values); } }
四、mysql数据批量插入
/// <summary> /// 为 mysql.data 组件提供的用于批量操作的方法。 /// </summary> public sealed class mysqlbatcher : ibatcherprovider { /// <summary> /// 获取或设置提供者服务的上下文。 /// </summary> public servicecontext servicecontext { get; set; } /// <summary> /// 将 <see cref="datatable"/> 的数据批量插入到数据库中。 /// </summary> /// <param name="datatable">要批量插入的 <see cref="datatable"/>。</param> /// <param name="batchsize">每批次写入的数据量。</param> public void insert(datatable datatable, int batchsize = 10000) { checker.argumentnull(datatable, "datatable"); if (datatable.rows.count == 0) { return; } using (var connection = servicecontext.database.createconnection()) { try { connection.tryopen(); using (var command = servicecontext.database.provider.dbproviderfactory.createcommand()) { if (command == null) { throw new batcherexception(new argumentexception("command")); } command.connection = connection; command.commandtext = generateinsersql(servicecontext.database, command, datatable); if (command.commandtext == string.empty) { return; } command.executenonquery(); } } catch (exception exp) { throw new batcherexception(exp); } finally { connection.tryclose(); } } } /// <summary> /// 生成插入数据的sql语句。 /// </summary> /// <param name="database"></param> /// <param name="command"></param> /// <param name="table"></param> /// <returns></returns> private string generateinsersql(idatabase database, dbcommand command, datatable table) { var names = new stringbuilder(); var values = new stringbuilder(); var types = new list<dbtype>(); var count = table.columns.count; var syntax = database.provider.getservice<isyntaxprovider>(); table.eachcolumn(c => { if (names.length > 0) { names.append(","); } names.appendformat("{0}", dbutility.formatbyquote(syntax, c.columnname)); types.add(c.datatype.getdbtype()); }); var i = 0; foreach (datarow row in table.rows) { if (i > 0) { values.append(","); } values.append("("); for (var j = 0; j < count; j++) { if (j > 0) { values.append(", "); } var isstrtype = isstringtype(types[j]); var parameter = createparameter(database.provider, isstrtype, types[j], row[j], syntax.parameterprefix, i, j); if (parameter != null) { values.append(parameter.parametername); command.parameters.add(parameter); } else if (isstrtype) { values.appendformat("'{0}'", row[j]); } else { values.append(row[j]); } } values.append(")"); i++; } return string.format("insert into {0}({1}) values {2}", dbutility.formatbyquote(syntax, table.tablename), names, values); } /// <summary> /// 判断是否为字符串类别。 /// </summary> /// <param name="dbtype"></param> /// <returns></returns> private bool isstringtype(dbtype dbtype) { return dbtype == dbtype.ansistring || dbtype == dbtype.ansistringfixedlength || dbtype == dbtype.string || dbtype == dbtype.stringfixedlength; } /// <summary> /// 创建参数。 /// </summary> /// <param name="provider"></param> /// <param name="isstrtype"></param> /// <param name="dbtype"></param> /// <param name="value"></param> /// <param name="parprefix"></param> /// <param name="row"></param> /// <param name="col"></param> /// <returns></returns> private dbparameter createparameter(iprovider provider, bool isstrtype, dbtype dbtype, object value, char parprefix, int row, int col) { //如果生成全部的参数,则速度会很慢,因此,只有数据类型为字符串(包含'号)和日期型时才添加参数 if ((isstrtype && value.tostring().indexof('\'') != -1) || dbtype == dbtype.datetime) { var name = string.format("{0}p_{1}_{2}", parprefix, row, col); var parameter = provider.dbproviderfactory.createparameter(); parameter.parametername = name; parameter.direction = parameterdirection.input; parameter.dbtype = dbtype; parameter.value = value; return parameter; } return null; } }
mysql的批量插入,是将值全部写在语句的values里,例如,insert batcher(id, name) values(1, '1', 2, '2', 3, '3', ........ 10, '10')。
五、测试
接下来写一个测试用例来看一下使用批量插入的效果。
public void testbatchinsert() { console.writeline(timewatcher.watch(() => invoketest(database => { var table = new datatable("batcher"); table.columns.add("id", typeof(int)); table.columns.add("name1", typeof(string)); table.columns.add("name2", typeof(string)); table.columns.add("name3", typeof(string)); table.columns.add("name4", typeof(string)); //构造100000条数据 for (var i = 0; i < 100000; i++) { table.rows.add(i, i.tostring(), i.tostring(), i.tostring(), i.tostring()); } //获取 ibatcherprovider var batcher = database.provider.getservice<ibatcherprovider>(); if (batcher == null) { console.writeline("不支持批量插入。"); } else { batcher.insert(table); } //输出batcher表的数据量 var sql = new sqlcommand("select count(1) from batcher"); console.writeline("当前共有 {0} 条数据", database.executescalar(sql)); }))); }
以下表中列出了四种数据库生成10万条数据各耗用的时间
数据库 |
耗用时间 |
mssql | 00:00:02.9376300 |
oracle | 00:00:01.5155959 |
sqlite | 00:00:01.6275634 |
mysql | 00:00:05.4166891 |
上一篇: 浅谈JavaScript节流与防抖
下一篇: C# 中的var关键字详细介绍