欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

asp.net实现Postgresql快速写入/读取大量数据实例

程序员文章站 2022-06-03 15:01:29
最近因为一些项目需要大量插入数据,研究了下asp.net实现postgresql快速写入/读取大量数据,所以留个笔记 环境及测试 使用.net驱动npgsql连接pos...

最近因为一些项目需要大量插入数据,研究了下asp.net实现postgresql快速写入/读取大量数据,所以留个笔记

环境及测试

使用.net驱动npgsql连接post数据库。配置:win10 x64, i5-4590, 16g ddr3, ssd 850evo.

postgresql 9.6.3,数据库与数据都安装在ssd上,默认配置,无扩展。

create table public.mesh
(
 x integer not null,
 y integer not null,
 z integer,
 constraint prim primary key (x, y)
)

1. 导入

使用数据备份,csv格式导入,文件位于机械硬盘上,480mb,数据量2500w+。

使用copy

copy mesh from 'd:/user.csv' csv

运行时间107s

使用insert

单连接,c# release any cpu 非调试模式。

class program
{
  static void main(string[] args)
  {
    var list = getdata("d:\\user.csv");
    timecalc.logstarttime();
    using (var sm = new sqlmanipulation(@"strings", sqltype.postgresql))
    {
      sm.init();
      foreach (var n in list)
      {
        sm.excutenonquery($"insert into mesh(x,y,z) values({n.x},{n.y},{n.z})");
      }
    }
    timecalc.showtotalduration();

    console.readkey();
  }

  static list<(int x, int y, int z)> getdata(string filepath)
  {
    list<valuetuple<int, int, int>> list = new list<(int, int, int)>();
    foreach (var n in file.readlines(filepath))
    {
      string[] x = n.split(',');
      list.add((convert.toint32(x[0]), convert.toint32(x[1]), convert.toint32(x[2])));
    }
    return list;
  }
}

postgresql cpu占用率很低,但是跑了一年,程序依然不能结束,没有耐性了...,这么插入不行。

multiline insert

使用multiline插入,一条语句插入约100条数据。

var bag = getdata("d:\\user.csv");
//使用时,直接执行stringbuilder的tostring方法。
list<stringbuilder> listbuilder = new list<stringbuilder>();
stringbuilder sb = new stringbuilder();
for (int i = 0; i < bag.count; i++)
{
  if (i % 100 == 0)
  {
    sb = new stringbuilder();
    listbuilder.add(sb);
    sb.append("insert into mesh(x,y,z) values");
    sb.append($"({bag[i].x}, {bag[i].y}, {bag[i].z})");
  }
  else
    sb.append($",({bag[i].x}, {bag[i].y}, {bag[i].z})");
}

postgresql cpu占用率差不多27%,磁盘写入大约45mb/s,感觉就是在干活,最后时间217.36s。

改为1000一行的话,cpu占用率提高,但是磁盘写入平均来看有所降低,最后时间160.58s.

prepare语法

prepare语法可以让postgresql提前规划sql,优化性能。

使用单行插入 cpu占用率不到25%,磁盘写入63mb/s左右,但是,使用单行插入的方式,效率没有改观,时间太长还是等不来结果。

使用多行插入 cpu占用率30%,磁盘写入50mb/s,最后结果163.02,最后的时候出了个异常,就是最后一组数据长度不满足条件,无伤大雅。

static void main(string[] args)
{
  var bag = getdata("d:\\user.csv");
  list<stringbuilder> listbuilder = new list<stringbuilder>();
  stringbuilder sb = new stringbuilder();
  for (int i = 0; i < bag.count; i++)
  {
    if (i % 1000 == 0)
    {
      sb = new stringbuilder();
      listbuilder.add(sb);
      //sb.append("insert into mesh(x,y,z) values");
      sb.append($"{bag[i].x}, {bag[i].y}, {bag[i].z}");
    }
    else
      sb.append($",{bag[i].x}, {bag[i].y}, {bag[i].z}");
  }
  stringbuilder sbp = new stringbuilder();
  sbp.append("prepare insertplan (");
  for (int i = 0; i < 1000; i++)
  {
    sbp.append("int,int,int,");
  }
  sbp.remove(sbp.length - 1, 1);
  sbp.append(") as insert into mesh(x, y, z) values");
  for (int i = 0; i < 1000; i++)
  {
    sbp.append($"(${i*3 + 1},${i* 3 + 2},${i*3+ 3}),");
  }
  sbp.remove(sbp.length - 1, 1);
  timecalc.logstarttime();

  using (var sm = new sqlmanipulation(@"string", sqltype.postgresql))
  {
    sm.init();
    sm.excutenonquery(sbp.tostring());
    foreach (var n in listbuilder)
    {
      sm.excutenonquery($"execute insertplan({n.tostring()})");
    }
  }
  timecalc.showtotalduration();

  console.readkey();
}

使用transaction

在前面的基础上,使用事务改造。每条语句插入1000条数据,每1000条作为一个事务,cpu 30%,磁盘34mb/s,耗时170.16s。

改成100条一个事务,耗时167.78s。

使用多线程

还在前面的基础上,使用多线程,每个线程建立一个连接,一个连接处理100条sql语句,每条sql语句插入1000条数据,以此种方式进行导入。注意,连接字符串可以将maxpoolsize设置大一些,我机器上实测,不设置会报连接超时错误。

cpu占用率上到80%, 磁盘这里需要注意,由于生成了非常多个postgresql server进程,不好统计,累积算上应该有小100mb/s,最终时间,98.18s。

使用tpl,由于parallel.foreach返回的结果没有检查,可能导致时间不是很准确(偏小)。

var lists = new list<list<string>>();
var listt = new list<string>();
for (int i = 0; i < listbuilder.count; i++)
{
  if (i % 1000 == 0)
  {
    listt = new list<string>();
    lists.add(listt);
  }
  listt.add(listbuilder[i].tostring());
}
timecalc.logstarttime();
parallel.foreach(lists, (x) =>
{
  using (var sm = new sqlmanipulation(@";string;maxpoolsize=1000;", sqltype.postgresql))
  {
    sm.init();
    foreach (var n in x)
    {
      sm.excutenonquery(n);
    }
  }
});
timecalc.showtotalduration();

写入方式 耗时(1000条/行)
copy 107s
insert n/a
多行insert 160.58s
prepare多行insert 163.02s
事务多行insert 170.16s
多连接多行insert 98.18s

2. 写入更新

数据实时更新,数量可能继续增长,使用简单的insert或者update是不行的,操作使用postgresql 9.5以后支持的新语法。

insert into mesh on conflict (x,y) do update set z = excluded.z

吐槽postgresql这么晚才支持on conflict,mysql早有了...

在表中既有数据2500w+的前提下,重复往数据库里面写这些数据。这里只做多行插入更新测试,其他的结果应该差不多。

普通多行插入,耗时272.15s。
 多线程插入的情况,耗时362.26s,cpu占用率一度到了100%。猜测多连接的情况下,更新互锁导致性能下降。

3. 读取

select方法

标准读取还是用select方法,ado.net直接读取。

使用adapter方式,耗时135.39s;使用dbreader方式,耗时71.62s。

copy方法

postgresql的copy方法提供stdout binary方式,可以指定一条查询进行输出,耗时53.20s。
public list<(int x, int y, int z)> bulkiquerynpg()
{
  list<(int, int, int)> dict = new list<(int, int, int)>();
  using (var reader = ((npgsqlconnection)_conn).beginbinaryexport("copy (select x,y,z from mesh) to stdout (format binary)"))
  {
    while (reader.startrow() != -1)
    {
      var x = reader.read<int>(npgsqldbtype.integer);
      var y = reader.read<int>(npgsqldbtype.integer);
      var z = reader.read<int>(npgsqldbtype.integer);
      dict.add((x, y, z));
    }
  }
  return dict;
}

结论

总结测试结果,对于较多数据的情况下,可以得出以下结论:

  1. 向空数据表导入或者没有重复数据表的导入,优先使用copy语句(为什么有这个前提详见p.s.);
  2. 使用一条语句插入多条数据的方式能够大幅度改善插入性能,可以实验确定最优条数;
  3. 使用transaction或者prepare插入,在本场景中优化效果不明显;
  4. 使用多连接/多线程操作,速度上有优势,但是把握不好容易造成资源占用率过高,连接数太大也容易影响其他应用;
  5. 写入更新是postgresql新特性,使用会造成一定的性能消耗(相对直接插入);
  6. 读取数据时,使用copy语句能够获得较好的性能;
  7. ado.net dbreader对象由于不需要fill的过程,读取速度也较快(虽然赶不上copy),也可优先考虑。

p.s.

为什么不用mysql

没有最好的,只有最合适的,讲道理我也是挺喜欢用mysql的。使用postgresql的原因主要在于:

postgresql导入导出的sql指令“copy”直接支持binary模式到stdin和stdout,如果程序想直接集成,那么用这个是比较方便的;相比较,mysql的sql语法(load data infile)并不支持到stdin或者stdout,导出可以通过mysqldump.exe实现,导入暂时没什么特别好的办法(mysqlimport或许可以)。
相较于mysql缺点

postgresql使用copy导入的时候,如果目标表已经有数据,那么在有主键约束的表遇到错误时,copy自动终止,而且可能导致不完全插入的情况,换言之,是不支持导入的过程进行update操作;mysql的load语法可以显式指定出错之后的动作(ignore/replace),不会打断导入过程。

其他

如果需要使用mysql从程序导入数据,可以考虑先通过程序导出到文件,然后借助文件进行导入,据说效率也要比insert高出不少。

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。