1.链接集群
import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestHighLevelClient;
public class InitClient {
public static RestHighLevelClient getClient(){
RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("htkj101", 9200, "http"),
new HttpHost("htkj102", 9200, "http"),
new HttpHost("htkj103", 9200, "http")
)
);
return client;
};
}
2.CreatIndex
import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class CreateIndex {
public static void main(String ags[]){
createIndex1();
createIndex2();
createIndex3();
createIndex4();
}
/**
* 第一种建立Index方法
* */
private static void createIndex1(){
try(RestHighLevelClient client = InitClient.getClient()){
IndexRequest request = new IndexRequest("posts","doc","1");
String jsonString="{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
request.source(jsonString, XContentType.JSON);
synExecution(client,request);
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 第二种建立Index方法
* */
private static void createIndex2(){
try(RestHighLevelClient client = InitClient.getClient()){
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "2")
.source(jsonMap);
synExecution(client,indexRequest);
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 第三种建立Index方法
* */
private static void createIndex3() {
try(RestHighLevelClient client = InitClient.getClient()){
XContentBuilder builder= XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest request = new IndexRequest("posts", "doc", "3")
.source(builder);
synExecution(client,request);
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* 第四种建立Index方法
* */
private static void createIndex4(){
try(RestHighLevelClient client = InitClient.getClient()){
IndexRequest request=new IndexRequest("posts","doc","4")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");
synExecution(client,request);
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void synExecution(RestHighLevelClient client,IndexRequest request) throws IOException {
IndexResponse indexResponse=client.index(request, RequestOptions.DEFAULT);
IndexResponse(indexResponse);
}
private static void asynExecution(RestHighLevelClient client,IndexRequest request){
ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
//当执行成功时 调用这里
System.out.println("执行成功");
}
@Override
public void onFailure(Exception e) {
//失败时调用这个
System.out.println("执行失败");
}
};
client.indexAsync(request, RequestOptions.DEFAULT, listener);
}
public static void IndexResponse(IndexResponse indexResponse){
String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
//首次建立文档的状况
System.out.println("第一次被建立");
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
//已存在的文档被重写
System.out.println("更新成功");
}
System.out.println(" index: "+index+" type: "+type+" id: "+id+" version: "+version);
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
//处理成功分片数量少于总分片数量的状况 ---没懂是什么意思
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason(); //出错的状况
System.out.println(reason);
}
}
}
}
3.Get
import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import java.io.IOException;
import java.util.Map;
public class Get {
public static void main(String ags[]){
getRequest();
}
private static void getRequest(){
try(RestHighLevelClient client = InitClient.getClient()){
GetRequest getRequest = new GetRequest("posts", "doc", "1");
synExecution(client,getRequest);
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void synExecution(RestHighLevelClient client,GetRequest getRequest ) throws IOException {
GetResponse getResponse = client.get(getRequest, RequestOptions.DEFAULT);
getResponse(getResponse);
}
private static void asynExecution(RestHighLevelClient client,GetRequest getRequest){
ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
@Override
public void onResponse(GetResponse getResponse) {
System.out.println("执行成功时调用");
}
@Override
public void onFailure(Exception e) {
System.out.println("执行失败时调用");
}
};
client.getAsync(getRequest, RequestOptions.DEFAULT, listener);
}
private static void getResponse(GetResponse getResponse){
String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
long version = getResponse.getVersion();
//将文档变为String类型
String sourceAsString = getResponse.getSourceAsString();
//将文档变为map
Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
//将文档变为Byte
byte[] sourceAsBytes = getResponse.getSourceAsBytes();
System.out.println("文档--->"+sourceAsString);
} else {
System.out.println("找不到文档");
}
}
private static void failMethod(){
//若是查找一个索引,索引不存在,应当按照以下的方式进行处理
try(RestHighLevelClient client = InitClient.getClient()){
GetRequest request = new GetRequest("does_not_exist", "doc", "1");
try {
GetResponse getResponse = client.get(request, RequestOptions.DEFAULT);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
4.Exsits
import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import java.io.IOException;
public class Exists {
public static void main(String ags[]){
existsRequest();
}
private static void existsRequest(){
try(RestHighLevelClient client = InitClient.getClient()){
GetRequest getRequest = new GetRequest("posts", "doc", "1");
//禁用获取_source
getRequest.fetchSourceContext(new FetchSourceContext(false));
//禁用获取的存储字段
getRequest.storedFields("_none_");
synExecution(client,getRequest);
} catch (IOException e) {
e.printStackTrace();
}
}
private static void synExecution(RestHighLevelClient client,GetRequest getRequest) throws IOException {
boolean exists = client.exists(getRequest, RequestOptions.DEFAULT);
}
private static void asynExecution(RestHighLevelClient client,GetRequest getRequest){
ActionListener<Boolean> listener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean exists) {
}
@Override
public void onFailure(Exception e) {
}
};
client.existsAsync(getRequest, RequestOptions.DEFAULT, listener);
}
}
5.Update
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class Update {
public static void main(String[] args) {
}
private static void updateRequest(){
UpdateRequest request = new UpdateRequest("posts", "doc", "1");
}
private static void update1(){
UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
"\"updated\":\"2017-01-01\"," +
"\"reason\":\"daily update\"" +
"}";
request.doc(jsonString, XContentType.JSON);
}
private static void update2(){
Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc(jsonMap);
}
private static void update3() throws IOException {
XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.timeField("updated", new Date());
builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc(builder);
}
private static void update4(){
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
.doc("updated", new Date(),
"reason", "daily update");
}
/**
*若是文档尚不存在,则可使用如下upsert方法定义一些内容,这些内容将做为新文档插入
**/
private static void upsert(UpdateRequest request){
String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);
}
private static void synExecution(RestHighLevelClient client,UpdateRequest request) throws IOException {
UpdateResponse updateResponse = client.update(request, RequestOptions.DEFAULT);
}
private static void asynExecution(RestHighLevelClient client,UpdateRequest request){
ActionListener<UpdateResponse> listener = new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.updateAsync(request, RequestOptions.DEFAULT, listener);
}
public static void updateResponse(UpdateResponse updateResponse){
String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
//处理首次建立文档的状况(upsert)
System.out.println("首次建立文档");
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
//处理文档更新的状况
System.out.println("update更新成功");
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
//处理文件被删除的状况
System.out.println("删除成功");
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
//处理文档不受更新影响的状况,即未对文档执行任何操做(空转)
System.out.println("没有执行任何操做");
}
System.out.println(" index: "+index+" type: "+type+" id: "+id+" version: "+version);
ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
}
}
}
}
6.Delete
import com.htkj.elasticsearch.InitClient;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import java.io.IOException;
public class Delete {
public static void main(String ags[]){
deleteRequest();
}
private static void deleteRequest(){
try(RestHighLevelClient client = InitClient.getClient()){
DeleteRequest request = new DeleteRequest("posts", "doc", "2");
synExecution(client,request);
client.close();
} catch (IOException e) {
e.printStackTrace();
}
}
private static void synExecution(RestHighLevelClient client, DeleteRequest request) throws IOException {
DeleteResponse deleteResponse = client.delete(request, RequestOptions.DEFAULT);
deleteResponse(deleteResponse);
}
private static void asynExecution(RestHighLevelClient client, DeleteRequest request){
ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.deleteAsync(request, RequestOptions.DEFAULT, listener);
}
public static void deleteResponse(DeleteResponse deleteResponse) throws IOException {
String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
System.out.println("删除的是"+" index: "+index+" type: "+type+" id: "+id+" version: "+version);
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure :
shardInfo.getFailures()) {
String reason = failure.reason();
}
}
}
private static void deleteFail(RestHighLevelClient client) throws IOException {
DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(
request, RequestOptions.DEFAULT);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
System.out.println("没有找到");
}
}
}
7.Bulk(批量操做)
import com.htkj.elasticsearch.InitClient;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.*;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentType;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
public class Bulk {
public static void main(String[] args) {
test();
}
private static Logger logger = LogManager.getRootLogger();
private static void bulkRequest(){
//建立BulkRequest
BulkRequest request = new BulkRequest();
//添加indexRequest
//批量操做仅支持json或SMILE编码的文档,其余格式会错误
request.add(new IndexRequest("posts", "doc", "1")
.source(XContentType.JSON,"field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON,"field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON,"field", "baz"));
}
private static void bulkOtherRequest(){
//能够将不一样的操做类型添加到相同的BulkRequest中
BulkRequest request = new BulkRequest();
//添加DeleteRequest
request.add(new DeleteRequest("posts", "doc", "3"));
//添加UpdateRequest
request.add(new UpdateRequest("posts", "doc", "2")
.doc(XContentType.JSON,"other", "test"));
//添加IndexRequest
request.add(new IndexRequest("posts", "doc", "4")
.source(XContentType.JSON,"field", "baz"));
}
private static void argsOptional(){
//其余的可选参数
BulkRequest request = new BulkRequest();
//第一种设置超时时间 超时时间为2min
request.timeout(TimeValue.timeValueMinutes(2));
//第二种设置超时时间 超时时间为2min
request.timeout("2m");
//第一种设置刷新的方式 将刷新的方式设置为WriteRequest.RefreshPolicy
request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
//第二种设置刷新方式
request.setRefreshPolicy("wait_for");
}
private static void synExecution(RestHighLevelClient client,BulkRequest request) throws IOException {
BulkResponse bulkResponse = client.bulk(request, RequestOptions.DEFAULT);
}
private static void asynExecution(RestHighLevelClient client,BulkRequest request){
ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.bulkAsync(request, RequestOptions.DEFAULT, listener);
}
private static void bulkResonse(BulkResponse bulkResponse) {
//遍历全部结果
for (BulkItemResponse bulkItemResponse : bulkResponse) {
//IndexResponse, UpdateResponse DeleteResponse 都会被视为DocWriteResponse的实例
DocWriteResponse itemResponse = bulkItemResponse.getResponse();
switch (bulkItemResponse.getOpType()) {
case INDEX:
case CREATE:
IndexResponse indexResponse = (IndexResponse) itemResponse;
//调用方法
CreateIndex.IndexResponse(indexResponse);
break;
case UPDATE:
UpdateResponse updateResponse = (UpdateResponse) itemResponse;
//调用方法
Update.updateResponse(updateResponse);
break;
case DELETE:
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
try {
//调用方法
Delete.deleteResponse(deleteResponse);
} catch (IOException e) {
e.printStackTrace();
}
default:
}
}
}
private static void bulkResponseFail(BulkResponse bulkResponse){
if (bulkResponse.hasFailures()) {
for (BulkItemResponse bulkItemResponse : bulkResponse) {//操做是否失败
if (bulkItemResponse.isFailed()) {
//检索失败的操做
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
}
}
}
}
/**
* BulkProcessor 经过提供一个工具类 容许index update delete操做可以被添加到processor中被执行
* 为了执行请求,BulkProcessor须要如下组件
* RestHighLevelClient 该客户端用于执行BulkRequest 和检索BulkResponse
* BulkProcessor.Listener 在每次BulkRequest执行以前或以后或BulkRequest失败时调用此监听器
* 而后BulkProcessor.builder方法可用于构建新的 BulkProcessor:
*
* */
private static void bulkProcessor(RestHighLevelClient client) throws InterruptedException {
//建立BulkProcessor.Listener
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
/**
* 此方法在每次执行bulkrequest以前调用
*
* */
@Override
public void beforeBulk(long executionId, BulkRequest request) {
//在每次执行bulkrequest以前调用,此方法容许知道将在bulkrequest中执行的操做数
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}
/**
* 此方法在每次执行bulkrequest以后调用
*
* */
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
//在每次执行以后调用BulkRequest,此方法容许知道是否BulkResponse包含错误
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
/**
* 当bulkrequest失败时调用此方法
*
* */
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure);
}
};
//RestHighLevelClient.bulkAsync() 方法将用于执行BulkRequest后台操做
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
//BulkProcessor经过build()从中调用方法来建立BulkProcessor.Builder
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener).build();
//BulkProcessor.Builder提供的方法来配置如何 BulkProcessor应该处理请求的执行:
BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener);
//根据当前添加的操做数设置什么时候刷新新的批量请求(默认为1000,使用-1禁用它)
builder.setBulkActions(500);
//根据当前添加的操做大小设置什么时候刷新新的批量请求(默认为5Mb,使用-1禁用它)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB));
//设置容许执行的并发请求数(默认为1,使用0仅容许执行单个请求)
builder.setConcurrentRequests(0);
//设置刷新间隔,若是间隔经过,则刷新任何挂起的bulkrequest(默认为未设置)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L));
//设置一个初始等待1秒并最多重试3次的回退策略
builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));
//以上 也能够写成这样的形式
BulkProcessor bulkProcessor2 = BulkProcessor.builder(bulkConsumer, listener)
.setBulkActions(500)
.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB))
.setConcurrentRequests(0)
.setFlushInterval(TimeValue.timeValueSeconds(10L))
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
.build();
//BulkProcessor 被建立后 requests 就能够添加进BulkProcessor 中
IndexRequest one = new IndexRequest("posts", "doc", "1").
source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
bulkProcessor2.add(one);
bulkProcessor2.add(two);
bulkProcessor2.add(three);
//将全部请求添加到后BulkProcessor,须要关闭其实例
//这里给出两种方式 第一种是awaitClose()方法
//该awaitClose()方法可用于等待全部请求都已处理或通过指定的等待时间:
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
//第二种是close()方法 这将当即关闭BulkProcessor
bulkProcessor.close();
}
/**
*使用bulkProcessor 批量建立Index
* */
private static void test(){
try(RestHighLevelClient client = InitClient.getClient()){
BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer =
(request, bulkListener) -> client.bulkAsync(request, RequestOptions.DEFAULT, bulkListener);
//listener
BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests", executionId, numberOfActions);
}
@Override
public void afterBulk(long executionId, BulkRequest request, BulkResponse response) {
//response
bulkResonse(response);
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}
@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
logger.error("Failed to execute bulk", failure);
}
};
//build
BulkProcessor bulkProcessor = BulkProcessor.builder(bulkConsumer, listener)
.setBulkActions(500)
.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB))
.setConcurrentRequests(0)
.setFlushInterval(TimeValue.timeValueSeconds(10L))
.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3))
.build();
//request
IndexRequest one = new IndexRequest("posts", "doc", "1").
source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");
DeleteRequest one2 = new DeleteRequest("posts", "doc", "3");
UpdateRequest two2 = new UpdateRequest("posts", "doc", "2")
.doc(XContentType.JSON, "other", "test");
IndexRequest three2 = new IndexRequest("posts", "doc", "4")
.source(XContentType.JSON, "field", "baz");
//add
// bulkProcessor.add(one);
// bulkProcessor.add(two);
// bulkProcessor.add(three);
bulkProcessor.add(one2);
bulkProcessor.add(two2);
bulkProcessor.add(three2);
//close
boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS);
client.close();
} catch (Exception e) {
e.printStackTrace();
}
}
}