C#使用Parallel处理数据同步写入Datatable并使用BulkInsert批量导入数据库
程序员文章站
2024-01-04 10:24:16
项目需要,几十万张照片需要计算出每个照片的特征值(调用C++编写的DLL)。 业务流程:选择照片文件夹,分别访问照片-->调用DLL接口传递照片路径-->接收处理返回值-->写入数据库。 前期使用的for循环来处理,几十万张照片处理起来差不多10个小时。速度太慢,后面改进使用Parallel来进行平 ......
项目需要:几十万张照片需要逐一计算特征值(调用 C++ 编写的 DLL)。
业务流程:选择照片文件夹,逐个访问照片 --> 调用 DLL 接口传递照片路径 --> 接收并处理返回值 --> 写入数据库。
前期使用 for 循环来处理,几十万张照片处理起来差不多要 10 个小时,速度太慢。后面改进为使用 Parallel 进行并行计算(调用 DLL 处理照片),结果统一写入 DataTable,然后使用 BulkInsert 批量把 DataTable 写入数据库。目前测试 8 万张照片,并行计算耗时约 30 分钟,速度提高约 30%-40%。
代码示例如下:
// Shared SQL Server connection; assigned and opened in button1_click.
private static sqlconnection sqlconn;
// Holds the progress counter shown in the UI, stored under the single key "currints".
private static concurrentdictionary<string, int> currints = new concurrentdictionary<string, int>();
/// <summary>
/// Lets the user pick a photo folder, loads the not-yet-converted rows from the database,
/// converts each row's photos in parallel, collects the results in a DataTable and
/// bulk-inserts that table into the database.
/// </summary>
/// <param name="sender">Event source (unused).</param>
/// <param name="e">Event arguments (unused).</param>
private void button1_click(object sender, eventargs e)
{
    var dirpath = "";
    using (var folderbrowser = new folderbrowserdialog())
    {
        if (folderbrowser.showdialog() != dialogresult.ok) return;
        dirpath = folderbrowser.selectedpath;
        if (!directory.exists(dirpath))
        {
            messagebox.show(@"所选路径不存在或无权访问", @"错误", messageboxbuttons.ok, messageboxicon.error);
            return;
        }
    }
    begininvoke(new action(async () =>
    {
        button1.enabled = false;
        try
        {
            var sw = new stopwatch();
            sw.start();
            // Check the connection to the database server.
            log.writeline(@"尝试连接数据库服务器");
            sqlconn = new sqlconnection(
                $"data source={txt_serverip.text},{txt_serverport.text};user id={txt_user.text};password={txt_pwd.text};initial catalog={txt_db.text};persist security info=false;pooling=true;min pool size=30;max pool size=200;");
            if (sqlconn.state == connectionstate.closed)
            {
                try
                {
                    sqlconn.open();
                }
                catch (exception exception)
                {
                    log.writeline($@"连接数据库服务器【失败】-->{exception.message}");
                    return;
                }
            }
            log.writeline($@"连接数据库服务器【成功】{environment.newline}获取未转换图片数据。。。");

            var ds = new dataset();
            int.tryparse(txt_start.text, out var start);
            int.tryparse(txt_end.text, out var end);
            // NOTE(review): the column name from txt_mastkey is spliced into the SQL text.
            // Identifiers cannot be passed as parameters, so validate it against a known
            // list of columns if this box can ever hold untrusted input.
            var sqlstrall = start == 0 || end == 0
                ? "select * from viewweizhuanhuan"
                : $"select * from viewweizhuanhuan where {txt_mastkey.text} between {start} and {end}";
            using (var sqlcmd = new sqlcommand(sqlstrall, sqlconn))
            using (var da = new sqldataadapter(sqlcmd))
            {
                da.fill(ds);
            }
            if (ds.tables.count == 0 || ds.tables[0].rows.count == 0)
            {
                log.writeline("所有图片都已经转换完毕。");
                return;
            }
            var table = ds.tables[0];
            var total = table.rows.count;
            log.writeline($"{total}个图片需要转换。");
            // NOTE(review): rowkey and splitkey are not used in the visible code; presumably they
            // feed the elided logic that derives ksno / filedirpath below. Confirm.
            var rowkey = combobox1.selectedvalue.tostring();
            var splitkey = txt_split.text.trim();
            // Read the destination table name on the UI thread; controls must not be
            // touched from the worker thread that runs the bulk insert.
            var tablename = txt_tablename.text.trim();

            #region result table definition
            var dt = new datatable();
            dt.columns.add("zd1", typeof(int));
            dt.columns.add("zd2", typeof(int));
            dt.columns.add("zd3", typeof(string));
            dt.columns.add("zd4", typeof(string));
            dt.columns.add("zd5", typeof(string));
            dt.columns.add("zd6", typeof(string));
            #endregion

            #region parallel execution
            // Reset the progress counter to 1 on every run. tryadd would silently keep the
            // value left over from a previous click because the dictionary is static.
            currints["currints"] = 1;
            await task.run(() =>
            {
                // Run at most 8 iterations concurrently.
                parallel.for(0, total, new paralleloptions { maxdegreeofparallelism = 8 }, (rotindex, state) =>
                {
                    // Progress updates are marshalled to the UI thread, where they run one at a time.
                    begininvoke(new action(() =>
                    {
                        currints.trygetvalue("currints", out var currvalue);
                        lb_process.text = $@"{currvalue}/{total}"; // show progress
                        currints.tryupdate("currints", currvalue + 1, currvalue); // increment
                    }));

                    // Sub-directory built from the selected file-name format and the entered rule.
                    // NOTE(review): ksno and filexe are not defined anywhere in the visible code;
                    // the logic that derives them (and filedirpath) appears to have been omitted.
                    var filedirpath = "";
                    var file = new list<string>
                    {
                        $"{dirpath}\\{filedirpath}\\{ksno}_fp1.jpg",
                        $"{dirpath}\\{filedirpath}\\{ksno}_fp2.jpg",
                        $"{dirpath}\\{filedirpath}\\{ksno}_fp3.jpg"
                    };
                    foreach (var zwzp in file)
                    {
                        try
                        {
                            var model = zwhelper.zwzhasync($"{zwzp}").result; // call the C++ conversion
                            if (model != null)
                            {
                                // DataTable writes are not thread-safe: without this lock, concurrent
                                // adds fail with a "DataTable index corrupted" error.
                                lock (dt.rows.syncroot)
                                {
                                    var dr = dt.newrow();
                                    dr["zd1"] = convert.toint32(filexe);
                                    dr["zd2"] = convert.toint32(table.rows[rotindex]["zd1"]);
                                    dr["zd3"] = model.zhtz;
                                    // MM = month, HH = 24-hour clock (lowercase mm is minutes, hh is 12-hour).
                                    dr["zd4"] = datetime.now.tostring("yyyy-MM-dd HH:mm:ss");
                                    dr["zd5"] = "";
                                    dr["zd6"] = "";
                                    dt.rows.add(dr);
                                }
                            }
                            else
                            {
                                log.writeline($@"{ksno}转换失败");
                                log.log.error($"{ksno}转换失败。");
                            }
                        }
                        catch (exception exception)
                        {
                            log.log.error($"学号{ksno},图片路径{zwzp}转换失败。{exception}");
                        }
                    }
                });
                sw.stop();
                log.writeline($"转换耗时:{sw.elapsedmilliseconds}毫秒");
                log.writeline($@"开始写入数据库,数量{dt.rows.count}");

                #region bulk write
                if (dt.rows.count == 0)
                {
                    log.writeline(@"没有要写入的数据。");
                    return;
                }
                sw.restart();
                var sucess = false;
                if (sqlhelper.bulkinsert(sqlconn, tablename, dt, out var err))
                {
                    sucess = true;
                }
                else
                {
                    log.log.error($"写入数据库失败==》{err}");
                }
                sw.stop();
                log.writeline($"写入数据库是否成功=>{sucess},耗时{sw.elapsedmilliseconds}毫秒");
                #endregion
            });
            #endregion
        }
        finally
        {
            // Runs on every exit path (early return or exception), so the connection is
            // released and the button never stays disabled.
            if (sqlconn != null && sqlconn.state == connectionstate.open)
            {
                sqlconn.close();
            }
            button1.enabled = true;
        }
    }));
}
SQL 批量写入函数:
/// <summary>
/// Bulk-inserts all rows of a DataTable into a database table inside a single transaction.
/// </summary>
/// <param name="conn">Connection to use. It is opened here if closed, and closed again once a write has been attempted.</param>
/// <param name="tablename">Name of the destination table.</param>
/// <param name="datatable">The rows to write.</param>
/// <param name="err">Empty on success; otherwise a description of the failure.</param>
/// <returns>true when the rows were written and the transaction committed; false otherwise.</returns>
public static bool bulkinsert(sqlconnection conn, string tablename, datatable datatable, out string err)
{
    err = "";
    if (datatable == null || datatable.rows.count == 0)
    {
        err = "要写入的数据为空";
        return false;
    }

    sqltransaction tran = null;
    try
    {
        // A transaction can only be started on an open connection, so open it first.
        if (conn.state == connectionstate.closed)
        {
            conn.open();
        }
        tran = conn.begintransaction(); // start the transaction
        using (var bulkcopy = new sqlbulkcopy(conn, sqlbulkcopyoptions.keepnulls, tran))
        {
            bulkcopy.batchsize = 1000;
            bulkcopy.destinationtablename = tablename;
            bulkcopy.writetoserver(datatable);
        }
        tran.commit();
        return true;
    }
    catch (exception e)
    {
        err = e.tostring();
        if (tran != null)
        {
            try
            {
                tran.rollback();
            }
            catch (exception rollbackerror)
            {
                // Keep the original failure and append the rollback failure so neither is lost.
                err += environment.newline + rollbackerror;
            }
        }
        return false;
    }
    finally
    {
        if (tran != null)
        {
            tran.dispose();
        }
        if (conn.state == connectionstate.open)
        {
            conn.close();
        }
    }
}