How to Read Large TXT Data in C# and Update It to a Database


Environment

  • SQL Server 2016
  • .NET 4.5.2

Currently tested with 13 million rows of data, taking roughly 3-4 minutes. (The number of rows read per batch and the thread count are limited to conserve server resources; if they are set too high, other applications on the server may not be able to run.) SqlServerDbHelper is the database helper class and contains nothing special. When configuring the connection string, remember to enable connection pooling.

Also, the code below creates a new connection for every write. I previously tried reusing a single connection: out of roughly 130 writes the database ran into problems 20-plus times, and the whole run took about 7-8 minutes.
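For reference, a minimal sketch of a pooled SQL Server connection string as it could be placed in the "connstr" field of the configuration below; the server name, database name, and pool sizes are placeholder assumptions, not values from the original setup:

Server=.;Database=ImportDb;Integrated Security=True;Pooling=True;Min Pool Size=5;Max Pool Size=100

With pooling enabled, "a new connection per write" mostly means renting a connection from the pool rather than opening a new physical connection each time, which keeps creating a helper per batch cheap.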

Configuration file: xxx.json

[ {
  /* Connection string */
  "connstr": "",
  "filepath": "path of the file to read",
  /* Database table name */
  "tablename": "name of the table to write into",
  /* SQL executed before the import */
  "execbeforesql": "",
  /* SQL executed after the import */
  "execaftersql": "",
  /* Column mapping: dbname = database column, txtname = TXT column */
  "mapping": [
    {
      "dbname": "xxx",
      "txtname": "ddd"
    }
  ],
  /* Regexes used to filter the data; currently only applied in the small-file, read-all-at-once path */
  "filterregex": [],
  /* Validate the data (column definitions are fetched from the database for the checks) */
  "checkdata": false,
  /* Column separator */
  "separator": "\t",
  /* Number of header rows */
  "headerrowsnum": 1
 }
]
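The reading code below deserializes this file into List<dynamic>. A strongly typed model works just as well with Newtonsoft.Json; the sketch below is only an illustration, and the class and property names are my own, not part of the original project:

using System.Collections.Generic;
using Newtonsoft.Json;

// Hypothetical strongly typed counterpart of one entry in importtxt.json.
public class ImportConfig
{
    [JsonProperty("connstr")] public string ConnStr { get; set; }
    [JsonProperty("filepath")] public string FilePath { get; set; }
    [JsonProperty("tablename")] public string TableName { get; set; }
    [JsonProperty("execbeforesql")] public string ExecBeforeSql { get; set; }
    [JsonProperty("execaftersql")] public string ExecAfterSql { get; set; }
    [JsonProperty("mapping")] public List<ColumnMapping> Mapping { get; set; }
    [JsonProperty("filterregex")] public List<string> FilterRegex { get; set; }
    [JsonProperty("checkdata")] public bool CheckData { get; set; }
    [JsonProperty("separator")] public string Separator { get; set; }
    [JsonProperty("headerrowsnum")] public int HeaderRowsNum { get; set; }
}

public class ColumnMapping
{
    [JsonProperty("dbname")] public string DbName { get; set; }
    [JsonProperty("txtname")] public string TxtName { get; set; }
}

// Usage: var configs = JsonConvert.DeserializeObject<List<ImportConfig>>(json);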

Reading code. Note: ConfigurationManager.AppSettings["frpage"] and ConfigurationManager.AppSettings["fr"] need to be configured yourself; a possible appSettings entry is sketched right below.
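A minimal sketch of those appSettings entries in App.config, assuming "fr" is the file-size threshold in MB above which the file is read in batches and "frpage" is the number of rows per batch; the values shown are my assumptions (the code falls back to 100 MB and 50,000 rows when the keys are missing):

<configuration>
  <appSettings>
    <!-- files larger than this many MB are read line by line in batches -->
    <add key="fr" value="100" />
    <!-- number of rows read per batch in the line-by-line path -->
    <add key="frpage" value="500000" />
  </appSettings>
</configuration>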

// Read the configuration file
List<dynamic> dt = JsonConvert.DeserializeObject<List<dynamic>>(File.ReadAllText(Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "config\\importtxt.json")));
LogUtil.Info("Start reading TXT data, configuration entries: " + dt.Count);
if (dt.Count == 0)
{
    return;
}

List<Task> li = new List<Task>();
foreach (dynamic row in dt)
{
    LogUtil.Info("Start processing entry: " + JsonConvert.SerializeObject(row));
    li.Add(ProcessRow(row));
}
Task.WaitAll(li.ToArray());
LogUtil.Info("All data has been read");
public async Task ProcessRow(dynamic row)
{
    await Task.Run(() =>
    {
        DataTable data = null;
        string error = "", connStr, tableName, execBeforeSql, execAfterSql;
        bool isCheck = Convert.ToBoolean(row["checkdata"]);
        tableName = Convert.ToString(row.tablename);
        connStr = Convert.ToString(row.connstr);
        execBeforeSql = Convert.ToString(row.execbeforesql);
        execAfterSql = Convert.ToString(row.execaftersql);
        int headerRowsNum = Convert.ToInt32(row.headerrowsnum);
        string separator = Convert.ToString(row.separator);

        Dictionary<string, string> dic = new Dictionary<string, string>();

        // File size (in MB) above which the file is read line by line in batches
        int fr = 0;
        if (!int.TryParse(ConfigurationManager.AppSettings["fr"], out fr))
        {
            fr = 100;
        }
        fr = fr * 1024 * 1024;

        // Number of rows to read per batch in the line-by-line path
        int page = 0;
        if (!int.TryParse(ConfigurationManager.AppSettings["frpage"], out page))
        {
            page = 50000;
        }

        // Build the TXT-column -> DB-column mapping
        foreach (var dyn in row.mapping)
        {
            dic.Add(Convert.ToString(dyn.txtname), Convert.ToString(dyn.dbname));
        }

        List<string> regex = new List<string>();
        foreach (string item in row["filterregex"])
        {
            regex.Add(item);
        }
        string fpath = "", cpath = "";

        cpath = Convert.ToString(row["filepath"]);
        string rootPath = Path.Combine(AppDomain.CurrentDomain.BaseDirectory, "tmp");
        if (!Directory.Exists(rootPath))
        {
            Directory.CreateDirectory(rootPath);
        }

        // Copy the source file to a local tmp folder and read from the local copy
        fpath = Path.Combine(rootPath, Path.GetFileName(cpath));
        File.Copy(cpath, fpath, true);
        LogUtil.Info("File copied locally, reading data from the local copy");
        int threadCount = Environment.ProcessorCount * 3;

        FileInfo fi = new FileInfo(fpath);
        // If the file is larger than the threshold (default 100 MB), read it in batches
        if (fi.Length > fr)
        {
            long sumCount = 0;
            StreamReader sr = new StreamReader(fi.OpenRead());
            int headRow = 0;
            string rowStr = "";

            List<Thread> li_th = new List<Thread>();
            bool last = false;
            int ij = 0;
            LogUtil.Info("StreamReader created successfully");
            #region Read line by line

            while (sr.Peek() > -1)
            {
                rowStr = sr.ReadLine();
                #region Write the line into the DataTable
                if (headRow < headerRowsNum)
                {
                    // Header rows define the DataTable columns
                    data = new DataTable();
                    foreach (string scol in rowStr.Split(new string[] { separator }, StringSplitOptions.RemoveEmptyEntries))
                    {
                        data.Columns.Add(scol.Trim(), typeof(string));
                    }
                    headRow++;
                    continue;
                }
                else
                {
                    // Data rows: when the header spans several lines, each record also spans several lines
                    if (headRow > 1)
                    {
                        for (int i = 1; i < headRow && sr.Peek() > -1; i++)
                        {
                            rowStr += " " + sr.ReadLine();
                        }
                    }
                    data.Rows.Add(rowStr.Split(new string[] { separator }, StringSplitOptions.RemoveEmptyEntries));
                    if (data.Rows.Count < page && sr.Peek() > -1)
                    {
                        continue;
                    }
                }
                last = (sr.Peek() == -1);
                #endregion

                sumCount += data.Rows.Count;

                ProcessPath(data, page, sr, ref ij, tableName, execBeforeSql, execAfterSql, dic, isCheck, li_th);

                #region Wait for the worker threads
                if ((ij > 0 && (ij % threadCount) == 0) || last)
                {
                    LogUtil.Info("Batch finished, total rows written so far: " + sumCount);
                    while (true)
                    {
                        bool isOk = true;
                        foreach (var item in li_th)
                        {
                            if (item.IsAlive)
                            {
                                isOk = false;
                                Application.DoEvents();
                                Thread.Sleep(1000);
                            }
                        }
                        if (isOk)
                        {
                            li_th.Clear();
                            break;
                        }
                    }

                    // The last page must wait until every other batch has finished
                    if (sr.Peek() == -1)
                    {
                        WriteToDb(tableName, data, execBeforeSql, execAfterSql, dic, false, true);
                        LogUtil.Info("Final write completed");
                    }
                    LogUtil.Info("Worker threads finished, starting the next loop...");
                }
                data.Clear();
                #endregion
            }
            sr.Dispose();
            #endregion
        }
        else
        {
            using (SqlServerDbHelper sdb = new SqlServerDbHelper())
            {
                sdb.OpenConnection();
                #region Read the whole file at once
                data = LoadDataTableFromTxt(fpath, ref error, separator, headerRowsNum, regex, isCheck, dic, tableName, sdb);
                if (isCheck)
                {
                    // Rows whose errormsg column is filled failed validation
                    DataRow[] rows = data.Select("errormsg <> ''");
                    if (rows.Length > 0)
                    {
                        LogUtil.Info($"Invalid data while reading {tableName}: {JsonConvert.SerializeObject(rows)}");
                        return;
                    }
                }

                LogUtil.Info($"Finished reading TXT data for {tableName}, {data.Rows.Count} rows in total");
                if (data.Rows.Count == 0 || !string.IsNullOrWhiteSpace(error))
                {
                    if (!string.IsNullOrWhiteSpace(error))
                    {
                        LogUtil.Info("Failed to read data, path: " + Convert.ToString(row["filepath"]) + " \r\n error: " + error);
                    }
                    return;
                }
                sdb.BeginTransaction();
                try
                {
                    WriteToDb(tableName, data, execBeforeSql, execAfterSql, dic, sdb: sdb);
                    sdb.CommitTransaction();
                    LogUtil.Info(tableName + " data updated!");
                }
                catch (Exception ex)
                {
                    LogUtil.Info(tableName + " failed to update data, error: " + ex.Message + " \r\n stack: " + ex.StackTrace);
                    sdb.RollbackTransaction();
                }
                #endregion
            }
        }

        GC.Collect();
    });
}

private void ProcessPath(DataTable data, int page, StreamReader sr, ref int ij, string tableName, string execBeforeSql, string execAfterSql, Dictionary<string, string> dic, bool isCheck, List<Thread> li_th)
{
    string error = "";
    PoolModel p = new PoolModel { TableName = tableName, ExecBeforeSql = execBeforeSql, ExecAfterSql = execAfterSql, Dic = dic };
    // Copy the batch so the caller can keep reusing its DataTable
    p.Data = data.Copy();
    if (isCheck)
    {
        using (SqlServerDbHelper sdb = new SqlServerDbHelper())
        {
            error = CheckData(data, tableName, dic, sdb);
        }
        // Rows whose errormsg column is filled failed validation
        DataRow[] rows = data.Select("errormsg <> ''");
        if (rows.Length > 0 || !string.IsNullOrWhiteSpace(error))
        {
            LogUtil.Info($"Invalid data while reading {tableName}: {JsonConvert.SerializeObject(rows)}\r\nerror: " + error);
            return;
        }
    }

    ij++;
    if (ij == 1)
    {
        // The first batch is written synchronously so that execBeforeSql runs exactly once, before everything else
        WriteToDb(p.TableName, p.Data, p.ExecBeforeSql, p.ExecAfterSql, p.Dic, true, false);
        LogUtil.Info("First write completed");
    }
    else if (sr.Peek() > -1)
    {
        // Later batches are written on background threads
        Thread t = new Thread(d =>
        {
            PoolModel c = d as PoolModel;
            try
            {
                WriteToDb(c.TableName, c.Data, c.ExecBeforeSql, c.ExecAfterSql, c.Dic, false, false);
            }
            catch (ThreadAbortException)
            {
                LogUtil.Error("Thread aborted.................");
            }
            catch (Exception ex)
            {
                LogUtil.Error(c.TableName + " failed to write data: " + ex.Message + "\r\nstack: " + ex.StackTrace + "\r\n data: " + JsonConvert.SerializeObject(c.Data));
                ExitApp();
                return;
            }
        });
        t.IsBackground = true;
        t.Start(p);
        li_th.Add(t);
    }
}

public void ExitApp()
{
    Application.Exit();
}

public void WriteToDb(string tableName, DataTable data, string execBeforeSql, string execAfterSql, Dictionary<string, string> dic, bool first = true, bool last = true, SqlServerDbHelper sdb = null)
{
    // When no helper is passed in, create (and later dispose) one of our own
    bool have = false;
    if (sdb == null)
    {
        sdb = new SqlServerDbHelper();
        have = true;
    }

    if (first && !string.IsNullOrWhiteSpace(execBeforeSql))
    {
        LogUtil.Info(tableName + " SQL before import: " + execBeforeSql);
        sdb.ExecuteNonQuery(execBeforeSql);
    }
    sdb.BulkCopy(data, tableName, dic);
    if (last && !string.IsNullOrWhiteSpace(execAfterSql))
    {
        LogUtil.Info(tableName + " SQL after import: " + execAfterSql);
        sdb.ExecuteNonQuery(execAfterSql);
    }
    LogUtil.Info(tableName + " batch completed");
    if (have)
    {
        sdb.Dispose();
    }
}


public string CheckData(DataTable dt, string dbTableName, Dictionary<string, string> dic, SqlServerDbHelper sdb)
{
    if (string.IsNullOrWhiteSpace(dbTableName))
    {
        return "The table name cannot be empty!";
    }
    if (dic.Count == 0)
    {
        return "No column mapping has been configured!";
    }

    List<string> errorMsg = new List<string>();
    List<string> cols = new List<string>();
    foreach (var c in dic)
    {
        if (!dt.Columns.Contains(c.Key))
        {
            errorMsg.Add(c.Key);
        }
        cols.Add(c.Key);
    }

    if (errorMsg.Count > 0)
    {
        return "The data columns are incomplete; they must match the columns in the mapping. Missing columns: " + string.Join(",", errorMsg);
    }

    // If a row fails validation, the reason is written into this column
    dt.Columns.Add(new DataColumn("errormsg", typeof(string)) { DefaultValue = "" });
    string sql = @"-- Read the table definition from SQL Server
 select syscolumns.name as colname, systypes.name as dbtype, syscolumns.isnullable,
 syscolumns.length
 from syscolumns, systypes
 where syscolumns.xusertype = systypes.xusertype
 and syscolumns.id = object_id(@tb);";
    DataSet ds = sdb.GetDataSet(sql, new SqlParameter[] { new SqlParameter("@tb", dbTableName) });
    EnumerableRowCollection<DataRow> tableDef = ds.Tables[0].AsEnumerable();

    object obj_val;

    // Re-shape the table definition into a dictionary keyed by column name
    var dic_def = tableDef.ToDictionary(c => Convert.ToString(c["colname"]), d =>
    {
        string old = Convert.ToString(d["dbtype"]).ToLower();
        string dbType = GetCSharpType(old);
        return new { ColName = Convert.ToString(d["colname"]), DbType = dbType, SqlType = old, IsNullable = Convert.ToBoolean(d["isnullable"]), Length = Convert.ToInt32(d["length"]) };
    });

    DateTime now = DateTime.Now;
    foreach (DataRow row in dt.Rows)
    {
        errorMsg.Clear();
        foreach (string colName in cols)
        {
            if (dic.ContainsKey(colName))
            {
                if (!dic_def.ContainsKey(dic[colName]))
                {
                    return "TXT column " + colName + " is mapped to database column " + dic[colName] + ", which does not exist in the target table!";
                }
                // Trim the value on both sides
                row[colName] = obj_val = Convert.ToString(row[colName]).Trim();
                var info = dic_def[dic[colName]];
                // Empty values are not allowed for non-nullable columns
                if (string.IsNullOrEmpty(Convert.ToString(obj_val)))
                {
                    if (!info.IsNullable)
                    {
                        errorMsg.Add("Column " + colName + " cannot be empty!");
                    }
                }
                else
                {
                    if (info.DbType == "String")
                    {
                        // time columns are not length-checked (they hold the time part of a date, e.g. 17:12:30.0000)
                        if (info.SqlType == "time")
                        {
                            if (!DateTime.TryParse(now.ToString("yyyy-MM-dd") + " " + obj_val.ToString(), out now))
                            {
                                errorMsg.Add("Column " + colName + " is invalid; it should be the time part of a date, e.g. 17:30:12");
                            }
                        }
                        else if (Convert.ToString(obj_val).Length > info.Length)
                        {
                            errorMsg.Add("Column " + colName + " exceeds the configured length: " + info.Length);
                        }
                    }
                    else
                    {
                        Type t = Type.GetType("System." + info.DbType);
                        try
                        {
                            // Numbers containing thousands separators are normalized here and written back to the column
                            row[colName] = Convert.ChangeType(obj_val, t);
                        }
                        catch (Exception)
                        {
                            errorMsg.Add("Column " + colName + " value " + obj_val + " is invalid; it should be of type " + info.SqlType + ".");
                        }
                    }
                }
            }
        }
        row["errormsg"] = string.Join(" || ", errorMsg);
    }

    return "";
}

/// <summary>
/// wm 2018-11-28 13:37
/// Maps common SQL Server column types to the corresponding .NET type names
/// </summary>
/// <param name="old"></param>
/// <returns></returns>
private string GetCSharpType(string old)
{
    string dbType = "";
    switch (old)
    {
        case "int":
        case "bigint":
        case "smallint":
            dbType = "Int32";
            break;
        case "decimal":
        case "float":
        case "numeric":
            dbType = "Decimal";
            break;
        case "bit":
            dbType = "Boolean";
            break;
        case "text":
        case "char":
        case "nchar":
        case "varchar":
        case "nvarchar":
        case "time":
            dbType = "String";
            break;
        case "date":
        case "datetime":
            dbType = "DateTime";
            break;
        default:
            throw new Exception("GetCSharpType: unrecognized data type " + old + "!");
    }

    return dbType;
}




public class PoolModel
{
    public string TableName { get; set; }
    public DataTable Data { get; set; }
    public string ExecBeforeSql { get; set; }
    public string ExecAfterSql { get; set; }
    public Dictionary<string, string> Dic { get; set; }
}
/// <summary>
/// wm 2018-11-28 13:32
/// Reads the TXT data and, when validation is requested, returns a DataTable with an errormsg column;
/// if a row fails validation, that column holds the reason.
/// Note: the target database table must already exist before this method is used.
/// </summary>
/// <param name="isCheck">Whether to validate the data (when true, the column definitions of dbTableName are fetched from the database for the checks)</param>
/// <param name="map">Required when validating: key = TXT column name, value = database column name</param>
/// <param name="dbTableName">The table used for validation (i.e. the table the data will be inserted into)</param>
/// <param name="error">Returns non-validation errors</param>
/// <param name="regexs">Regexes used to filter the raw text</param>
/// <param name="path">Path of the file to read</param>
/// <param name="separator">Column separator</param>
/// <param name="headerRowsNum">Number of header rows</param>
/// <returns>When validation is requested, a DataTable with an errormsg column holding the failure reason per row; otherwise the DataTable is returned as read</returns>
public DataTable LoadDataTableFromTxt(string path, ref string error, string separator, int headerRowsNum, List<string> regexs = null, bool isCheck = false, Dictionary<string, string> map = null, string dbTableName = "", SqlServerDbHelper sdb = null)
{
    DataTable dt = new DataTable();
    error = "";
    if (isCheck && (map == null || map.Count == 0 || string.IsNullOrWhiteSpace(dbTableName)))
    {
        error = "Validation was requested, but no column mapping or table name was supplied.";
        return dt;
    }
    string txts = File.ReadAllText(path);
    #region Convert the raw text into a DataTable

    // Strip everything matched by the configured filter regexes
    regexs?.ForEach(c =>
    {
        txts = new Regex(c).Replace(txts, "");
    });
    //// Regexes previously used to strip +----+ style table borders, kept for reference:
    //// new Regex(@"\+[-+]{4,}\s+\+[-+\s|\w./]{4,}\+");
    //// new Regex(@"[+-]{4,}");
    string[] tts = txts.Split(new string[] { "\r\n" }, StringSplitOptions.None);
    string[] vals;
    string s1;
    // Build the header: the first headerRowsNum lines are treated as the header
    // (a record then spans the same number of lines as the header)
    int headerNum = -1; // how many header rows have been consumed

    DataRow dr;
    // Handle duplicate column names; duplicates could be renamed by index, e.g. a1, a2
    Dictionary<string, int> col_rep = new Dictionary<string, int>();
    string colName = "";
    bool isRe = false; // whether a duplicate column was found
    for (int i = 0; i < tts.Length; i++)
    {
        s1 = tts[i];

        // Header not fully read yet
        if (headerNum < headerRowsNum)
        {
            vals = s1.Split(new string[] { separator }, StringSplitOptions.RemoveEmptyEntries);
            foreach (string col in vals)
            {
                colName = col.Trim();

                if (col_rep.Keys.Contains(colName))
                {
                    col_rep[colName]++;
                    isRe = true;
                    // Duplicate column: skip it
                    //colName += col_rep[colName];
                    continue;
                }
                else
                {
                    col_rep.Add(colName, 1);
                }
                dt.Columns.Add(colName, typeof(string));
            }
            headerNum = (i == (headerRowsNum - 1)) ? headerRowsNum : 0;
        }
        else
        {
            if (string.IsNullOrWhiteSpace(s1.Trim()) || string.IsNullOrWhiteSpace(s1.Replace(separator, "")))
            {
                continue;
            }
            if (isRe)
            {
                error = "Duplicate columns: " + string.Join(",", col_rep.Where(c => c.Value > 1).Select(c => c.Key));
                return dt;
            }

            // Multi-line records: concatenate the following lines before splitting
            if (headerNum > 1)
            {
                for (int j = 1; j < headerNum && (i + j) < tts.Length; j++)
                {
                    // The first line of a record may end without data; append a space so the line is not discarded as empty
                    s1 += " " + tts[i + j];
                }
            }
            vals = s1.Split(new string[] { separator }, StringSplitOptions.RemoveEmptyEntries);
            dr = dt.NewRow();
            dr.ItemArray = vals;
            dt.Rows.Add(dr);
            // The for loop increments i once more, so only skip headerNum - 1 extra lines here
            i += (headerNum - 1);
        }
    }
    #endregion

    if (isCheck)
    {
        error = CheckData(dt, dbTableName, map, sdb);
    }

    return dt;
}
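The article does not show SqlServerDbHelper itself and says it contains nothing special. For reference, a minimal sketch of what its BulkCopy method might look like using SqlBulkCopy; the class shape, constructor, and exact signature here are assumptions, not the author's actual helper:

using System.Collections.Generic;
using System.Data;
using System.Data.SqlClient;

// Hypothetical stand-in for the helper used above; only the bulk-copy part is sketched.
public class SqlServerDbHelper
{
    private readonly SqlConnection _conn;

    public SqlServerDbHelper(string connStr)
    {
        _conn = new SqlConnection(connStr);
        _conn.Open();
    }

    // dic maps TXT/DataTable column names (key) to database column names (value),
    // matching the "mapping" section of the JSON configuration.
    public void BulkCopy(DataTable data, string tableName, Dictionary<string, string> dic)
    {
        using (SqlBulkCopy bulk = new SqlBulkCopy(_conn))
        {
            bulk.DestinationTableName = tableName;
            bulk.BulkCopyTimeout = 600;   // seconds; large batches need a generous timeout
            bulk.BatchSize = 50000;       // rows sent per round trip

            foreach (KeyValuePair<string, string> kv in dic)
            {
                bulk.ColumnMappings.Add(kv.Key, kv.Value);
            }
            bulk.WriteToServer(data);
        }
    }
}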

Summary

That is all for this article. I hope its content provides some reference value for your study or work. Thank you for your support.