本节描述如下CRUD API:html
注意
全部 CRUD API都是单索引 API,索引参数接受单个索引名,或指向单个索引的别名
index API容许将类型化的JSON文档索引到特定的索引中,并使其可搜索。json
生成JSON文档有几种不一样的方法:数组
byte[]
或做为String
Map
,该Map
将自动转换为它的JSON等效项XContentFactory.jsonBuilder()
在内部,每一个类型被转换为byte[]
(像String
被转换为byte[]
),所以,若是对象已经以这种形式存在,那么就使用它,jsonBuilder是高度优化的JSON生成器,它直接构造一个byte[]
。并发
这里没有什么困难,可是请注意,您必须根据日期格式对日期进行编码。app
String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}";
Map是一个键:值对集合,它表示一个JSON结构:less
Map<String, Object> json = new HashMap<String, Object>(); json.put("user","kimchy"); json.put("postDate",new Date()); json.put("message","trying out Elasticsearch");
可使用Jackson将bean序列化为JSON,请将Jackson Databind添加到您的项目中,而后,您可使用ObjectMapper
来序列化您的bean:异步
import com.fasterxml.jackson.databind.*; // instance a json mapper ObjectMapper mapper = new ObjectMapper(); // create once, reuse // generate json byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
Elasticsearch提供了内置的助手来生成JSON内容。elasticsearch
import static org.elasticsearch.common.xcontent.XContentFactory.*; XContentBuilder builder = jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject()
注意,您还可使用startArray(String)
和endArray()
方法添加数组,顺便说一下,field
方法接受许多对象类型,您能够直接传递数字、日期甚至其余XContentBuilder
对象。ide
若是须要查看生成的JSON内容,可使用string()
方法。oop
String json = builder.string();
下面的示例将JSON文档索引为一个名为twitter的索引,其类型为tweet, id值为1:
import static org.elasticsearch.common.xcontent.XContentFactory.*; IndexResponse response = client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) .get();
注意,您还能够将文档索引为JSON字符串,而且不须要提供ID:
String json = "{" + "\"user\":\"kimchy\"," + "\"postDate\":\"2013-01-30\"," + "\"message\":\"trying out Elasticsearch\"" + "}"; IndexResponse response = client.prepareIndex("twitter", "tweet") .setSource(json, XContentType.JSON) .get();
IndexResponse
对象会给你一个响应:
// Index name String _index = response.getIndex(); // Type name String _type = response.getType(); // Document ID (generated or not) String _id = response.getId(); // Version (if it's the first time you index this document, you will get: 1) long _version = response.getVersion(); // status has stored current instance statement. RestStatus status = response.status();
有关索引操做的更多信息,请查看REST索引文档
get API容许根据索引的id从索引中获取类型化的JSON文档,下面的示例从一个名为twitter的索引中获取JSON文档,该索引的类型名为tweet, id值为1:
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();
有关get操做的更多信息,请查看REST get文档。
delete API容许基于id从特定索引中删除类型化的JSON文档,下面的示例从名为twitter的索引中删除JSON文档,该索引的类型名为tweet, id值为1:
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();
经过查询删除的API能够根据查询结果删除给定的一组文档:
BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .get(); long deleted = response.getDeleted();
QueryBuilders.matchQuery("gender", "male")
(查询)source("persons")
(索引)get()
(执行操做)response.getDeleted()
(被删除的文档数)因为这是一个长时间运行的操做,若是您但愿异步执行,能够调用execute
而不是get
,并提供以下监听器:
DeleteByQueryAction.INSTANCE.newRequestBuilder(client) .filter(QueryBuilders.matchQuery("gender", "male")) .source("persons") .execute(new ActionListener<BulkByScrollResponse>() { @Override public void onResponse(BulkByScrollResponse response) { long deleted = response.getDeleted(); } @Override public void onFailure(Exception e) { // Handle the exception } });
您能够建立一个UpdateRequest
并将其发送给客户端:
UpdateRequest updateRequest = new UpdateRequest(); updateRequest.index("index"); updateRequest.type("type"); updateRequest.id("1"); updateRequest.doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
也可使用prepareUpdate()
方法:
client.prepareUpdate("ttl", "doc", "1") .setScript(new Script("ctx._source.gender = \"male\"" , ScriptService.ScriptType.INLINE, null, null)) .get(); client.prepareUpdate("ttl", "doc", "1") .setDoc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .get();
Script()
(你的脚本,它也能够是本地存储的脚本名)setDoc()
(将合并到现有的文档)注意,您不能同时提供脚本和doc
update API容许基于提供的脚本更新文档:
UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1") .script(new Script("ctx._source.gender = \"male\"")); client.update(updateRequest).get();
update API还支持传递一个部分文档合并到现有文档中(简单的递归合并,内部合并对象,取代核心的“键/值”和数组),例如:
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()); client.update(updateRequest).get();
也有对Upsert的支持,若是文档不存在,则使用upsert元素的内容索引新的doc:
IndexRequest indexRequest = new IndexRequest("index", "type", "1") .source(jsonBuilder() .startObject() .field("name", "Joe Smith") .field("gender", "male") .endObject()); UpdateRequest updateRequest = new UpdateRequest("index", "type", "1") .doc(jsonBuilder() .startObject() .field("gender", "male") .endObject()) .upsert(indexRequest); client.update(updateRequest).get();
若是文档不存在,将添加indexRequest中的文档。
若是文件index/type/1已经存在,咱们将在此操做后得到以下文件:
{ "name" : "Joe Dalton", "gender": "male" }
"gender": "male"
(此字段由更新请求添加)若是不存在,咱们将有一份新文件:
{ "name" : "Joe Smith", "gender": "male" }
multi get API容许根据文档的index、type和id获取文档列表:
MultiGetResponse multiGetItemResponses = client.prepareMultiGet() .add("twitter", "tweet", "1") .add("twitter", "tweet", "2", "3", "4") .add("another", "type", "foo") .get(); for (MultiGetItemResponse itemResponse : multiGetItemResponses) { GetResponse response = itemResponse.getResponse(); if (response.isExists()) { String json = response.getSourceAsString(); } }
add("twitter", "tweet", "1")
(经过单一id)add("twitter", "tweet", "2", "3", "4")
(或以相同index/type的id列表)add("another", "type", "foo")
(你也能够从另外一个索引中获得)MultiGetItemResponse itemResponse : multiGetItemResponses
(迭代结果集)response.isExists()
(您能够检查文档是否存在)response.getSourceAsString()
(访问_source字段)有关multi get操做的更多信息,请查看剩余的multi get文档
bulk API容许在一个请求中索引和删除多个文档,这里有一个示例用法:
import static org.elasticsearch.common.xcontent.XContentFactory.*; BulkRequestBuilder bulkRequest = client.prepareBulk(); // either use client#prepare, or use Requests# to directly build index/delete requests bulkRequest.add(client.prepareIndex("twitter", "tweet", "1") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "trying out Elasticsearch") .endObject() ) ); bulkRequest.add(client.prepareIndex("twitter", "tweet", "2") .setSource(jsonBuilder() .startObject() .field("user", "kimchy") .field("postDate", new Date()) .field("message", "another post") .endObject() ) ); BulkResponse bulkResponse = bulkRequest.get(); if (bulkResponse.hasFailures()) { // process failures by iterating through each bulk response item }
BulkProcessor
类提供了一个简单的接口,能够根据请求的数量或大小自动刷新bulk操做,或者在给定的时间以后。
要使用它,首先建立一个BulkProcessor
实例:
import org.elasticsearch.action.bulk.BackoffPolicy; import org.elasticsearch.action.bulk.BulkProcessor; import org.elasticsearch.common.unit.ByteSizeUnit; import org.elasticsearch.common.unit.ByteSizeValue; import org.elasticsearch.common.unit.TimeValue; BulkProcessor bulkProcessor = BulkProcessor.builder( client, new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { ... } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { ... } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { ... } }) .setBulkActions(10000) .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) .setFlushInterval(TimeValue.timeValueSeconds(5)) .setConcurrentRequests(1) .setBackoffPolicy( BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3)) .build();
beforeBulk()
request.numberOfActions()
查看numberOfActions afterBulk(...BulkResponse response)
response.hasFailures()
检查是否存在失败请求afterBulk(...Throwable failure)
setBulkActions(10000)
setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))
setFlushInterval(TimeValue.timeValueSeconds(5))
setConcurrentRequests(1)
setBackoffPolicy()
EsRejectedExecutionException
失败时,将尝试重试,该异常代表用于处理请求的计算资源太少,要禁用backoff,请传递BackoffPolicy.noBackoff()
默认状况下,BulkProcessor
:
而后您能够简单地将您的请求添加到BulkProcessor
:
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */)); bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
当全部的文档都被加载到BulkProcessor
,可使用awaitClose
或close
方法进行关闭:
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
或
bulkProcessor.close();
若是经过设置flushInterval来调度其余计划的flush,这两种方法都将flush全部剩余的文档,并禁用全部其余计划flush。若是并发请求被启用,那么awaitClose
方法等待指定的超时以完成全部bulk请求,而后返回true
,若是在全部bulk请求完成以前指定的等待时间已通过去,则返回false
,close
方法不等待任何剩余的批量请求完成并当即退出。
若是您正在使用Elasticsearch运行测试,而且正在使用BulkProcessor来填充数据集,那么您最好将并发请求的数量设置为0,以便以同步方式执行批量的flush操做:
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ }) .setBulkActions(10000) .setConcurrentRequests(0) .build(); // Add your requests bulkProcessor.add(/* Your requests */); // Flush any remaining requests bulkProcessor.flush(); // Or close the bulkProcessor if you don't need it anymore bulkProcessor.close(); // Refresh your indices client.admin().indices().prepareRefresh().get(); // Now you can start searching! client.prepareSearch().get();
updateByQuery最简单的用法是在不更改源的状况下更新索引中的每一个文档,这种用法容许获取一个新属性或另外一个在线映射更改。
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index").abortOnVersionConflict(false); BulkByScrollResponse response = updateByQuery.get();
对updateByQuery API的调用从获取索引快照开始,索引使用内部版本控制找到任何文档。
注意
当一个文档在快照的时间和索引请求过程之间发生变化时,会发生版本冲突。
当版本匹配时,updateByQuery更新文档并增长版本号。
全部更新和查询失败都会致使updateByQuery停止,这些故障能够从BulkByScrollResponse#getIndexingFailures
方法中得到,任何成功的更新仍然存在,而且不会回滚,当第一次失败致使停止时,响应包含由失败的bulk请求生成的全部失败。
为了防止版本冲突致使updateByQuery停止,请设置abortOnVersionConflict(false)
,第一个示例之因此这样作,是由于它试图获取在线映射更改,而版本冲突意味着在相同时间开始updateByQuery和试图更新文档的冲突文档。这很好,由于该更新将获取在线映射更新。
UpdateByQueryRequestBuilder API支持过滤更新的文档,限制要更新的文档总数,并使用脚本更新文档:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .filter(QueryBuilders.termQuery("level", "awesome")) .size(1000) .script(new Script(ScriptType.INLINE, "ctx._source.awesome = 'absolutely'", "painless", Collections.emptyMap())); BulkByScrollResponse response = updateByQuery.get();
UpdateByQueryRequestBuilder还容许直接访问用于选择文档的查询,您可使用此访问来更改默认的滚动大小,或者以其余方式修改对匹配文档的请求。
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .source().setSize(500); BulkByScrollResponse response = updateByQuery.get();
您还能够将大小与排序相结合以限制文档的更新:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index").size(100) .source().addSort("cat", SortOrder.DESC); BulkByScrollResponse response = updateByQuery.get();
除了更改文档的_source
字段外,还可使用脚本更改操做,相似于Update API:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("source_index") .script(new Script( ScriptType.INLINE, "if (ctx._source.awesome == 'absolutely) {" + " ctx.op='noop'" + "} else if (ctx._source.awesome == 'lame') {" + " ctx.op='delete'" + "} else {" + "ctx._source.awesome = 'absolutely'}", "painless", Collections.emptyMap())); BulkByScrollResponse response = updateByQuery.get();
在Update API中,能够设置ctx.op
的值来更改执行的操做:
noop
ctx.op = "noop"
,updateByQuery操做将从更新中省略该文档,这种行为增长了响应主体中的noop计数器。delete
ctx.op = "delete"
,删除将在响应主体中已删除的计数器中报告。将ctx.op
设置为任何其余值都会产生错误,在ctx
中设置任何其余字段都会产生错误。
这个API不容许您移动它所接触的文档,只是修改它们的源,这是故意的!咱们没有规定要把文件从原来的位置移走。
您也能够同时对多个索引和类型执行这些操做,相似于search API:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source("foo", "bar").source().setTypes("a", "b"); BulkByScrollResponse response = updateByQuery.get();
若是提供路由值,则进程将路由值复制到滚动查询,将进程限制为与路由值匹配的碎片:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.source().setRouting("cat"); BulkByScrollResponse response = updateByQuery.get();
updateByQuery也能够经过指定这样的pipeline来使用ingest节点:
UpdateByQueryRequestBuilder updateByQuery = UpdateByQueryAction.INSTANCE.newRequestBuilder(client); updateByQuery.setPipeline("hurray"); BulkByScrollResponse response = updateByQuery.get();
您可使用Task API获取全部正在运行的update-by-query请求的状态:
ListTasksResponse tasksList = client.admin().cluster().prepareListTasks() .setActions(UpdateByQueryAction.NAME).setDetailed(true).get(); for (TaskInfo info: tasksList.getTasks()) { TaskId taskId = info.getTaskId(); BulkByScrollTask.Status status = (BulkByScrollTask.Status) info.getStatus(); // do stuff }
使用上面所示的TaskId,您能够直接查找任务:
GetTaskResponse get = client.admin().cluster().prepareGetTask(taskId).get();
使用Cancel Task API
任何查询更新均可以使用Task Cancel API取消:
// Cancel all update-by-query requests client.admin().cluster().prepareCancelTasks().setActions(UpdateByQueryAction.NAME).get().getTasks(); // Cancel a specific update-by-query request client.admin().cluster().prepareCancelTasks().setTaskId(taskId).get().getTasks();
使用list tasks API查找taskId的值。
取消请求一般是一个很是快速的过程,但可能要花费几秒钟的时间,task status API继续列出任务,直到取消完成。
在正在运行的更新中,使用_rethrottle API更改requests_per_second
的值:
RethrottleAction.INSTANCE.newRequestBuilder(client) .setTaskId(taskId) .setRequestsPerSecond(2.0f) .get();
使用list tasks API查找taskId的值。
与updateByQuery API同样,requests_per_second
的值能够是任何正值的浮点值来设置节流的级别,或者Float.POSITIVE_INFINITY
禁用节流。requests_per_second
值加快进程当即生效,减慢查询的requests_per_second
值在完成当前批处理后生效,以防止滚动超时。
详情见reindex API
BulkByScrollResponse response = ReindexAction.INSTANCE.newRequestBuilder(client) .destination("target_index") .filter(QueryBuilders.matchQuery("category", "xzy")) .get();
还能够提供查询来筛选应该从源索引到目标索引的哪些文档。