Elasticsearch Java High Level REST Client(Bulk API)

Bulk API

Java High Level REST Client提供了 Bulk处理器来帮助处理批量请求。

Bulk请求

BulkRequest可使用一个请求执行多个索引、更新和/或删除操做。服务器

它须要在批量请求中添加至少一个操做:并发

BulkRequest request = new BulkRequest(); 
request.add(new IndexRequest("posts").id("1")  
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts").id("2")  
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts").id("3")  
        .source(XContentType.JSON,"field", "baz"));
  • 建立BulkRequest
  • IndexRequest添加到Bulk请求。
Bulk API只支持 JSONSMILE编码的文档,提供任何其余格式的文档都会致使错误。

不一样的操做类型能够添加到同一个BulkRequest异步

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "3")); 
request.add(new UpdateRequest("posts", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts").id("4")  
        .source(XContentType.JSON,"field", "baz"));
  • BulkRequest添加DeleteRequest
  • BulkRequest添加UpdateRequest
  • 使用JSON格式添加IndexRequest

可选参数

能够选择提供如下参数:ide

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");
  • 做为TimeValue等待bulk请求执行的超时。
  • 做为String等待bulk请求执行的超时。
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");
  • 做为WriteRequest.RefreshPolicy实例的刷新策略。
  • 做为String的刷新策略。
request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);
  • 设置在继续执行索引/更新/删除操做以前必须活动的碎片副本的数量。
  • 做为ActiveShardCount提供的碎片副本的数量:能够是ActiveShardCount.ALLActiveShardCount.ONEActiveShardCount.DEFAULT(默认)。
request.pipeline("pipelineId");
  • 全局pipelineId用于全部子请求,除非在子请求上重写。
request.routing("routingId");
  • 全局routingId用于全部子请求,除非在子请求上重写。
BulkRequest defaulted = new BulkRequest("posts");
  • 在全部子请求上使用全局索引的bulk请求,除非在子请求上重写,这个参数是@Nullable,并只能在建立BulkRequest时设置。

同步执行

当以如下方式执行BulkRequest时,客户端等待BulkResponse返回,而后继续执行代码:post

BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);

在高级别REST客户端中解析REST响应失败、请求超时或相似的状况,其中没有来自服务器的响应的状况下,同步调用可能引起IOExceptionui

在服务器返回4xx5xx错误代码的状况下,高级别客户端尝试解析响应体错误细节,而后抛出一个通用的ElasticsearchException并将原始的ResponseException做为一个被抑制的异常添加到它。编码

异步执行

还能够以异步方式执行BulkRequest,以便客户端能够直接返回,用户须要指定如何经过将请求和侦听器传递给异步块方法来处理响应或潜在故障:debug

client.bulkAsync(request, RequestOptions.DEFAULT, listener);
  • 要执行的BulkRequest和在执行完成时使用的ActionListener

异步方法不会阻塞并当即返回,一旦执行完成,ActionListener将使用onResponse方法(若是执行成功)被调用,或者使用onFailure方法(若是执行失败)被调用,失败状况和预期的异常与同步执行状况相同。code

一个典型的bulk监听器是这样的:索引

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        
    }

    @Override
    public void onFailure(Exception e) {
        
    }
};
  • onResponse当执行成功完成时调用。
  • onFailure当整个BulkRequest失败时调用。

Bulk响应

返回的BulkResponse包含执行操做的信息,容许对每一个结果进行以下迭代:

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    switch (bulkItemResponse.getOpType()) {
    case INDEX:    
    case CREATE:
        IndexResponse indexResponse = (IndexResponse) itemResponse;
        break;
    case UPDATE:   
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
        break;
    case DELETE:   
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}
  • 遍历全部操做的结果。
  • 检索操做的响应(成功与否),能够是IndexResponseUpdateResponseDeleteResponse,它们均可以看做DocWriteResponse实例。
  • 处理索引操做的响应。
  • 处理更新操做的响应。
  • 处理删除操做的响应。

Bulk响应提供了一种方法来快速检查一个或多个操做是否失败:

if (bulkResponse.hasFailures()) { 

}
  • 若是至少有一个操做失败,此方法将返回true

在这种状况下,须要对全部的操做结果进行迭代,以检查操做是否失败,若是失败,则检索相应的失败:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
    if (bulkItemResponse.isFailed()) { 
        BulkItemResponse.Failure failure =
                bulkItemResponse.getFailure(); 
    }
}
  • 指示给定操做是否失败。
  • 检索失败操做的失败。

Bulk处理器

BulkProcessor提供了一个实用程序类,容许索引/更新/删除操做在添加处处理器时透明地执行,从而简化了Bulk API的使用。

为了执行请求,BulkProcessor须要如下组件:

RestHighLevelClient

  • 此客户端用于执行BulkRequest并检索BulkResponse

BulkProcessor.Listener

  • 在每次执行BulkRequest以前和以后,或者当一个BulkRequest失败时,都会调用这个侦听器。

而后BulkProcessor.builder方法能够用来构建一个新的BulkProcessor

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener).build();
  • 建立BulkProcessor.Listener
  • beforeBulk方法在每次执行BulkRequest以前调用。
  • afterBulk方法在每次执行BulkRequest以后调用。
  • failure参数的afterBulk方法在BulkRequest失败时调用。
  • 经过从BulkProcessor.builder调用build()方法建立BulkProcessorRestHighLevelClient.bulkAsync()方法将用于在后台执行BulkRequest

BulkProcessor.Builder提供了一些方法来配置BulkProcessor应该如何处理请求执行:

BulkProcessor.Builder builder = BulkProcessor.builder(
        (request, bulkListener) ->
            client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener),
        listener);
builder.setBulkActions(500); 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); 
builder.setConcurrentRequests(0); 
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); 
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
  • 根据当前添加的操做数量设置刷新新bulk请求的时间(默认为1000,使用-1禁用它)。
  • 根据当前添加的操做大小设置刷新新bulk请求的时间(默认为5Mb,使用-1禁用)。
  • 设置容许执行的并发请求数量(默认为1,使用0只容许执行单个请求)。
  • 设置刷新间隔,若是间隔经过,则刷新任何挂起的BulkRequest(默认为未设置)。
  • 设置一个常量后退策略,该策略最初等待1秒并重试最多3次,有关更多选项,请参见BackoffPolicy.noBackoff()BackoffPolicy.constantBackoff()BackoffPolicy.exponentialBackoff()

一旦建立了BulkProcessor,就能够向它添加请求:

IndexRequest one = new IndexRequest("posts").id("1")
        .source(XContentType.JSON, "title",
                "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts").id("2")
        .source(XContentType.JSON, "title",
                "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts").id("3")
        .source(XContentType.JSON, "title",
                "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

请求将由BulkProcessor执行,它负责为每一个bulk请求调用BulkProcessor.Listener

监听器提供访问BulkRequestBulkResponse的方法:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); 
        logger.debug("Executing bulk [{}] with {} requests",
                executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            BulkResponse response) {
        if (response.hasFailures()) { 
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds",
                    executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request,
            Throwable failure) {
        logger.error("Failed to execute bulk", failure); 
    }
};
  • beforeBulk在执行BulkRequest的每次执行以前调用,这个方法容许知道将要在BulkRequest中执行的操做的数量。
  • afterBulk在每次执行BulkRequest以后调用,这个方法容许知道BulkResponse是否包含错误。
  • 若是BulkRequest失败,则调用带failure参数的afterBulk方法,该方法容许知道失败。

将全部请求添加到BulkProcessor以后,须要使用两种可用的关闭方法之一关闭它的实例。

awaitClose()方法能够用来等待,直到全部的请求都被处理完毕或者指定的等待时间过去:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
  • 若是全部bulk请求都已完成,则该方法返回true,若是在全部bulk请求完成以前的等待时间已通过去,则返回false

close()方法可用于当即关闭BulkProcessor

这两种方法都在关闭处理器以前刷新添加处处理器的请求,而且禁止向处理器添加任何新请求。

相关文章
相关标签/搜索