欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

C#使用Parallel处理数据同步写入Datatable并使用BulkInsert批量导入数据库

程序员文章站 2022-03-20 13:37:34
项目需要,几十万张照片需要计算出每个照片的特征值(调用C++编写的DLL)。 业务流程:选择照片文件夹,分别访问照片-->调用DLL接口传递照片路径-->接收处理返回值-->写入数据库。 前期使用的for循环来处理,几十万张照片处理起来差不多10个小时。速度太慢,后面改进使用Parallel来进行平 ......

项目需要,几十万张照片需要计算出每个照片的特征值(调用c++编写的dll)。

业务流程:选择照片文件夹,分别访问照片-->调用dll接口传递照片路径-->接收处理返回值-->写入数据库。

前期使用的for循环来处理,几十万张照片处理起来差不多10个小时。速度太慢,后面改进使用parallel来进行平行计算(调用dll处理照片),统一写入datatable,然后使用bulkinsert批量把datatable写入数据库,目前测试8万张照片并行计算速度30分钟,速度提高约30%-40%左右。

代码示例如下:

private static sqlconnection sqlconn;
private static concurrentdictionary<string, int> currints = new concurrentdictionary<string, int>();
private void button1_click(object sender, eventargs e)
        {           
            var dirpath = "";
            using (var folderbrowser = new folderbrowserdialog())
            {
                if (folderbrowser.showdialog() != dialogresult.ok) return;
                dirpath = folderbrowser.selectedpath;
                if (!directory.exists(dirpath))
                {
                    messagebox.show(@"所选路径不存在或无权访问", @"错误", messageboxbuttons.ok, messageboxicon.error);
                    return;
                }
            }
 
            begininvoke(new action(async () =>
            {
                button1.enabled = false;
                var sw = new stopwatch();
                sw.start();
 
                //检测服务器链接
                log.writeline(@"尝试连接数据库服务器");
 
                sqlconn = new sqlconnection(
                    $"data source={txt_serverip.text},{txt_serverport.text};user id={txt_user.text};password={txt_pwd.text};initial catalog={txt_db.text};persist security info=false;pooling=true;min pool size=30;max pool size=200;");
                if (sqlconn.state == connectionstate.closed)
                {
                    try
                    {
                        sqlconn.open();
                    }
                    catch (exception exception)
                    {
                        log.writeline($@"连接数据库服务器【失败】-->{exception.message}");
                        button1.enabled = true;
                        return;
                    }
                }
 
                log.writeline($@"连接数据库服务器【成功】{environment.newline}获取未转换图片数据。。。");
                var ds = new dataset();
                int.tryparse(txt_start.text, out var start);
                int.tryparse(txt_end.text, out var end);
                var sqlstrall = "";
                if (start == 0 || end == 0)
                {
                    sqlstrall = "select * from viewweizhuanhuan";
                }
                else
                {
                    sqlstrall = $"select * from viewweizhuanhuan where {txt_mastkey.text} between {start} and {end}";
                }
 
                var sqlcmd = new sqlcommand(sqlstrall, sqlconn);
                dataadapter da = new sqldataadapter(sqlcmd);
                da.fill(ds);
                if (ds.tables.count == 0 || ds.tables[0].rows.count == 0)
                {
                    log.writeline("所有图片都已经转换完毕。");
                    sqlconn.close();
                    return;
                }
 
                log.writeline($"{ds.tables[0].rows.count}个图片需要转换。");
 
                var total = ds.tables[0].rows.count;
                var rowkey = combobox1.selectedvalue.tostring();
                var splitkey = txt_split.text.trim();
 
                #region 定义数据保存
                var dt = new datatable();
                dt.columns.add("zd1", typeof(int));
                dt.columns.add("zd2", typeof(int));
                dt.columns.add("zd3", typeof(string));
                dt.columns.add("zd4", typeof(string));
                dt.columns.add("zd5", typeof(string));
                dt.columns.add("zd6", typeof(string));
                #endregion
 
                #region 并行执行
                currints.tryadd("currints", 1);//初始化进度数字为1
                await task.run(() =>
               {
                  //使用8个cpu核心来运行
                   var result = parallel.for(0, ds.tables[0].rows.count, new paralleloptions { maxdegreeofparallelism = 8}, (rotindex, state) =>
                   {
                       begininvoke(new action(() =>
                       {
                           currints.trygetvalue("currints", out var currvalue);
                           lb_process.text = $@"{currvalue}/{total}";//显示进度
                           var nextvalue = currvalue + 1;
                           currints.tryupdate("currints", nextvalue, currvalue);//加1
                       }));                      
                      
                       var filedirpath = "";//根据选择的文件名格式,用填写的规则生成路径                      
 
                       var file = new list<string>{
                            $"{dirpath}\\{filedirpath}\\{ksno}_fp1.jpg",
                            $"{dirpath}\\{filedirpath}\\{ksno}_fp2.jpg",
                            $"{dirpath}\\{filedirpath}\\{ksno}_fp3.jpg"};
 
                       foreach (var zwzp in file)
                       {
                           try
                           {
                               var model = zwhelper.zwzhasync($"{zwzp}").result;//调用c++转换
                               if (model != null)
                               {
//并行计算下写入datatable需要锁定才可以,否则会提示datatable索引损坏                            
                                   lock (dt.rows.syncroot)
                                   {
                                       var dr = dt.newrow();
                                       dr["zd1"] = convert.toint32(filexe);
                                       dr["zd2"] = convert.toint32(ds.tables[0].rows[rotindex]["zd1"]);
                                       dr["zd3"] = model.zhtz;
                                       dr["zd4"] = datetime.now.tostring("yyyy-mm-dd hh:mm:ss");
                                       dr["zd5"] = "";
                                       dr["zd6"] = "";
                                       dt.rows.add(dr);
                                   }
                                  
                               }
                               else
                               {
                                   log.writeline($@"{ksno}转换失败");
                                   log.log.error($"{ksno}转换失败。");
                               }
                           }
                           catch (exception exception)
                           {
                               log.log.error($"学号{ksno},图片路径{zwzp}转换失败。{exception}");
                           }
                       }
 
                   });
                   sw.stop();
                   log.writeline($"转换耗时:{sw.elapsedmilliseconds}毫秒");
                   log.writeline($@"开始写入数据库,数量{dt.rows.count}");
 
                   #region 批量写入
 
                   if (dt.rows.count ==0)
                   {
                       log.writeline(@"没有要写入的数据。");
                       return;
                   }
                   sw.restart();
                   var sucess = false;
                   if (sqlhelper.bulkinsert(sqlconn, txt_tablename.text.trim(), dt, out var err))
                   {
                       sucess = true;
                   }
                   else
                   {
                       log.log.error($"写入数据库失败==》{err}");
                   }
                   sw.stop();
                   log.writeline($"写入数据库是否成功=>{sucess},耗时{sw.elapsedmilliseconds}毫秒");
                   #endregion
               });
                #endregion
              
              
                if (sqlconn.state == connectionstate.open)
                {
                    sqlconn.close();
                }
                button1.enabled = true;
            }));
        }

  sql批量写入函数

        /// <summary>
        /// 批量插入
        /// </summary>
        /// <param name="conn">连接对象</param>
        /// <param name="tablename">将泛型集合插入到本地数据库表的表名</param>
        /// <param name="datatable">要批量写入的datatable</param>
        /// <param name="err">错误时返回的信息</param>
        public static bool bulkinsert(sqlconnection conn, string tablename, datatable datatable, out string err)
        {
            err = "";
            if (datatable == null || datatable.rows.count == 0)
            {
                err = "要写入的数据为空";
                return false;
            }
            var tran = conn.begintransaction();//开启事务 
            var bulkcopy = new sqlbulkcopy(conn, sqlbulkcopyoptions.keepnulls, tran);
            try
            {
                if (conn.state == connectionstate.closed)
                {
                    conn.open();
                }
                bulkcopy.batchsize = 1000;
                bulkcopy.destinationtablename = tablename;
                bulkcopy.writetoserver(datatable);
                tran.commit();
                return true;
            }
            catch (exception e)
            {
                err = e.tostring();
                tran.rollback();
                return false;
            }
            finally
            {
                bulkcopy.close();
                if (conn.state == connectionstate.open)
                {
                    conn.close();
                }
            }
        }