欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Elasticsearch.Net、Nest批量插入BulkAll

程序员文章站 2024-01-11 19:50:52
demo地址: "BulkAll" 批量导入 实现目标:想要使用ElasticSearch的 .Net Api客户端NEST批量导入数据,并发异步高效的批量导入 NEST提供了BulkAll 不废话,上代码 如果想要对处理导入过程进行监控可以这么替换 还可以使用C 的local function特性 ......

demo地址:bulkall

批量导入

实现目标:想要使用elasticsearch的 .net api客户端nest批量导入数据,并发异步高效的批量导入
nest提供了bulkall
不废话,上代码

            const int size = 1000;
            var tokensource = new cancellationtokensource();

            var observablebulk = elasticclient.bulkall(list, f => f
                    .maxdegreeofparallelism(8)
                    .backofftime(timespan.fromseconds(10))
                    .backoffretries(2)
                    .size(size)
                    .refreshoncompleted()
                    .index(indexname)
                    .buffertobulk((r, buffer) => r.indexmany(buffer))
                , tokensource.token);

            var countdownevent = new countdownevent(1);

            exception exception = null;

            var bulkallobserver = new bulkallobserver();

            observablebulk.subscribe(bulkallobserver);

            countdownevent.wait(tokensource.token);

如果想要对处理导入过程进行监控可以这么替换bulkallobserver

               var bulkallobserver = new bulkallobserver(
                onnext: response =>
                {
                    writeline($"indexed {response.page * size} with {response.retries} retries");
                },
                onerror: ex =>
                {
                    writeline("bulkall error : {0}", ex);
                    exception = ex;
                    countdownevent.signal();
                },
                () =>
                {
                    writeline("bulkall finished");
                    countdownevent.signal();
                });

还可以使用c#的local function特性,如下所示

            void oncompleted()
            {
                writeline("bulkall finished");
                countdownevent.signal();
            }

            var bulkallobserver = new bulkallobserver(
                onnext: response =>
                {
                    writeline($"indexed {response.page * size} with {response.retries} retries");
                },
                onerror: ex =>
                {
                    writeline("bulkall error : {0}", ex);
                    exception = ex;
                    countdownevent.signal();
                },
                oncompleted);

完成demo,请点击 bulkall 查看