Elasticsearch Java API 6.2(文档API)

文档API

本节描述如下CRUD API:html

单文档的API

  • Index API
  • Get API
  • Delete API
  • Update API

多文档API

  • Multi Get API
  • Bulk API
  • Reindex API
  • Update By Query API
  • Delete By Query API
注意
全部 CRUD API都是单索引 API,索引参数接受单个索引名,或指向单个索引的别名

Index API

index API容许将类型化的JSON文档索引到特定的索引中,并使其可搜索。json

生成JSON文档

生成JSON文档有几种不一样的方法:数组

  • 手动的(也就是你本身)使用原生byte[]或做为String
  • 使用一个Map,该Map将自动转换为它的JSON等效项
  • 使用第三方库对bean(如Jackson)进行序列化
  • 使用内置的助手XContentFactory.jsonBuilder()

在内部,每一个类型被转换为byte[](像String被转换为byte[]),所以,若是对象已经以这种形式存在,那么就使用它,jsonBuilder是高度优化的JSON生成器,它直接构造一个byte[]并发

本身动手

这里没有什么困难,可是请注意,您必须根据日期格式对日期进行编码。app

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

使用Map

Map是一个键:值对集合,它表示一个JSON结构:less

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");

bean序列化

可使用Jacksonbean序列化为JSON,请将Jackson Databind添加到您的项目中,而后,您可使用ObjectMapper来序列化您的bean:异步

import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);

使用Elasticsearch助手

Elasticsearch提供了内置的助手来生成JSON内容。elasticsearch

import static org.elasticsearch.common.xcontent.XContentFactory.*;

XContentBuilder builder = jsonBuilder()
    .startObject()
        .field("user", "kimchy")
        .field("postDate", new Date())
        .field("message", "trying out Elasticsearch")
    .endObject()

注意,您还可使用startArray(String)endArray()方法添加数组,顺便说一下,field方法接受许多对象类型,您能够直接传递数字、日期甚至其余XContentBuilder对象。ide

若是须要查看生成的JSON内容,可使用string()方法。oop

String json = builder.string();

索引文档

下面的示例将JSON文档索引为一个名为twitter的索引,其类型为tweet, id值为1:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

IndexResponse response = client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        .get();

注意,您还能够将文档索引为JSON字符串,而且不须要提供ID:

String json = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
    "}";

IndexResponse response = client.prepareIndex("twitter", "tweet")
        .setSource(json, XContentType.JSON)
        .get();

IndexResponse对象会给你一个响应:

// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();

有关索引操做的更多信息,请查看REST索引文档

Get API

get API容许根据索引的id从索引中获取类型化的JSON文档,下面的示例从一个名为twitter的索引中获取JSON文档,该索引的类型名为tweet, id值为1:

GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

有关get操做的更多信息,请查看REST get文档。

Delete API

delete API容许基于id从特定索引中删除类型化的JSON文档,下面的示例从名为twitter的索引中删除JSON文档,该索引的类型名为tweet, id值为1:

DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

Delete By Query API

经过查询删除的API能够根据查询结果删除给定的一组文档:

BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male")) 
    .source("persons")                                  
    .get();                                             
long deleted = response.getDeleted();
  • QueryBuilders.matchQuery("gender", "male")(查询)
  • source("persons") (索引)
  • get()(执行操做)
  • response.getDeleted()(被删除的文档数)

因为这是一个长时间运行的操做,若是您但愿异步执行,能够调用execute而不是get,并提供以下监听器:

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
    .filter(QueryBuilders.matchQuery("gender", "male"))     
    .source("persons")                                      
    .execute(new ActionListener<BulkByScrollResponse>() {   
        @Override
        public void onResponse(BulkByScrollResponse response) {
            long deleted = response.getDeleted();           
        }
        @Override
        public void onFailure(Exception e) {
            // Handle the exception
        }
    });

Update API

您能够建立一个UpdateRequest并将其发送给客户端:

UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
        .startObject()
            .field("gender", "male")
        .endObject());
client.update(updateRequest).get();

也可使用prepareUpdate()方法:

client.prepareUpdate("ttl", "doc", "1")
        .setScript(new Script("ctx._source.gender = \"male\""  , ScriptService.ScriptType.INLINE, null, null))
        .get();

client.prepareUpdate("ttl", "doc", "1")
        .setDoc(jsonBuilder()               
            .startObject()
                .field("gender", "male")
            .endObject())
        .get();
  • Script()(你的脚本,它也能够是本地存储的脚本名)
  • setDoc()(将合并到现有的文档)

注意,您不能同时提供脚本和doc

使用脚本更新

update API容许基于提供的脚本更新文档:

UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
        .script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

经过合并文档更新

update API还支持传递一个部分文档合并到现有文档中(简单的递归合并,内部合并对象,取代核心的“键/值”和数组),例如:

UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject());
client.update(updateRequest).get();

Upsert

也有对Upsert的支持,若是文档不存在,则使用upsert元素的内容索引新的doc:

IndexRequest indexRequest = new IndexRequest("index", "type", "1")
        .source(jsonBuilder()
            .startObject()
                .field("name", "Joe Smith")
                .field("gender", "male")
            .endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
        .doc(jsonBuilder()
            .startObject()
                .field("gender", "male")
            .endObject())
        .upsert(indexRequest);              
client.update(updateRequest).get();

若是文档不存在,将添加indexRequest中的文档。

若是文件index/type/1已经存在,咱们将在此操做后得到以下文件:

{
    "name"  : "Joe Dalton",
    "gender": "male"        
}
  • "gender": "male"(此字段由更新请求添加)

若是不存在,咱们将有一份新文件:

{
    "name" : "Joe Smith",
    "gender": "male"
}

Multi Get API

multi get API容许根据文档的indextypeid获取文档列表:

MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")           
    .add("twitter", "tweet", "2", "3", "4") 
    .add("another", "type", "foo")          
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { 
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {                      
        String json = response.getSourceAsString(); 
    }
}
  • add("twitter", "tweet", "1")(经过单一id)
  • add("twitter", "tweet", "2", "3", "4")(或以相同index/type的id列表)
  • add("another", "type", "foo")(你也能够从另外一个索引中获得)
  • MultiGetItemResponse itemResponse : multiGetItemResponses(迭代结果集)
  • response.isExists()(您能够检查文档是否存在)
  • response.getSourceAsString()(访问_source字段)

有关multi get操做的更多信息,请查看剩余的multi get文档

Bulk API

bulk API容许在一个请求中索引和删除多个文档,这里有一个示例用法:

import static org.elasticsearch.common.xcontent.XContentFactory.*;

BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "trying out Elasticsearch")
                    .endObject()
                  )
        );

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
        .setSource(jsonBuilder()
                    .startObject()
                        .field("user", "kimchy")
                        .field("postDate", new Date())
                        .field("message", "another post")
                    .endObject()
                  )
        );

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
    // process failures by iterating through each bulk response item
}

使用Bulk处理器

BulkProcessor类提供了一个简单的接口,能够根据请求的数量或大小自动刷新bulk操做,或者在给定的时间以后。

要使用它,首先建立一个BulkProcessor实例:

import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;

BulkProcessor bulkProcessor = BulkProcessor.builder(
        client,  
        new BulkProcessor.Listener() {
            @Override
            public void beforeBulk(long executionId,
                                   BulkRequest request) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  BulkResponse response) { ... } 

            @Override
            public void afterBulk(long executionId,
                                  BulkRequest request,
                                  Throwable failure) { ... } 
        })
        .setBulkActions(10000) 
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) 
        .setFlushInterval(TimeValue.timeValueSeconds(5)) 
        .setConcurrentRequests(1) 
        .setBackoffPolicy(
            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) 
        .build();
  • beforeBulk()

    • 此方法在执行bulk以前被调用,例如,您能够经过request.numberOfActions()查看numberOfActions
  • afterBulk(...BulkResponse response)

    • 此方法在执行bulk以后被调用,例如,您能够经过response.hasFailures()检查是否存在失败请求
  • afterBulk(...Throwable failure)

    • bulk失败并引起一个可抛出对象时,将调用此方法
  • setBulkActions(10000)

    • 咱们但愿每10,000个请求就执行一次bulk
  • setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))

    • 咱们但愿每5MB就flush一次
  • setFlushInterval(TimeValue.timeValueSeconds(5))

    • 不管请求的数量是多少,咱们都但愿每5秒flush一次
  • setConcurrentRequests(1)

    • 设置并发请求的数量,值为0意味着只容许执行一个请求,在积累新的bulk请求时,容许执行一个值为1的并发请求
  • setBackoffPolicy()

    • 设置一个自定义的备份策略,该策略最初将等待100ms,以指数形式增长并重试三次,当一个或多个bulk项目请求以EsRejectedExecutionException失败时,将尝试重试,该异常代表用于处理请求的计算资源太少,要禁用backoff,请传递BackoffPolicy.noBackoff()

默认状况下,BulkProcessor:

  • bulkActions设置为1000
  • bulkSize设置为5mb
  • 不设置flushInterval
  • concurrentrequest设置为1,这意味着flush操做的异步执行
  • backoffPolicy设置为一个指数备份,8次重试,启动延时为50ms,总等待时间约为5.1秒

添加请求

而后您能够简单地将您的请求添加到BulkProcessor:

bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭Bulk Processor

当全部的文档都被加载到BulkProcessor,可使用awaitCloseclose方法进行关闭:

bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

bulkProcessor.close();

若是经过设置flushInterval来调度其余计划的flush,这两种方法都将flush全部剩余的文档,并禁用全部其余计划flush。若是并发请求被启用,那么awaitClose方法等待指定的超时以完成全部bulk请求,而后返回true,若是在全部bulk请求完成以前指定的等待时间已通过去,则返回falseclose方法不等待任何剩余的批量请求完成并当即退出。

在测试中使用Bulk Processor

若是您正在使用Elasticsearch运行测试,而且正在使用BulkProcessor来填充数据集,那么您最好将并发请求的数量设置为0,以便以同步方式执行批量的flush操做:

BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
        .setBulkActions(10000)
        .setConcurrentRequests(0)
        .build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();

Update By Query API

updateByQuery最简单的用法是在不更改源的状况下更新索引中的每一个文档,这种用法容许获取一个新属性或另外一个在线映射更改。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").abortOnVersionConflict(false);
BulkByScrollResponse response = updateByQuery.get();

updateByQuery API的调用从获取索引快照开始,索引使用内部版本控制找到任何文档。

注意
当一个文档在快照的时间和索引请求过程之间发生变化时,会发生版本冲突。

当版本匹配时,updateByQuery更新文档并增长版本号。

全部更新和查询失败都会致使updateByQuery停止,这些故障能够从BulkByScrollResponse#getIndexingFailures方法中得到,任何成功的更新仍然存在,而且不会回滚,当第一次失败致使停止时,响应包含由失败的bulk请求生成的全部失败。

为了防止版本冲突致使updateByQuery停止,请设置abortOnVersionConflict(false),第一个示例之因此这样作,是由于它试图获取在线映射更改,而版本冲突意味着在相同时间开始updateByQuery和试图更新文档的冲突文档。这很好,由于该更新将获取在线映射更新。

UpdateByQueryRequestBuilder API支持过滤更新的文档,限制要更新的文档总数,并使用脚本更新文档:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .filter(QueryBuilders.termQuery("level", "awesome"))
    .size(1000)
    .script(new Script(ScriptType.INLINE, "ctx._source.awesome = 'absolutely'", "painless", Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

UpdateByQueryRequestBuilder还容许直接访问用于选择文档的查询,您可使用此访问来更改默认的滚动大小,或者以其余方式修改对匹配文档的请求。

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .source().setSize(500);
BulkByScrollResponse response = updateByQuery.get();

您还能够将大小与排序相结合以限制文档的更新:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index").size(100)
    .source().addSort("cat", SortOrder.DESC);
BulkByScrollResponse response = updateByQuery.get();

除了更改文档的_source字段外,还可使用脚本更改操做,相似于Update API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("source_index")
    .script(new Script(
        ScriptType.INLINE,
        "if (ctx._source.awesome == 'absolutely) {"
            + "  ctx.op='noop'"
            + "} else if (ctx._source.awesome == 'lame') {"
            + "  ctx.op='delete'"
            + "} else {"
            + "ctx._source.awesome = 'absolutely'}",
        "painless",
        Collections.emptyMap()));
BulkByScrollResponse response = updateByQuery.get();

Update API中,能够设置ctx.op的值来更改执行的操做:

  • noop

    • 若是您的脚本没有作任何更改,设置ctx.op = "noop"updateByQuery操做将从更新中省略该文档,这种行为增长了响应主体中的noop计数器。
  • delete

    • 若是您的脚本决定必须删除该文档,设置ctx.op = "delete",删除将在响应主体中已删除的计数器中报告。

ctx.op设置为任何其余值都会产生错误,在ctx中设置任何其余字段都会产生错误。

这个API不容许您移动它所接触的文档,只是修改它们的源,这是故意的!咱们没有规定要把文件从原来的位置移走。

您也能够同时对多个索引和类型执行这些操做,相似于search API:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source("foo", "bar").source().setTypes("a", "b");
BulkByScrollResponse response = updateByQuery.get();

若是提供路由值,则进程将路由值复制到滚动查询,将进程限制为与路由值匹配的碎片:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.source().setRouting("cat");
BulkByScrollResponse response = updateByQuery.get();

updateByQuery也能够经过指定这样的pipeline来使用ingest节点:

UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client);
updateByQuery.setPipeline("hurray");
BulkByScrollResponse response = updateByQuery.get();

使用Task API

您可使用Task API获取全部正在运行的update-by-query请求的状态:

ListTasksResponse tasksList = client.admin().cluster().prepareListTasks()
    .setActions(UpdateByQueryAction.NAME).setDetailed(true).get();
for (TaskInfo info: tasksList.getTasks()) {
    TaskId taskId = info.getTaskId();
    BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus();
    // do stuff
}

使用上面所示的TaskId,您能够直接查找任务:

GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();

使用Cancel Task API

任何查询更新均可以使用Task Cancel API取消:

// Cancel all update-by-query requests
client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks();
// Cancel a specific update-by-query request
client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();

使用list tasks API查找taskId的值。

取消请求一般是一个很是快速的过程,但可能要花费几秒钟的时间,task status API继续列出任务,直到取消完成。

Rethrottling

在正在运行的更新中,使用_rethrottle API更改requests_per_second的值:

RethrottleAction.INSTANCE.newRequestBuilder(client)
    .setTaskId(taskId)
    .setRequestsPerSecond(2.0f)
    .get();

使用list tasks API查找taskId的值。

updateByQuery API同样,requests_per_second的值能够是任何正值的浮点值来设置节流的级别,或者Float.POSITIVE_INFINITY禁用节流。requests_per_second值加快进程当即生效,减慢查询的requests_per_second值在完成当前批处理后生效,以防止滚动超时。

Reindex API

详情见reindex API

BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client)
    .destination("target_index")
    .filter(QueryBuilders.matchQuery("category", "xzy")) 
    .get();

还能够提供查询来筛选应该从源索引到目标索引的哪些文档。

相关文章
相关标签/搜索