Java High Level REST Client提供了 Bulk处理器来帮助处理批量请求。
BulkRequest
可使用一个请求执行多个索引、更新和/或删除操做。服务器
它须要在批量请求中添加至少一个操做:并发
BulkRequest request = new BulkRequest(); request.add(new IndexRequest("posts").id("1") .source(XContentType.JSON,"field", "foo")); request.add(new IndexRequest("posts").id("2") .source(XContentType.JSON,"field", "bar")); request.add(new IndexRequest("posts").id("3") .source(XContentType.JSON,"field", "baz"));
BulkRequest
。IndexRequest
添加到Bulk
请求。Bulk API只支持 JSON或 SMILE编码的文档,提供任何其余格式的文档都会致使错误。
不一样的操做类型能够添加到同一个BulkRequest
:异步
BulkRequest request = new BulkRequest(); request.add(new DeleteRequest("posts", "3")); request.add(new UpdateRequest("posts", "2") .doc(XContentType.JSON,"other", "test")); request.add(new IndexRequest("posts").id("4") .source(XContentType.JSON,"field", "baz"));
BulkRequest
添加DeleteRequest
。BulkRequest
添加UpdateRequest
。IndexRequest
。能够选择提供如下参数:ide
request.timeout(TimeValue.timeValueMinutes(2)); request.timeout("2m");
TimeValue
等待bulk请求执行的超时。String
等待bulk请求执行的超时。request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); request.setRefreshPolicy("wait_for");
WriteRequest.RefreshPolicy
实例的刷新策略。String
的刷新策略。request.waitForActiveShards(2); request.waitForActiveShards(ActiveShardCount.ALL);
ActiveShardCount
提供的碎片副本的数量:能够是ActiveShardCount.ALL
、ActiveShardCount.ONE
、ActiveShardCount.DEFAULT
(默认)。request.pipeline("pipelineId");
pipelineId
用于全部子请求,除非在子请求上重写。request.routing("routingId");
routingId
用于全部子请求,除非在子请求上重写。BulkRequest defaulted = new BulkRequest("posts");
@Nullable
,并只能在建立BulkRequest
时设置。当以如下方式执行BulkRequest
时,客户端等待BulkResponse
返回,而后继续执行代码:post
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
在高级别REST客户端中解析REST响应失败、请求超时或相似的状况,其中没有来自服务器的响应的状况下,同步调用可能引起IOException
。ui
在服务器返回4xx
或5xx
错误代码的状况下,高级别客户端尝试解析响应体错误细节,而后抛出一个通用的ElasticsearchException
并将原始的ResponseException
做为一个被抑制的异常添加到它。编码
还能够以异步方式执行BulkRequest
,以便客户端能够直接返回,用户须要指定如何经过将请求和侦听器传递给异步块方法来处理响应或潜在故障:debug
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
BulkRequest
和在执行完成时使用的ActionListener
。异步方法不会阻塞并当即返回,一旦执行完成,ActionListener
将使用onResponse
方法(若是执行成功)被调用,或者使用onFailure
方法(若是执行失败)被调用,失败状况和预期的异常与同步执行状况相同。code
一个典型的bulk监听器是这样的:索引
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { } @Override public void onFailure(Exception e) { } };
onResponse
当执行成功完成时调用。onFailure
当整个BulkRequest
失败时调用。返回的BulkResponse
包含执行操做的信息,容许对每一个结果进行以下迭代:
for (BulkItemResponse bulkItemResponse : bulkResponse) { DocWriteResponse itemResponse = bulkItemResponse.getResponse(); switch (bulkItemResponse.getOpType()) { case INDEX: case CREATE: IndexResponse indexResponse = (IndexResponse) itemResponse; break; case UPDATE: UpdateResponse updateResponse = (UpdateResponse) itemResponse; break; case DELETE: DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
IndexResponse
、UpdateResponse
或DeleteResponse
,它们均可以看做DocWriteResponse
实例。Bulk响应提供了一种方法来快速检查一个或多个操做是否失败:
if (bulkResponse.hasFailures()) { }
true
。在这种状况下,须要对全部的操做结果进行迭代,以检查操做是否失败,若是失败,则检索相应的失败:
for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) { BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); } }
BulkProcessor
提供了一个实用程序类,容许索引/更新/删除操做在添加处处理器时透明地执行,从而简化了Bulk API的使用。
为了执行请求,BulkProcessor
须要如下组件:
RestHighLevelClient
BulkRequest
并检索BulkResponse
。BulkProcessor.Listener
BulkRequest
以前和以后,或者当一个BulkRequest
失败时,都会调用这个侦听器。而后BulkProcessor.builder
方法能够用来构建一个新的BulkProcessor
:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { } }; BulkProcessor bulkProcessor = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener).build();
BulkProcessor.Listener
。beforeBulk
方法在每次执行BulkRequest
以前调用。afterBulk
方法在每次执行BulkRequest
以后调用。failure
参数的afterBulk
方法在BulkRequest
失败时调用。BulkProcessor.builder
调用build()
方法建立BulkProcessor
,RestHighLevelClient.bulkAsync()
方法将用于在后台执行BulkRequest
。BulkProcessor.Builder
提供了一些方法来配置BulkProcessor
应该如何处理请求执行:
BulkProcessor.Builder builder = BulkProcessor.builder( (request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener), listener); builder.setBulkActions(500); builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); builder.setConcurrentRequests(0); builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); builder.setBackoffPolicy(BackoffPolicy .constantBackoff(TimeValue.timeValueSeconds(1L), 3));
1000
,使用-1
禁用它)。5Mb
,使用-1
禁用)。1
,使用0
只容许执行单个请求)。BulkRequest
(默认为未设置)。BackoffPolicy.noBackoff()
、BackoffPolicy.constantBackoff()
和BackoffPolicy.exponentialBackoff()
。一旦建立了BulkProcessor
,就能够向它添加请求:
IndexRequest one = new IndexRequest("posts").id("1") .source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts").id("2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts").id("3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
请求将由BulkProcessor
执行,它负责为每一个bulk请求调用BulkProcessor.Listener
。
监听器提供访问BulkRequest
和BulkResponse
的方法:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { int numberOfActions = request.numberOfActions(); logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { if (response.hasFailures()) { logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Failed to execute bulk", failure); } };
beforeBulk
在执行BulkRequest
的每次执行以前调用,这个方法容许知道将要在BulkRequest
中执行的操做的数量。afterBulk
在每次执行BulkRequest
以后调用,这个方法容许知道BulkResponse
是否包含错误。BulkRequest
失败,则调用带failure
参数的afterBulk
方法,该方法容许知道失败。将全部请求添加到BulkProcessor
以后,须要使用两种可用的关闭方法之一关闭它的实例。
awaitClose()
方法能够用来等待,直到全部的请求都被处理完毕或者指定的等待时间过去:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
true
,若是在全部bulk请求完成以前的等待时间已通过去,则返回false
。close()
方法可用于当即关闭BulkProcessor
:
这两种方法都在关闭处理器以前刷新添加处处理器的请求,而且禁止向处理器添加任何新请求。