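Demo repository: BulkAll (git)
Goal: bulk-import data into Elasticsearch with NEST, the Elasticsearch .NET API client, in a way that is concurrent, asynchronous, and efficient.
NEST provides BulkAll for exactly this.
Without further ado, here is the code (github):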
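const int size = 1000;
var tokenSource = new CancellationTokenSource();

var observableBulk = elasticClient.BulkAll(list, f => f
        .MaxDegreeOfParallelism(8)              // up to 8 bulk requests in flight
        .BackOffTime(TimeSpan.FromSeconds(10))  // wait 10 s before retrying a failed batch
        .BackOffRetries(2)                      // retry a failed batch at most twice
        .Size(size)                             // documents per bulk request
        .RefreshOnCompleted()                   // refresh the index once the import finishes
        .Index(indexName)
        .BufferToBulk((r, buffer) => r.IndexMany(buffer))
    , tokenSource.Token);

var countdownEvent = new CountdownEvent(1);
Exception exception = null;

// The observer has to signal countdownEvent on completion or error;
// with an empty observer the Wait call below would never return.
var bulkAllObserver = new BulkAllObserver(
    onNext: null,
    onError: ex => { exception = ex; countdownEvent.Signal(); },
    () => countdownEvent.Signal());

observableBulk.Subscribe(bulkAllObserver);
countdownEvent.Wait(tokenSource.Token);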
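To make the Size and MaxDegreeOfParallelism options more concrete, here is a minimal sketch of the general idea: split the source into fixed-size batches and keep at most N batches in flight at once. This is not NEST's implementation. `BatchingSketch`, `Partition`, `SendAllAsync`, and `sendBatchAsync` are hypothetical names introduced for illustration, and the "send" step is a short delay standing in for a real bulk request.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BatchingSketch
{
    // Split the source into consecutive batches of at most `size` items.
    public static IEnumerable<List<T>> Partition<T>(IEnumerable<T> source, int size)
    {
        var batch = new List<T>(size);
        foreach (var item in source)
        {
            batch.Add(item);
            if (batch.Count == size)
            {
                yield return batch;
                batch = new List<T>(size);
            }
        }
        if (batch.Count > 0) yield return batch;
    }

    // Send every batch, allowing at most `maxParallelism` sends in flight.
    // Returns the number of items whose batch was sent successfully.
    public static async Task<int> SendAllAsync<T>(
        IEnumerable<T> source, int size, int maxParallelism,
        Func<List<T>, Task> sendBatchAsync)
    {
        using var gate = new SemaphoreSlim(maxParallelism);
        var tasks = new List<Task>();
        var sent = 0;
        foreach (var batch in Partition(source, size))
        {
            await gate.WaitAsync(); // wait for a free slot before starting another send
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    await sendBatchAsync(batch);
                    Interlocked.Add(ref sent, batch.Count);
                }
                finally
                {
                    gate.Release();
                }
            }));
        }
        await Task.WhenAll(tasks);
        return sent;
    }

    public static async Task Main()
    {
        var docs = Enumerable.Range(0, 2500).ToList();
        var total = await SendAllAsync(docs, size: 1000, maxParallelism: 8,
            sendBatchAsync: batch => Task.Delay(10)); // placeholder for a bulk request
        Console.WriteLine($"Sent {total} documents"); // prints "Sent 2500 documents"
    }
}
```

With 2500 items and a batch size of 1000, the sketch produces three batches (1000, 1000, 500). BulkAll adds retries, back-off, and observer notifications on top of this basic pattern.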
To monitor the import while it runs, replace the BulkAllObserver with one that logs progress:
// WriteLine here assumes `using static System.Console;`.
var bulkAllObserver = new BulkAllObserver(
    onNext: response =>
    {
        WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
    },
    onError: ex =>
    {
        WriteLine("BulkAll Error : {0}", ex);
        exception = ex;
        countdownEvent.Signal();
    },
    () =>
    {
        WriteLine("BulkAll Finished");
        countdownEvent.Signal();
    });
You can also use C#'s local function feature, as shown here:
void OnCompleted()
{
    WriteLine("BulkAll Finished");
    countdownEvent.Signal();
}

var bulkAllObserver = new BulkAllObserver(
    onNext: response =>
    {
        WriteLine($"Indexed {response.Page * size} with {response.Retries} retries");
    },
    onError: ex =>
    {
        WriteLine("BulkAll Error : {0}", ex);
        exception = ex;
        countdownEvent.Signal();
    },
    OnCompleted);
For the complete demo, click BulkAll to view the code.