Elasitcsearch High Level Rest Client学习笔记(三)批量api

Bulk Request并发

BulkRequest能够在一块儿从请求执行批量添加、更新和删除,至少须要添加一个操做异步

BulkRequest request = new BulkRequest(); //建立BulkRequest
request.add(new IndexRequest("posts", "doc", "1")  //添加操做
        .source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")  //添加操做
        .source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")  //添加操做
        .source(XContentType.JSON,"field", "baz"));

注意:每次只支持一种encoded,不然会报错ide

能够在同一个BulkRequest中添加不一样类型操做post

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3")); 
request.add(new UpdateRequest("posts", "doc", "2") 
        .doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")  
        .source(XContentType.JSON,"field", "baz"));

可选参数ui

超时时间设置spa

request.timeout(TimeValue.timeValueMinutes(2)); 
request.timeout("2m");

 

刷新策略线程

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL); 
request.setRefreshPolicy("wait_for");

 

设置副本shard活跃验证,执行index、update、delete操做前必须有多少个副本shard活跃debug

request.waitForActiveShards(2); 
request.waitForActiveShards(ActiveShardCount.ALL);

调用方式code

同步对象

BulkResponse bulkResponse = client.bulk(request);

异步

client.bulkAsync(request, new ActionListener<BulkResponse>() {
    @Override
    public void onResponse(BulkResponse bulkResponse) {
        //成功
    }

    @Override
    public void onFailure(Exception e) {
        //失败
    }
});

响应对象

响应对象包括操做信息,而且能够便利每个结果

for (BulkItemResponse bulkItemResponse : bulkResponse) { 
    DocWriteResponse itemResponse = bulkItemResponse.getResponse(); 

    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {     //index操做
        IndexResponse indexResponse = (IndexResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {     //update操做
        UpdateResponse updateResponse = (UpdateResponse) itemResponse;

    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {     //delete操做
        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
    }
}

BulkResponce提供方法快速查看操做是否失败

if (bulkResponse.hasFailures()) { 
    //todo
}

BulkProcessor

RestHighLevelClient:执行BulkRequest而且返回BulkResponse

BulkProcessor.Listener:在bulk请求先后执行,而且能够处理失败状况

BulkProcessor.Listener listener = new BulkProcessor.Listener() { 
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        //bulk请求前执行
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        //bulk请求后执行
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        //失败后执行
    }
};

BulkProcessor bulkProcessor = BulkProcessor.builder(client::bulkAsync, listener).build();  //BulkProcessor经过 BulkProcessor.Builder build()方法构建, RestHighLevelClient.bulkAsync() 用来执行bulk请求

BulkProcessor.Builder提供方法使BulkProcessor调整请求参数

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); //按照数量批量处理(默认1000,-1禁用) 
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); //按照大小批量处理
builder.setConcurrentRequests(0); //并发处理线程
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); //设置flush索引周期
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3)); //回退策略,等待1秒并重试3次, BackoffPolicy.noBackoff()  BackoffPolicy.constantBackoff()  BackoffPolicy.exponentialBackoff()  查看更多选项

添加请求

IndexRequest one = new IndexRequest("posts", "doc", "1").
        source(XContentType.JSON, "title", "In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "title", "Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "title", "The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

BulkProcessor经过 BulkProcessor.Listener 监控请求, BulkProcessor.Listener 提供方法接受BulkRequest和BulkResponse

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
    @Override
    public void beforeBulk(long executionId, BulkRequest request) {
        int numberOfActions = request.numberOfActions(); //在每一个execution前执行,能够获知每次执行多少个操做
        logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
        if (response.hasFailures()) {  //在每一个execution后执行,能够获知是否包含错误
            logger.warn("Bulk [{}] executed with failures", executionId);
        } else {
            logger.debug("Bulk [{}] completed in {} milliseconds", executionId, response.getTook().getMillis());
        }
    }

    @Override
    public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
        logger.error("Failed to execute bulk", failure); //发生错误时执行
    }
};

批量请求执行后须要关闭BulkProcessor。两种关闭方式选其一

awaitClose(),全部请求被处理后或者等待时间结束后关闭,返回ture代表全部请求已经完成,false说明等待时间结束后请求并未执行结束

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);

close(),当即关闭BulkProcessor

bulkProcessor.close();

关闭processor以前,全部已经被添加的请求会被提交执行,而且不能再向其中添加请求

相关文章
相关标签/搜索