Thanks to Quanke's Elasticsearch walkthrough, from which most of this material is drawn.
| MySQL | ElasticSearch |
| --- | --- |
| Database | Index |
| Table | Type |
| Row | Document |
| Column | Field |
| Schema | Mapping |
| Index | Everything indexed by default |
| SQL | Query DSL |
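To make the analogy concrete, here is a minimal sketch (index, type, id and field names are illustrative, and `client` is the TransportClient built in the TestClient class below): indexing a document is the Elasticsearch counterpart of inserting a row.

// SQL: INSERT INTO fendo.fendodate (user, message) VALUES ('hhh', 'hello');
// Elasticsearch: index "fendo" ~ database, type "fendodate" ~ table, id "1" ~ row key
IndexResponse resp = client.prepareIndex("fendo", "fendodate", "1")
        .setSource("{\"user\":\"hhh\",\"message\":\"hello\"}") // each JSON field ~ a column
        .get();
System.out.println(resp.getResult());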
The Index API lets us store a JSON document and make it searchable. A document is uniquely identified by its index, type, and id. You can supply your own id, or let the Index API generate one for you.
There are four different ways to produce a JSON document:
/**
 * Manually build the JSON string.
 * @throws UnknownHostException
 */
@Test
public void JsonDocument() throws UnknownHostException {
    String json = "{" +
            "\"user\":\"deepredapple\"," +
            "\"postDate\":\"2018-01-30\"," +
            "\"message\":\"trying out Elasticsearch\"" +
            "}";
    IndexResponse indexResponse = client.prepareIndex("fendo", "fendodate").setSource(json).get();
    System.out.println(indexResponse.getResult());
}
/**
 * Build the document from a Map.
 */
@Test
public void MapDocument() {
    Map<String, Object> json = new HashMap<String, Object>();
    json.put("user", "hhh");
    json.put("postDate", "2018-06-28");
    json.put("message", "trying out Elasticsearch");
    IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
    System.out.println(response.getResult());
}
/**
 * Serialize a bean with Jackson.
 */
@Test
public void JACKSONDocument() throws JsonProcessingException {
    Blog blog = new Blog();
    blog.setUser("123");
    blog.setPostDate("2018-06-29");
    blog.setMessage("try out ElasticSearch");
    ObjectMapper mapper = new ObjectMapper();
    byte[] bytes = mapper.writeValueAsBytes(blog);
    IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
    System.out.println(response.getResult());
}
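The Blog bean used above is not shown in the article; a minimal sketch of what it could look like:

// A plain POJO so Jackson can serialize it; the field names are assumptions matching the setter calls above.
public class Blog {
    private String user;
    private String postDate;
    private String message;

    public String getUser() { return user; }
    public void setUser(String user) { this.user = user; }
    public String getPostDate() { return postDate; }
    public void setPostDate(String postDate) { this.postDate = postDate; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}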
/**
 * Use the XContentBuilder helper.
 */
@Test
public void XContentBuilderDocument() throws IOException {
    XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
            .field("user", "xcontentdocument")
            .field("postDate", "2018-06-30")
            .field("message", "this is ElasticSearch").endObject();
    IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
    System.out.println(response.getResult());
}
package com.deepredapple.es.document;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * @author DeepRedApple
 */
public class TestClient {

    TransportClient client = null;

    public static final String INDEX = "fendo";
    public static final String TYPE = "fendodate";

    @Before
    public void beforeClient() throws UnknownHostException {
        client = new PreBuiltTransportClient(Settings.EMPTY)
                .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
    }

    /**
     * Manually build the JSON string.
     * @throws UnknownHostException
     */
    @Test
    public void JsonDocument() throws UnknownHostException {
        String json = "{" +
                "\"user\":\"deepredapple\"," +
                "\"postDate\":\"2018-01-30\"," +
                "\"message\":\"trying out Elasticsearch\"" +
                "}";
        IndexResponse indexResponse = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(indexResponse.getResult());
    }

    /**
     * Build the document from a Map.
     */
    @Test
    public void MapDocument() {
        Map<String, Object> json = new HashMap<String, Object>();
        json.put("user", "hhh");
        json.put("postDate", "2018-06-28");
        json.put("message", "trying out Elasticsearch");
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(json).get();
        System.out.println(response.getResult());
    }

    /**
     * Serialize a bean with Jackson.
     */
    @Test
    public void JACKSONDocument() throws JsonProcessingException {
        Blog blog = new Blog();
        blog.setUser("123");
        blog.setPostDate("2018-06-29");
        blog.setMessage("try out ElasticSearch");
        ObjectMapper mapper = new ObjectMapper();
        byte[] bytes = mapper.writeValueAsBytes(blog);
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(bytes).get();
        System.out.println(response.getResult());
    }

    /**
     * Use the XContentBuilder helper.
     */
    @Test
    public void XContentBuilderDocument() throws IOException {
        XContentBuilder builder = XContentFactory.jsonBuilder().startObject()
                .field("user", "xcontentdocument")
                .field("postDate", "2018-06-30")
                .field("message", "this is ElasticSearch").endObject();
        IndexResponse response = client.prepareIndex(INDEX, TYPE).setSource(builder).get();
        System.out.println(response.getResult());
    }
}
The Get API retrieves a document by its id:
GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
The parameters are the index, the type, and the _id.
Setting setOperationThreaded to true runs the operation on a different thread:
/**
 * Get API
 */
@Test
public void testGetApi() {
    GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").setOperationThreaded(false).get();
    Map<String, Object> map = getResponse.getSource();
    Set<String> keySet = map.keySet();
    for (String str : keySet) {
        Object o = map.get(str);
        System.out.println(o.toString());
    }
}
Deleting by id:
DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
The parameters are the index, the type, and the _id.
Setting setOperationThreaded to true runs the operation on a different thread:
/**
 * Delete API
 */
@Test
public void testDeleteAPI() {
    GetResponse getResponse = client.prepareGet(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").setOperationThreaded(false).get();
    System.out.println(getResponse.getSource());
    DeleteResponse deleteResponse = client.prepareDelete(INDEX, TYPE, "AWRJCXMhro3r8sDxIpir").get();
    System.out.println(deleteResponse.getResult());
}
Deleting by query:
/**
 * Delete by query.
 */
@Test
public void deleteByQuery() {
    BulkByScrollResponse response = DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
            .filter(QueryBuilders.matchQuery("user", "hhh")) // query condition
            .source(INDEX).get(); // index name
    long deleted = response.getDeleted(); // number of documents deleted
    System.out.println(deleted);
}
Parameters: QueryBuilders.matchQuery("user", "hhh") takes the field and the value to match; source(INDEX) takes the index name.
When a delete runs for a long time, you can execute it asynchronously with a callback and read the result inside the callback:
/**
 * Delete with an async callback, suitable for deleting large volumes of data.
 */
@Test
public void DeleteByQueryAsync() {
    for (int i = 1300; i < 3000; i++) {
        DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
                .filter(QueryBuilders.matchQuery("user", "hhh " + i))
                .source(INDEX)
                .execute(new ActionListener<BulkByScrollResponse>() {
                    public void onResponse(BulkByScrollResponse response) {
                        long deleted = response.getDeleted();
                        System.out.println("documents deleted = " + deleted);
                    }
                    public void onFailure(Exception e) {
                        System.out.println("Failure");
                    }
                });
    }
}
Even after the program stops, the Elasticsearch console shows the deletes still executing; the operation runs asynchronously.
The listener is registered through the execute method:
.execute(new ActionListener<BulkByScrollResponse>() { // callback
    public void onResponse(BulkByScrollResponse response) {
        long deleted = response.getDeleted();
        System.out.println("documents deleted = " + deleted);
    }
    public void onFailure(Exception e) {
        System.out.println("Failure");
    }
});
Updating the index
There are two main ways to perform an update:
/**
 * Update with an UpdateRequest.
 */
@Test
public void testUpdateAPI() throws IOException, ExecutionException, InterruptedException {
    UpdateRequest updateRequest = new UpdateRequest();
    updateRequest.index(INDEX);
    updateRequest.type(TYPE);
    updateRequest.id("AWRFv-yAro3r8sDxIpib");
    updateRequest.doc(jsonBuilder()
            .startObject()
            .field("user", "hhh")
            .endObject());
    client.update(updateRequest).get();
}
/**
 * Update with prepareUpdate.
 */
@Test
public void testUpdatePrepareUpdate() throws IOException {
    client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
            .setScript(new Script("ctx._source.user = \"DeepRedApple\"")).get();
    client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60")
            .setDoc(jsonBuilder()
                    .startObject()
                    .field("user", "DeepRedApple")
                    .endObject()).get();
}
The parameters of setScript in client.prepareUpdate differ between client versions. Here the script source is passed inline; you can also reference a script stored in a file and run that script to perform the update.
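As a hedged sketch of the 5.x-style Script API (the parameter name newUser is made up for illustration): an inline Painless script with parameters looks like this, and a stored script would pass ScriptType.STORED with the script's id instead of source code.

// requires org.elasticsearch.script.Script and org.elasticsearch.script.ScriptType
Map<String, Object> params = new HashMap<String, Object>();
params.put("newUser", "DeepRedApple");
Script script = new Script(ScriptType.INLINE, "painless", "ctx._source.user = params.newUser", params);
client.prepareUpdate(INDEX, TYPE, "AWRFvA7k0udstXU4tl60").setScript(script).get();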
Updating a document with a script:
/**
 * Update by script.
 */
@Test
public void testUpdateByScript() throws ExecutionException, InterruptedException {
    UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIpia")
            .script(new Script("ctx._source.user = \"LZH\""));
    client.update(updateRequest).get();
}
Upsert: update the document if it exists, otherwise insert it.
/**
 * Upsert: update if the document exists, otherwise insert.
 */
@Test
public void testUpsert() throws IOException, ExecutionException, InterruptedException {
    IndexRequest indexRequest = new IndexRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIp12")
            .source(jsonBuilder()
                    .startObject()
                    .field("user", "hhh")
                    .field("postDate", "2018-02-14")
                    .field("message", "ElasticSearch")
                    .endObject());
    UpdateRequest updateRequest = new UpdateRequest(INDEX, TYPE, "AWRFvLSTro3r8sDxIp12")
            .doc(jsonBuilder()
                    .startObject()
                    .field("user", "LZH")
                    .endObject())
            .upsert(indexRequest); // if the document does not exist, index indexRequest instead
    client.update(updateRequest).get();
}
If the _id exists, i.e. index/type/_id is present, the UpdateRequest is executed; if index/type/_id does not exist, the indexRequest document is inserted directly.
Fetching several documents at once:
/**
 * Fetch several documents in one request.
 */
@Test
public void TestMultiGetApi() {
    MultiGetResponse responses = client.prepareMultiGet()
            .add(INDEX, TYPE, "AWRFv-yAro3r8sDxIpib") // by a single id
            .add(INDEX, TYPE, "AWRFvA7k0udstXU4tl60", "AWRJA72Uro3r8sDxIpip") // by several ids
            .add("blog", "blog", "AWG9GKCwhg1e21lmGSLH") // from another index
            .get();
    for (MultiGetItemResponse itemResponse : responses) {
        GetResponse response = itemResponse.getResponse();
        if (response.isExists()) {
            String source = response.getSourceAsString(); // _source
            JSONObject jsonObject = JSON.parseObject(source);
            Set<String> sets = jsonObject.keySet();
            for (String str : sets) {
                System.out.println("key -> " + str);
                System.out.println("value -> " + jsonObject.get(str));
                System.out.println("===============");
            }
        }
    }
}
The Bulk API performs batch inserts:
/**
 * Bulk insert.
 */
@Test
public void testBulkApi() throws IOException {
    BulkRequestBuilder requestBuilder = client.prepareBulk();
    requestBuilder.add(client.prepareIndex(INDEX, TYPE, "1")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "张三")
                    .field("postDate", "2018-05-01")
                    .field("message", "zhangSan message")
                    .endObject()));
    requestBuilder.add(client.prepareIndex(INDEX, TYPE, "2")
            .setSource(jsonBuilder()
                    .startObject()
                    .field("user", "李四")
                    .field("postDate", "2016-09-10")
                    .field("message", "Lisi message")
                    .endObject()));
    BulkResponse bulkResponse = requestBuilder.get();
    if (bulkResponse.hasFailures()) {
        System.out.println("error");
    }
}
Using the Bulk Processor: BulkProcessor offers a simple interface that automatically flushes bulk operations based on the number or size of queued requests, or after a given interval.
First, create the BulkProcessor instance:
/**
 * Create a BulkProcessor instance.
 */
@Test
public void testCreateBulkProcessor() {
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        // called before each bulk executes; e.g. request.numberOfActions() gives the action count
        public void beforeBulk(long l, BulkRequest request) {
        }
        // called after each bulk executes; e.g. response.hasFailures() tells you whether anything failed
        public void afterBulk(long l, BulkRequest request, BulkResponse response) {
        }
        // called when a bulk fails with a Throwable
        public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
        }
    }).setBulkActions(10000) // flush every 10000 requests
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush in 5 MB chunks
            .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds, regardless of request count
            .setConcurrentRequests(1) // number of concurrent flushes: 0 means only a single request may execute, 1 allows one concurrent request
            .setBackoffPolicy(
                    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
            // custom retry policy: wait 100 ms at first, doubling each time, for up to 3 retries;
            // a retry triggers when one or more bulk requests fail with an EsRejectedExecutionException
            // (insufficient compute resources); disable retries with BackoffPolicy.noBackoff()
            .build();
}
The full BulkProcessor flow, with requests added, flushed, and the processor closed:
/**
 * Create a BulkProcessor instance and use it.
 */
@Test
public void testCreateBulkProcessor() throws IOException {
    BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() {
        // called before each bulk executes; e.g. request.numberOfActions() gives the action count
        public void beforeBulk(long l, BulkRequest request) {
        }
        // called after each bulk executes; e.g. response.hasFailures() tells you whether anything failed
        public void afterBulk(long l, BulkRequest request, BulkResponse response) {
        }
        // called when a bulk fails with a Throwable
        public void afterBulk(long l, BulkRequest bulkRequest, Throwable throwable) {
        }
    }).setBulkActions(10000) // flush every 10000 requests
            .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) // flush in 5 MB chunks
            .setFlushInterval(TimeValue.timeValueSeconds(5)) // flush every 5 seconds, regardless of request count
            .setConcurrentRequests(1) // number of concurrent flushes: 0 means only a single request may execute, 1 allows one concurrent request
            .setBackoffPolicy(
                    BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
            // custom retry policy: wait 100 ms at first, doubling each time, for up to 3 retries;
            // a retry triggers when one or more bulk requests fail with an EsRejectedExecutionException
            // (insufficient compute resources); disable retries with BackoffPolicy.noBackoff()
            .build();
    // add requests
    bulkProcessor.add(new IndexRequest(INDEX, TYPE, "3").source(
            jsonBuilder()
                    .startObject()
                    .field("user", "王五")
                    .field("postDate", "2019-10-05")
                    .field("message", "wangwu message")
                    .endObject()));
    bulkProcessor.add(new DeleteRequest(INDEX, TYPE, "1"));
    bulkProcessor.flush();
    // close the bulkProcessor
    bulkProcessor.close();
    client.admin().indices().prepareRefresh().get();
    client.prepareSearch().get();
}
The Search API executes a query and returns the matching results. It can search one index/type or several, and queries are expressed with the Query DSL Java API.
The Java API also exposes the QUERY_AND_FETCH and DFS_QUERY_AND_FETCH search types, but those modes should be chosen by the system rather than specified manually by users.
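If you do want to pin a search type explicitly (normally unnecessary), the request builder accepts one; a sketch using the non-deprecated DFS variant:

// requires org.elasticsearch.action.search.SearchType
SearchResponse resp = client.prepareSearch(INDEX)
        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH) // more accurate scoring at the cost of an extra round-trip
        .setQuery(QueryBuilders.matchQuery("user", "hhh"))
        .get();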
Example:
@Test
public void testSearchApi() {
    SearchResponse response = client.prepareSearch(INDEX).setTypes(TYPE)
            .setQuery(QueryBuilders.matchQuery("user", "hhh")).get();
    SearchHit[] hits = response.getHits().getHits();
    for (int i = 0; i < hits.length; i++) {
        String json = hits[i].getSourceAsString();
        JSONObject object = JSON.parseObject(json);
        Set<String> strings = object.keySet();
        for (String str : strings) {
            System.out.println(object.get(str));
        }
    }
}
An ordinary search request returns a single page of results, however large the data set. The Scrolls API lets us retrieve large numbers of documents (even all of them): an initial search establishes a scroll context, and we keep pulling batches of results from Elasticsearch until none remain. The Scroll API was not designed for real-time user requests, but for processing large volumes of data.
/**
 * Scroll query.
 * @throws ExecutionException
 * @throws InterruptedException
 */
@Test
public void testScrollApi() throws ExecutionException, InterruptedException {
    MatchQueryBuilder qb = matchQuery("user", "hhh");
    SearchResponse response = client.prepareSearch(INDEX).addSort(FieldSortBuilder.DOC_FIELD_NAME,
            SortOrder.ASC)
            .setScroll(new TimeValue(60000)) // to use scrolling, the initial search must pass the scroll parameter, telling Elasticsearch how long to keep the search context alive
            .setQuery(qb)
            .setSize(100).get();
    do {
        for (SearchHit hit : response.getHits().getHits()) {
            String json = hit.getSourceAsString();
            JSONObject object = JSON.parseObject(json);
            Set<String> strings = object.keySet();
            for (String str : strings) {
                System.out.println(object.get(str));
            }
        }
        response = client.prepareSearchScroll(response.getScrollId()).setScroll(new TimeValue(60000)).execute().get();
    } while (response.getHits().getHits().length != 0);
}
If the scroll time has expired and you keep searching with that scroll id, an error is raised (typically a complaint that no search context was found for the given id).
The search context is cleared automatically once the scroll time elapses, but keeping a scroll open is costly, so clear it with the Clear Scroll API as soon as you stop using it:
ClearScrollRequestBuilder clearBuilder = client.prepareClearScroll();
clearBuilder.addScrollId(response.getScrollId());
ClearScrollResponse scrollResponse = clearBuilder.get();
System.out.println("cleared successfully: " + scrollResponse.isSucceeded());
The MultiSearch API executes several search requests within a single API call; its endpoint is _msearch.
@Test
public void testMultiSearchApi() {
    SearchRequestBuilder srb1 = client.prepareSearch().setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1);
    SearchRequestBuilder srb2 = client.prepareSearch().setQuery(QueryBuilders.matchQuery("user", "hhh")).setSize(1);
    MultiSearchResponse multiSearchResponse = client.prepareMultiSearch().add(srb1).add(srb2).get();
    long nbHits = 0;
    for (MultiSearchResponse.Item item : multiSearchResponse.getResponses()) {
        SearchResponse response = item.getResponse();
        nbHits += response.getHits().getTotalHits();
    }
    System.out.println(nbHits);
}
The aggregations framework provides aggregated data on top of a search query. It is built from simple building blocks called aggregations, which can be composed to build complex summaries of the data. An aggregation can be seen as the unit of work that derives analytical information over a set of documents, and defining that document set is part of running the aggregation.
@Test
public void testAggregations() {
    SearchResponse searchResponse = client.prepareSearch()
            .setQuery(QueryBuilders.matchAllQuery())
            .addAggregation(AggregationBuilders.terms("LZH").field("user"))
            .addAggregation(AggregationBuilders.dateHistogram("2013-01-30").field("postDate")
                    .dateHistogramInterval(DateHistogramInterval.YEAR)).get();
    Terms lzh = searchResponse.getAggregations().get("LZH"); // look up each aggregation by the name it was registered under
    Histogram postDate = searchResponse.getAggregations().get("2013-01-30");
}
Capping the maximum number of documents to collect: if a cap is set, use isTerminatedEarly() on the SearchResponse to check whether the returned documents reached the configured limit.
@Test
public void TestTerminate() {
    SearchResponse searchResponse = client.prepareSearch(INDEX)
            .setTerminateAfter(2) // terminate early once this many documents are collected
            .get();
    if (searchResponse.isTerminatedEarly()) {
        System.out.println(searchResponse.getHits().totalHits);
    }
}
Aggregations: Elasticsearch provides a complete Java API for aggregations. Build them with AggregationBuilders and add them to the search request.
SearchResponse response = client.prepareSearch().setQuery(/* query */).addAggregation(/* aggregation */).execute().actionGet();
Structuring aggregations.
Metrics aggregations compute values extracted, in one way or another, from the documents being aggregated. The key class here is **AggregationBuilders**, which contains factory methods for all of the aggregations below; call them directly.
MinAggregationBuilder aggregation = AggregationBuilders.min("agg").field("age");
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();
Min agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString(); // getValueAsString() suits date fields; for plain numbers, agg.getValue() is the usual way to read the minimum
System.out.println("min value:" + value);
In debug mode, the toString() of the MinAggregationBuilder on the first line renders the following:
{
"error": "JsonGenerationException[Can not write a field name, expecting a value]"
}
SearchResponse sr = client.prepareSearch("twitter").addAggregation(aggregation).get();

The toString() of the SearchResponse contains the following: the query result as JSON. Its structure pairs with the SearchResponse API, which lets you pull out each individual value.
{
  "took": 1,
  "timed_out": false,
  "_shards": {
    "total": 5,
    "successful": 5,
    "skipped": 0,
    "failed": 0
  },
  "hits": {
    "total": 4,
    "max_score": 1.0,
    "hits": [
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "10",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:10:21.396Z",
          "age": 30,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:05:33.943Z",
          "age": 20,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T08:59:00.191Z",
          "age": 10,
          "gender": "male",
          "message": "trying out Elasticsearch"
        }
      },
      {
        "_index": "twitter",
        "_type": "tweet",
        "_id": "11",
        "_score": 1.0,
        "_source": {
          "user": "kimchy",
          "postDate": "2018-06-29T09:10:54.386Z",
          "age": 30,
          "gender": "female",
          "message": "trying out Elasticsearch"
        }
      }
    ]
  },
  "aggregations": {
    "agg": {
      "value": 10.0
    }
  }
}
As you can see, sr.getAggregations().get("agg") retrieves the aggregated statistic; the name agg used throughout this code is user-defined.
MaxAggregationBuilder aggregation = AggregationBuilders.max("agg").field("readSize");
SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
Max agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString();
System.out.println("max value:" + value);
The analysis works exactly as for the Min aggregation, but note that max/min aggregations cannot tell you which document holds the extreme value.
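If you do need to know which document carries the minimum, a hedged workaround (not from the original article) is to sort by the field and take the first hit:

SearchResponse sr = client.prepareSearch("twitter")
        .addSort("age", SortOrder.ASC) // ascending, so the first hit holds the minimum age
        .setSize(1)
        .get();
System.out.println(sr.getHits().getAt(0).getSourceAsString());

Sum aggregation: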
SumAggregationBuilder aggregation = AggregationBuilders.sum("agg").field("readSize");
SearchResponse sr = client.prepareSearch("blog").addAggregation(aggregation).get();
Sum agg = sr.getAggregations().get("agg");
String value = agg.getValueAsString();
System.out.println("sum value:" + value);
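Avg aggregation, computing the average of a numeric field: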
AvgAggregationBuilder aggregation = AggregationBuilders.avg("agg").field("age");
SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
Avg avg = searchResponse.getAggregations().get("agg");
String value = avg.getValueAsString();
System.out.println("avg value: "+ value);
Stats aggregation: computes statistics (min, max, sum, count, avg) over some value extracted from the aggregated documents; the value can come from a specific numeric field or be computed by a script.
StatsAggregationBuilder aggregation = AggregationBuilders.stats("agg").field("age");
SearchResponse searchResponse = client.prepareSearch("twitter").addAggregation(aggregation).get();
Stats stats = searchResponse.getAggregations().get("agg");
String max = stats.getMaxAsString();
String min = stats.getMinAsString();
String avg = stats.getAvgAsString();
String sum = stats.getSumAsString();
long count = stats.getCount();
System.out.println("max value: "+max);
System.out.println("min value: "+min);
System.out.println("avg value: "+avg);
System.out.println("sum value: "+sum);
System.out.println("count value: "+count);
This aggregation produces all of the everyday statistics above at once; use it when you need most of those values.
Extended stats aggregation: like the plain stats aggregation but with extra statistics (sum_of_squares, variance, std_deviation, std_deviation_bounds). The value can come from a specific numeric field or be computed by a script. The main extra results are the max, min, variance, sum of squares, and similar statistics.
ExtendedStatsAggregationBuilder aggregation = AggregationBuilders.extendedStats("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
ExtendedStats extended = response.getAggregations().get("agg");
String max = extended.getMaxAsString();
String min = extended.getMinAsString();
String avg = extended.getAvgAsString();
String sum = extended.getSumAsString();
long count = extended.getCount();
double stdDeviation = extended.getStdDeviation();
double sumOfSquares = extended.getSumOfSquares();
double variance = extended.getVariance();
System.out.println("max value: " +max);
System.out.println("min value: " +min);
System.out.println("avg value: " +avg);
System.out.println("sum value: " +sum);
System.out.println("count value: " +count);
System.out.println("stdDeviation value: " +stdDeviation);
System.out.println("sumOfSquares value: " +sumOfSquares);
System.out.println("variance value: "+variance);
Value count aggregation: counts the number of values extracted from the aggregated documents; the value can come from a specific numeric field or be computed by a script. It is typically combined with other single-value aggregations; for example, when computing a field's average you may also want to know how many values that average was computed from.
ValueCountAggregationBuilder aggregation = AggregationBuilders.count("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
ValueCount count = response.getAggregations().get("agg");
long value = count.getValue();
System.out.println("ValueCount value: " +value);
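Percentiles aggregation, computing percentile estimates over a numeric field; each result entry pairs a percent with the field value at that percentile: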
PercentilesAggregationBuilder aggregation = AggregationBuilders.percentiles("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Percentiles agg = response.getAggregations().get("agg");
for (Percentile entry : agg) {
double percent = entry.getPercent();
double value = entry.getValue();
System.out.println("percent value: " + percent + "value value: " + value);
}
Cardinality aggregation: the count of distinct values, with duplicates removed.
CardinalityAggregationBuilder aggregation = AggregationBuilders.cardinality("agg").field("age");
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Cardinality agg = response.getAggregations().get("agg");
long value = agg.getValue();
System.out.println("value value: "+ value);
Terms aggregation with top hits: counts matching documents per field value, and the top_hits sub-aggregation returns representative hits for each bucket.
TermsAggregationBuilder aggregation = AggregationBuilders.terms("agg").field("gender.keyword")
.subAggregation(AggregationBuilders.topHits("top").explain(true).size(1).from(10));
SearchResponse response = client.prepareSearch("twitter").addAggregation(aggregation).get();
Terms agg = response.getAggregations().get("agg");
for (Terms.Bucket bucket : agg.getBuckets()) {
String key = (String) bucket.getKey();
long docCount = bucket.getDocCount();
System.out.println("key value: " + key + " docCount value: " + docCount);
TopHits topHits = bucket.getAggregations().get("top");
for (SearchHit searchHitFields : topHits.getHits().getHits()) {
System.out.println("id value: " + searchHitFields.getId() + " source value: " + searchHitFields.getSourceAsString());
}
}
Global aggregation: a count computed over all documents:
AggregationBuilder aggregation = AggregationBuilders
.global("agg")
.subAggregation(
AggregationBuilders.terms("users").field("user.keyword")
);
SearchResponse sr = client.prepareSearch("twitter")
.addAggregation(aggregation)
.get();
System.out.println(sr);
Global agg = sr.getAggregations().get("agg");
long count = agg.getDocCount(); // Doc count
System.out.println("global count:" + count);
Filter aggregation: counting with a filter:
AggregationBuilder aggregation = AggregationBuilders.filters("aaa", new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")));
SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
String key = entry.getKeyAsString(); // bucket key
long docCount = entry.getDocCount(); // Doc count
System.out.println("global " + key + " count:" + docCount);
}
Filtering on several conditions and counting per filter:
AggregationBuilder aggregation = AggregationBuilders.filters("aaa",new FiltersAggregator.KeyedFilter("men", QueryBuilders.termQuery("gender", "male")),new FiltersAggregator.KeyedFilter("women", QueryBuilders.termQuery("gender", "female")));
SearchResponse sr = client.prepareSearch("twitter").setTypes("tweet").addAggregation(aggregation).get();
Filters agg = sr.getAggregations().get("aaa");
for (Filters.Bucket entry : agg.getBuckets()) {
String key = entry.getKeyAsString(); // bucket key
long docCount = entry.getDocCount(); // Doc count
System.out.println("global " + key + " count:" + docCount);
}
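To order the terms buckets, Terms.Order.term(true) sorts them by their key in ascending order: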
TermsAggregationBuilder fieldAggregation = AggregationBuilders.terms("genders").field("gender.keyword")
.order(Terms.Order.term(true));
SearchResponse response = client.prepareSearch("twitter").setTypes("tweet").addAggregation(fieldAggregation).get();
Terms terms = response.getAggregations().get("genders");
for (Terms.Bucket bucket : terms.getBuckets()) {
System.out.println("key value: " + bucket.getKey());
System.out.println("docCount value: " + bucket.getDocCount());
}
Match all documents:
QueryBuilder qb = matchAllQuery();
Fuzzy matching and phrase queries on a field:
QueryBuilder qb = matchQuery("gender", "female");
Querying several fields at once (any number of fields):
QueryBuilder qb = multiMatchQuery("female","gender", "message");
A more specialized query for rarer, domain-specific terms:
QueryBuilder qb = commonTermsQuery("gender","female");
A query built on the Lucene query syntax, allowing special operators (AND|OR|NOT):
QueryBuilder qb = queryStringQuery("+male -female");
A simpler query syntax:
QueryBuilder qb = simpleQueryStringQuery("+male -female");
Find documents containing an exact value in the given field:
QueryBuilder qb = termQuery("gender","male");
Match any of several exact values in one field:
QueryBuilder qb = termsQuery("age","10", "20");
Range queries:
- gte(): matches documents whose field value is greater than or equal to the given value
- gt(): matches documents whose field value is greater than the given value
- lte(): matches documents whose field value is less than or equal to the given value
- lt(): matches documents whose field value is less than the given value
- from() sets the lower bound and to() the upper bound; they work together with includeLower() and includeUpper()
- includeLower(true): the from() bound is inclusive (>=)
- includeLower(false): the from() bound is exclusive (>)
- includeUpper(true): the to() bound is inclusive (<=)
- includeUpper(false): the to() bound is exclusive (<)
QueryBuilder qb = QueryBuilders.rangeQuery("age").gte(10).includeLower(true).lte(20).includeUpper(true);
Here, includeLower() and includeUpper() control whether the bounds themselves are part of the range.
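For comparison, a sketch of similar bounds expressed with from()/to() and the include flags:

QueryBuilder qb2 = QueryBuilders.rangeQuery("age")
        .from(10).includeLower(true)  // age >= 10
        .to(20).includeUpper(false);  // age < 20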
Check whether the given field exists in a document:
QueryBuilder qb = existsQuery("user");
Query by field name and an exact prefix:
QueryBuilder qb = prefixQuery("gender","m");
Wildcard query with a field name and a pattern: ? matches a single character and * matches any number of characters. Wildcard queries run against non-analyzed fields:
QueryBuilder qb = wildcardQuery("gender","f?*");
Query by field name and regular expression; this also runs against non-analyzed fields:
QueryBuilder qb = regexpQuery("gender","f.*");
Fuzzy query: an exact field name with a misspelled value:
QueryBuilder qb = fuzzyQuery("gender","mala").fuzziness(Fuzziness.ONE);
Find documents of the given type:
QueryBuilder qb = typeQuery("tweet");
Query by type and ids; the type may be omitted:
QueryBuilder qb = idsQuery("tweet").addIds("1", "11");