承接上文,使用Java High Level REST Client操做elasticsearchhtml
高级客户端提供了批量处理器以协助批量请求java
BulkRequest能够在一次请求中执行多个索引,更新或者删除操做。一次请求至少须要一个操做。并发
//建立BulkRequest实例 BulkRequest request = new BulkRequest(); //使用IndexRequest添加三个文档,不清楚用法能够参考Index API request.add(new IndexRequest("posts", "doc", "1") .source(XContentType.JSON,"field", "foo")); request.add(new IndexRequest("posts", "doc", "2") .source(XContentType.JSON,"field", "bar")); request.add(new IndexRequest("posts", "doc", "3") .source(XContentType.JSON,"field", "baz"));
Bulk API仅支持以JSON或SMILE编码的文档。 提供任何其余格式的文档将致使错误。异步
同一个BulkRequest能够添加不一样类型的操做。elasticsearch
// 添加 DeleteRequest到BulkRequest,不清楚用法能够参考Delete API request.add(new DeleteRequest("posts", "doc", "3")); // 添加 UpdateRequest到BulkRequest,不清楚用法能够参考Update API request.add(new UpdateRequest("posts", "doc", "2") .doc(XContentType.JSON, "other", "test")); // 添加 一个使用SMILE格式的IndexRequest request.add(new IndexRequest("posts", "doc", "4") .source(XContentType.SMILE, "field", "baz"));
可选参数ide
//设置超时,等待批处理被执行的超时时间(使用TimeValue形式) request.timeout(TimeValue.timeValueMinutes(2)); //设置超时,等待批处理被执行的超时时间(字符串形式) request.timeout("2m");
//刷新策略 request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);//WriteRequest.RefreshPolicy实例方式 request.setRefreshPolicy("wait_for");//字符串方式
//设置在执行索引/更新/删除操做以前必须处于活动状态的分片副本数。 request.waitForActiveShards(2); //使用ActiveShardCount方式来提供分片副本数:能够是ActiveShardCount.ALL,ActiveShardCount.ONE或ActiveShardCount.DEFAULT(默认) request.waitForActiveShards(ActiveShardCount.ALL);
同步执行工具
BulkResponse bulkResponse = client.bulk(request);
异步执行post
批量请求的异步执行须要将BulkRequest实例和ActionListener实例传递给异步方法:ui
//当BulkRequest执行完成时,ActionListener会被调用 client.bulkAsync(request, listener);
异步方法不会阻塞并会当即返回。完成后,若是执行成功完成,则使用onResponse方法回调ActionListener,若是失败则使用onFailure方法。
BulkResponse 的典型监听器以下所示:编码
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() { @Override public void onResponse(BulkResponse bulkResponse) { //执行成功完成时调用。 response做为参数提供,并包含已执行的每一个操做的单个结果列表。 请注意,一个或多个操做可能已失败,然而其余操做已成功执行。 } @Override public void onFailure(Exception e) { //在整个BulkRequest失败时调用。 在这种状况下,exception做为参数提供,而且没有执行任何操做。 } };
返回的BulkResponse包含有关已执行操做的信息,并容许迭代每一个结果,以下所示:
//遍历全部操做结果 for (BulkItemResponse bulkItemResponse : bulkResponse) { //获取操做的响应,能够是IndexResponse,UpdateResponse或DeleteResponse, // 它们均可以被视为DocWriteResponse实例 DocWriteResponse itemResponse = bulkItemResponse.getResponse(); if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) { //处理index操做 IndexResponse indexResponse = (IndexResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) { //处理update操做 UpdateResponse updateResponse = (UpdateResponse) itemResponse; } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) { //处理delete操做 DeleteResponse deleteResponse = (DeleteResponse) itemResponse; } }
批量响应提供了用于快速检查一个或多个操做是否失败的方法:
if (bulkResponse.hasFailures()) { //该方法只要有一个操做失败都会返回true }
若是想要查看操做失败的缘由,则须要遍历全部操做结果:
for (BulkItemResponse bulkItemResponse : bulkResponse) { if (bulkItemResponse.isFailed()) {//判断当前操做是否失败 //获取失败对象,拿到了failure对象,想怎么玩都行 BulkItemResponse.Failure failure = bulkItemResponse.getFailure(); } }
BulkProcessor经过提供一个工具类来简化Bulk API的使用,容许索引/更新/删除操做在添加处处理器后透明地执行。
为了执行请求,BulkProcessor须要如下组件:
RestHighLevelClient
此客户端用于执行BulkRequest并获取BulkResponse
BulkProcessor.Listener
在每次BulkRequest执行以前和以后或BulkRequest失败时调用此监听器
而后BulkProcessor.builder方法可用于构建新的BulkProcessor:
//建立BulkProcessor.Listener BulkProcessor.Listener listener1 = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { //在每次执行BulkRequest以前调用此方法 } @Override public void afterBulk(long executionId, BulkRequest request,BulkResponse response) { //在每次执行BulkRequest以后调用此方法 } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { //执行BulkRequest失败时调用此方法 } }; //经过从BulkProcessor.Builder调用build()方法来建立BulkProcessor。 //RestHighLevelClient.bulkAsync()方法将用来执行BulkRequest。 BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener1).build();
BulkProcessor.Builder提供了配置BulkProcessor应如何处理请求执行的方法:
BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener1); //设置什么时候刷新新的批量请求,根据当前已添加的操做数量(默认为1000,使用-1禁用它) builder.setBulkActions(500);//操做数为500时就刷新请求 //设置什么时候刷新新的批量请求,根据当前已添加的操做大小(默认为5Mb,使用-1禁用它) builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));//操做大小为1M时就刷新请求 //设置容许执行的并发请求数(默认为1,使用0只容许执行单个请求) builder.setConcurrentRequests(0);//不并发执行 //设置刷新间隔时间,若是超过了间隔时间,则随便刷新一个BulkRequest挂起(默认为未设置) builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //设置一个最初等待1秒,最多重试3次的常量退避策略。 // 有关更多选项,请参阅BackoffPolicy.noBackoff(),BackoffPolicy.constantBackoff()和BackoffPolicy.exponentialBackoff()。 builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
建立BulkProcessor后,就能够向其添加请求了:
IndexRequest one = new IndexRequest("posts", "doc", "1"). source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?"); IndexRequest two = new IndexRequest("posts", "doc", "2") .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch"); IndexRequest three = new IndexRequest("posts", "doc", "3") .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch"); bulkProcessor.add(one); bulkProcessor.add(two); bulkProcessor.add(three);
这些请求将由BulkProcessor执行,BulkProcessor负责为每一个批量请求调用BulkProcessor.Listener。
侦听器提供访问BulkRequest和BulkResponse的方法:
BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { //在每次执行BulkRequest以前调用,经过此方法能够获取将在BulkRequest中执行的操做数 int numberOfActions = request.numberOfActions(); logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { //在每次执行BulkRequest后调用,经过此方法能够获取BulkResponse是否包含错误 if (response.hasFailures()) { logger.warn("Bulk [{}] executed with failures", executionId); } else { logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis()); } } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { //若是BulkRequest失败,经过调用此方法能够获取失败 logger.error("Failed to execute bulk", failure); } };
将全部请求添加到BulkProcessor后,须要使用两种可用的关闭方法之一关闭其实例。
awaitClose()方法可用于等待全部请求都已处理或过了指定的等待时间:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
若是全部批量请求都已完成,则该方法返回true;若是在全部批量请求完成以前等待时间已过,则返回false
close()方法可用于当即关闭BulkProcessor:
bulkProcessor.close();
两种方法在关闭处理器以前会刷新已添加处处理器的请求,而且还会禁止将任何新请求添加处处理器。
官方文档:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-document-bulk.html#_optional_arguments_4