Elasticsearch.Net、Nest批量插入BulkAll
程序员文章站
2024-01-11 19:50:52
demo地址: "BulkAll" 批量导入 实现目标:想要使用ElasticSearch的 .Net Api客户端NEST批量导入数据,并发异步高效的批量导入 NEST提供了BulkAll 不废话,上代码 如果想要对处理导入过程进行监控可以这么替换 还可以使用C 的local function特性 ......
demo地址:bulkall
批量导入
实现目标:想要使用elasticsearch的 .net api客户端nest批量导入数据,并发异步高效的批量导入
nest提供了bulkall
不废话,上代码
const int size = 1000; var tokensource = new cancellationtokensource(); var observablebulk = elasticclient.bulkall(list, f => f .maxdegreeofparallelism(8) .backofftime(timespan.fromseconds(10)) .backoffretries(2) .size(size) .refreshoncompleted() .index(indexname) .buffertobulk((r, buffer) => r.indexmany(buffer)) , tokensource.token); var countdownevent = new countdownevent(1); exception exception = null; var bulkallobserver = new bulkallobserver(); observablebulk.subscribe(bulkallobserver); countdownevent.wait(tokensource.token);
如果想要对处理导入过程进行监控可以这么替换bulkallobserver
var bulkallobserver = new bulkallobserver( onnext: response => { writeline($"indexed {response.page * size} with {response.retries} retries"); }, onerror: ex => { writeline("bulkall error : {0}", ex); exception = ex; countdownevent.signal(); }, () => { writeline("bulkall finished"); countdownevent.signal(); });
还可以使用c#的local function特性,如下所示
void oncompleted() { writeline("bulkall finished"); countdownevent.signal(); } var bulkallobserver = new bulkallobserver( onnext: response => { writeline($"indexed {response.page * size} with {response.retries} retries"); }, onerror: ex => { writeline("bulkall error : {0}", ex); exception = ex; countdownevent.signal(); }, oncompleted);
完成demo,请点击 bulkall 查看