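Know which clients ES provides and where to find each client's documentation.
Learn to use the Java REST client.
Learn to use the Java client.
ES is a service built on a client/server (C/S) architecture.
Review: the ES architecture
Connection methods supported by ES
REST API, port 9200
Transport connection, port 9300
ES provides clients for many programming languages
Client API documentation: https://www.elastic.co/guide/en/elasticsearch/client/index.html
ES provides two Java REST clients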
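Java Low Level REST Client: the low-level REST client. It talks to the cluster over HTTP, and the user has to marshal the request JSON and parse the response JSON themselves. It is compatible with all ES versions.
Java High Level REST Client: the high-level REST client. It is built on top of the low-level client and adds APIs that marshal requests and parse responses for you.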
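To make "marshal the request JSON yourself" concrete, here is a rough sketch that talks to the REST port using only the JDK. The class name RawRestSketch, its helper methods, the index name and the localhost:9200 address are all assumptions made for illustration; this is not the Low Level REST Client itself, only the kind of work that sits underneath it.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: a hand-written REST call, showing the marshalling a client library hides. */
final class RawRestSketch {

    /** Escapes quotes, backslashes and common whitespace escapes (other control characters are not handled). */
    static String escapeJson(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:   sb.append(c);
            }
        }
        return sb.toString();
    }

    /** Marshals a match query by hand: {"query":{"match":{"<field>":"<text>"}}}. */
    static String matchQueryJson(String field, String text) {
        return "{\"query\":{\"match\":{\"" + escapeJson(field) + "\":\"" + escapeJson(text) + "\"}}}";
    }

    /** POSTs the body to <baseUrl>/<index>/_search and returns the raw response JSON. */
    static String search(String baseUrl, String index, String bodyJson) throws IOException {
        HttpURLConnection conn =
                (HttpURLConnection) new URL(baseUrl + "/" + index + "/_search").openConnection();
        conn.setRequestMethod("POST");
        conn.setRequestProperty("Content-Type", "application/json");
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            out.write(bodyJson.getBytes(StandardCharsets.UTF_8));
        }
        try (InputStream in = conn.getInputStream()) {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            // The result is still a JSON string that the caller has to parse.
            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        String body = matchQueryJson("message", "trying out Elasticsearch");
        System.out.println(body);
        // Assumes a node on localhost:9200 and an index named "mess":
        // System.out.println(search("http://localhost:9200", "mess", body));
    }
}
```

The high-level client replaces both string-building steps with request and response objects, which is the main reason to prefer it when the server version allows.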
About the Java Low Level REST Client
For features, Maven setup, and usage, see:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html
For the API docs (Javadoc), see:
https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-client/6.2.4/index.html
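About the Java High Level REST Client
Added in 6.0.0, it aims to handle requests and responses in an object-oriented Java style.
Every API can be called synchronously or asynchronously. The synchronous method returns a result object directly. The asynchronous method has a name ending in Async (for example createAsync) and delivers the result through a listener argument.
The high-level Java REST client depends on the Elasticsearch core project.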
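The two calling styles can be sketched without any ES dependency. Everything below is hypothetical: SyncAsyncSketch, its ping methods and the nested ResultListener interface are stand-ins shaped like the description above (a blocking call that returns a result, and an Async variant that notifies a listener), not the client's real classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

/** Toy "client" exposing one operation in both calling styles. */
final class SyncAsyncSketch implements AutoCloseable {

    /** Hypothetical callback type with one method per outcome. */
    interface ResultListener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    private final ExecutorService pool = Executors.newSingleThreadExecutor();

    /** Synchronous style: blocks and returns the result object directly. */
    String ping(String name) {
        if (name == null) {
            throw new IllegalArgumentException("name must not be null");
        }
        return "pong:" + name;
    }

    /** Asynchronous style: returns immediately; the listener is notified later on another thread. */
    void pingAsync(String name, ResultListener<String> listener) {
        pool.execute(() -> {
            try {
                listener.onResponse(ping(name));
            } catch (Exception e) {
                listener.onFailure(e);
            }
        });
    }

    @Override
    public void close() {
        pool.shutdown();
    }

    /** Demo helper: issues the async call and waits (up to 5 seconds) for the callback. */
    static String runAsyncAndWait(String name) {
        CountDownLatch done = new CountDownLatch(1);
        AtomicReference<String> outcome = new AtomicReference<>("timed out");
        try (SyncAsyncSketch client = new SyncAsyncSketch()) {
            client.pingAsync(name, new ResultListener<String>() {
                @Override
                public void onResponse(String response) {
                    outcome.set(response);
                    done.countDown();
                }

                @Override
                public void onFailure(Exception e) {
                    outcome.set("failed: " + e.getMessage());
                    done.countDown();
                }
            });
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return outcome.get();
    }

    public static void main(String[] args) {
        try (SyncAsyncSketch client = new SyncAsyncSketch()) {
            System.out.println(client.ping("sync"));
        }
        System.out.println(runAsyncAndWait("async"));
    }
}
```

Note that in the asynchronous style errors arrive through onFailure rather than as a thrown exception, so the failure handling has to live in the listener.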
Compatibility
Requires Java 1.8 and the Elasticsearch core project.
Use a client version that matches the ES server version.
Java High Level REST Client Maven integration
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.2.4</version>
</dependency>
Java High Level REST Client initialization
RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(
                new HttpHost("localhost", 9200, "http"),
                new HttpHost("localhost", 9201, "http")));
When it is given the addresses of several cluster nodes, the client load-balances its requests across that set of addresses.
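One simple way to spread requests over a set of addresses is plain rotation. The sketch below only illustrates that idea; RoundRobinSketch and nextNode are hypothetical names, and the real client's node selection does more (for example, it has to cope with nodes that stop responding).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/** Rotates through a fixed list of node addresses, handing out one per request. */
final class RoundRobinSketch {

    private final List<String> nodes;
    private final AtomicInteger counter = new AtomicInteger();

    RoundRobinSketch(List<String> nodes) {
        if (nodes.isEmpty()) {
            throw new IllegalArgumentException("at least one node is required");
        }
        this.nodes = new ArrayList<>(nodes);
    }

    /** Returns the node that should receive the next request. */
    String nextNode() {
        // floorMod keeps the index non-negative even after the counter overflows.
        int i = Math.floorMod(counter.getAndIncrement(), nodes.size());
        return nodes.get(i);
    }

    public static void main(String[] args) {
        RoundRobinSketch lb = new RoundRobinSketch(Arrays.asList("localhost:9200", "localhost:9201"));
        for (int n = 0; n < 4; n++) {
            System.out.println(lb.nextNode());
        }
    }
}
```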
When the client is no longer needed, remember to close it.
client.close();
API
For the APIs and usage examples, see:
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-supported-apis.html
Create index
import java.io.IOException;

import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;

// InitDemo.getClient() is a helper from the author's code repository; it is not shown in this article.
public class CreateIndexDemo {

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the create-index request
            CreateIndexRequest request = new CreateIndexRequest("mess");

            // 2. Index settings
            request.settings(Settings.builder()
                    .put("index.number_of_shards", 3)     // number of shards
                    .put("index.number_of_replicas", 2)   // number of replicas
                    .put("analysis.analyzer.default.tokenizer", "ik_smart")); // default tokenizer (from the IK plugin)

            // 3. Index mappings; the root key of the JSON must match the type name ("_doc")
            request.mapping("_doc",
                    "{\n" +
                    "  \"_doc\": {\n" +
                    "    \"properties\": {\n" +
                    "      \"message\": {\n" +
                    "        \"type\": \"text\"\n" +
                    "      }\n" +
                    "    }\n" +
                    "  }\n" +
                    "}",
                    XContentType.JSON);

            // 4. Index alias
            request.alias(new Alias("mmm"));

            // 5. Send the request
            // 5.1 Synchronously
            CreateIndexResponse createIndexResponse = client.indices().create(request);

            // 6. Handle the response
            boolean acknowledged = createIndexResponse.isAcknowledged();
            boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
            System.out.println("acknowledged = " + acknowledged);
            System.out.println("shardsAcknowledged = " + shardsAcknowledged);

            // 5.2 Asynchronously
            /*
            ActionListener<CreateIndexResponse> listener = new ActionListener<CreateIndexResponse>() {
                @Override
                public void onResponse(CreateIndexResponse createIndexResponse) {
                    // 6. Handle the response
                    boolean acknowledged = createIndexResponse.isAcknowledged();
                    boolean shardsAcknowledged = createIndexResponse.isShardsAcknowledged();
                    System.out.println("acknowledged = " + acknowledged);
                    System.out.println("shardsAcknowledged = " + shardsAcknowledged);
                }

                @Override
                public void onFailure(Exception e) {
                    System.out.println("Create index failed: " + e.getMessage());
                }
            };
            client.indices().createAsync(request, listener);
            */
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
Code repository: http://code.dongnaoedu.com/3266399810/vip-elasticsearch/tree/master/es-hrset-client
Index document
import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.rest.RestStatus;

public class IndexDocumentDemo {

    private static final Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the index request
            IndexRequest request = new IndexRequest(
                    "mess",   // index
                    "_doc",   // mapping type
                    "1");     // document id

            // 2. Prepare the document
            // Option 1: pass a JSON string directly
            String jsonString = "{" +
                    "\"user\":\"kimchy\"," +
                    "\"postDate\":\"2013-01-30\"," +
                    "\"message\":\"trying out Elasticsearch\"" +
                    "}";
            request.source(jsonString, XContentType.JSON);

            // Option 2: describe the document with a Map
            /*
            Map<String, Object> jsonMap = new HashMap<>();
            jsonMap.put("user", "kimchy");
            jsonMap.put("postDate", new Date());
            jsonMap.put("message", "trying out Elasticsearch");
            request.source(jsonMap);
            */

            // Option 3: build the document with XContentBuilder
            /*
            XContentBuilder builder = XContentFactory.jsonBuilder();
            builder.startObject();
            {
                builder.field("user", "kimchy");
                builder.field("postDate", new Date());
                builder.field("message", "trying out Elasticsearch");
            }
            builder.endObject();
            request.source(builder);
            */

            // Option 4: pass key-value pairs directly
            /*
            request.source("user", "kimchy",
                           "postDate", new Date(),
                           "message", "trying out Elasticsearch");
            */

            // 3. Other optional settings
            /*
            request.routing("routing");                      // routing value
            request.timeout(TimeValue.timeValueSeconds(1));  // how long to wait for the primary shard
            request.setRefreshPolicy("wait_for");            // refresh policy
            request.version(2);                              // version number
            request.opType(DocWriteRequest.OpType.CREATE);   // operation type
            */

            // 4. Send the request
            IndexResponse indexResponse = null;
            try {
                // Synchronously
                indexResponse = client.index(request);
            } catch (ElasticsearchException e) {
                // A conflict means a version mismatch, or a create on a document that already exists
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("Conflict; put the conflict-handling logic here.\n" + e.getDetailedMessage());
                }
                logger.error("Indexing failed", e);
            }

            // 5. Handle the response
            if (indexResponse != null) {
                String index = indexResponse.getIndex();
                String type = indexResponse.getType();
                String id = indexResponse.getId();
                long version = indexResponse.getVersion();
                if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
                    System.out.println("Document created; put the handling logic here.");
                } else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
                    System.out.println("Document updated; put the handling logic here.");
                }

                // Per-shard information
                ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
                if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
                    // Fewer shards succeeded than the total; handle that case here
                }
                // If any shard copy failed, the failure reasons are available
                if (shardInfo.getFailed() > 0) {
                    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
                        String reason = failure.reason();
                        System.out.println("Shard copy failure reason: " + reason);
                    }
                }
            }

            // Sending the index request asynchronously
            /*
            ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
                @Override
                public void onResponse(IndexResponse indexResponse) {
                }

                @Override
                public void onFailure(Exception e) {
                }
            };
            client.indexAsync(request, listener);
            */
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
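The CONFLICT branch above covers two situations: a create on an id that already exists, and an index request whose version does not match the stored one. The following in-memory model is only an illustration of those two rules; VersionConflictSketch, ConflictException and the method names are hypothetical, and it assumes the version supplied with a request must equal the stored version, which is how the version option behaves with internal versioning in 6.x.

```java
import java.util.HashMap;
import java.util.Map;

/** Tiny in-memory model of the two cases that end in a conflict. */
final class VersionConflictSketch {

    /** Thrown where the real service would answer with HTTP 409 (CONFLICT). */
    static final class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    private final Map<String, Long> versions = new HashMap<>();

    /** Models opType CREATE: succeeds only when the id is not taken yet. */
    long create(String id) {
        if (versions.containsKey(id)) {
            throw new ConflictException("document " + id + " already exists");
        }
        versions.put(id, 1L);
        return 1L;
    }

    /** Models an index request carrying a version: succeeds only when it matches the stored one. */
    long index(String id, long expectedVersion) {
        Long current = versions.get(id);
        if (current == null || current != expectedVersion) {
            throw new ConflictException("expected version " + expectedVersion + " but found " + current);
        }
        long next = current + 1;
        versions.put(id, next);
        return next;
    }

    public static void main(String[] args) {
        VersionConflictSketch store = new VersionConflictSketch();
        System.out.println("created, version " + store.create("1"));
        System.out.println("updated, version " + store.index("1", 1));
        try {
            store.index("1", 1); // stale version: another writer already moved it to 2
        } catch (ConflictException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```

A typical reaction to a conflict is to re-read the document, reapply the change, and retry with the fresh version.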
Get document
import java.io.IOException;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

public class GetDocumentDemo {

    private static final Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the get request
            GetRequest request = new GetRequest(
                    "mess",   // index
                    "_doc",   // mapping type
                    "1");     // document id

            // 2. Optional settings
            //request.routing("routing");
            //request.version(2);
            //request.fetchSourceContext(new FetchSourceContext(false)); // whether to fetch _source

            // Choose which fields to return
            String[] includes = new String[]{"message", "*Date"};
            String[] excludes = Strings.EMPTY_ARRAY;
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext);

            // The same thing can be written with an exclude list
            /*
            String[] includes = Strings.EMPTY_ARRAY;
            String[] excludes = new String[]{"message"};
            FetchSourceContext fetchSourceContext = new FetchSourceContext(true, includes, excludes);
            request.fetchSourceContext(fetchSourceContext);
            */

            // Fetching stored fields
            /*
            request.storedFields("message");
            GetResponse getResponse = client.get(request);
            String message = getResponse.getField("message").getValue();
            */

            // 3. Send the request
            GetResponse getResponse = null;
            try {
                // Synchronously
                getResponse = client.get(request);
            } catch (ElasticsearchException e) {
                if (e.status() == RestStatus.NOT_FOUND) {
                    // A 404 is thrown as an exception when the index does not exist;
                    // a missing document is reported through isExists() below
                    logger.error("Not found (the index does not exist)");
                }
                if (e.status() == RestStatus.CONFLICT) {
                    logger.error("Version conflict on get; put the conflict-handling logic here.");
                }
                logger.error("Get document failed", e);
            }

            // 4. Handle the response
            if (getResponse != null) {
                String index = getResponse.getIndex();
                String type = getResponse.getType();
                String id = getResponse.getId();
                if (getResponse.isExists()) { // the document exists
                    long version = getResponse.getVersion();
                    String sourceAsString = getResponse.getSourceAsString();        // source as a String
                    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap(); // source as a Map
                    byte[] sourceAsBytes = getResponse.getSourceAsBytes();          // source as a byte array

                    logger.info("index:" + index + " type:" + type + " id:" + id);
                    logger.info(sourceAsString);
                } else {
                    logger.error("No document found with this id");
                }
            }

            // Sending the get request asynchronously
            /*
            ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
                @Override
                public void onResponse(GetResponse getResponse) {
                }

                @Override
                public void onFailure(Exception e) {
                }
            };
            client.getAsync(request, listener);
            */
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
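Bulk
import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.DocWriteRequest;
import org.elasticsearch.action.DocWriteResponse;
import org.elasticsearch.action.bulk.BulkItemResponse;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.xcontent.XContentType;

public class BulkDemo {

    private static final Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the bulk request
            BulkRequest request = new BulkRequest();
            request.add(new IndexRequest("mess", "_doc", "1")
                    .source(XContentType.JSON, "field", "foo"));
            request.add(new IndexRequest("mess", "_doc", "2")
                    .source(XContentType.JSON, "field", "bar"));
            request.add(new IndexRequest("mess", "_doc", "3")
                    .source(XContentType.JSON, "field", "baz"));

            /*
            request.add(new DeleteRequest("mess", "_doc", "3"));
            request.add(new UpdateRequest("mess", "_doc", "2")
                    .doc(XContentType.JSON, "other", "test"));
            request.add(new IndexRequest("mess", "_doc", "4")
                    .source(XContentType.JSON, "field", "baz"));
            */

            // 2. Optional settings
            /*
            request.timeout("2m");
            request.setRefreshPolicy("wait_for");
            request.waitForActiveShards(2);
            */

            // 3. Send the request synchronously
            BulkResponse bulkResponse = client.bulk(request);

            // 4. Handle the response
            if (bulkResponse != null) {
                for (BulkItemResponse bulkItemResponse : bulkResponse) {
                    // A failed item carries a failure instead of a response
                    if (bulkItemResponse.isFailed()) {
                        logger.error("Bulk item failed: " + bulkItemResponse.getFailureMessage());
                        continue;
                    }
                    DocWriteResponse itemResponse = bulkItemResponse.getResponse();

                    if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
                            || bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
                        IndexResponse indexResponse = (IndexResponse) itemResponse;
                        // TODO handle a successful index/create
                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
                        UpdateResponse updateResponse = (UpdateResponse) itemResponse;
                        // TODO handle a successful update
                    } else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
                        DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
                        // TODO handle a successful delete
                    }
                }
            }

            // Sending the bulk request asynchronously
            /*
            ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
                @Override
                public void onResponse(BulkResponse bulkResponse) {
                }

                @Override
                public void onFailure(Exception e) {
                }
            };
            client.bulkAsync(request, listener);
            */
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}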
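On the REST side, a bulk request travels as newline-delimited JSON: one action line per operation, followed by a source line for index operations, with every line (including the last) terminated by a newline. The sketch below builds such a body by hand to show roughly what the BulkRequest above corresponds to. BulkBodySketch and its methods are hypothetical names, and it assumes the index, type and id values contain no characters that need JSON escaping.

```java
import java.util.ArrayList;
import java.util.List;

/** Builds the newline-delimited JSON body of a _bulk call by hand. */
final class BulkBodySketch {

    private final List<String> lines = new ArrayList<>();

    /** Action line shared by all operations; values are assumed not to need escaping. */
    private static String action(String op, String index, String type, String id) {
        return "{\"" + op + "\":{\"_index\":\"" + index + "\",\"_type\":\"" + type
                + "\",\"_id\":\"" + id + "\"}}";
    }

    /** An index operation is an action line followed by the document source on its own line. */
    BulkBodySketch index(String index, String type, String id, String sourceJson) {
        lines.add(action("index", index, type, id));
        lines.add(sourceJson);
        return this;
    }

    /** A delete operation has an action line only. */
    BulkBodySketch delete(String index, String type, String id) {
        lines.add(action("delete", index, type, id));
        return this;
    }

    /** Joins the lines; the body must end with a newline. */
    String build() {
        StringBuilder sb = new StringBuilder();
        for (String line : lines) {
            sb.append(line).append('\n');
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        String body = new BulkBodySketch()
                .index("mess", "_doc", "1", "{\"field\":\"foo\"}")
                .index("mess", "_doc", "2", "{\"field\":\"bar\"}")
                .delete("mess", "_doc", "3")
                .build();
        System.out.print(body);
    }
}
```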
Search
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class SearchDemo {

    private static final Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("bank");
            searchRequest.types("_doc");

            // 2. Build the request body with SearchSourceBuilder.
            //    Look through its methods: every kind of query is built from here.
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            // Building a QueryBuilder
            /*
            QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("user", "kimchy")
                    .fuzziness(Fuzziness.AUTO)
                    .prefixLength(3)
                    .maxExpansions(10);
            sourceBuilder.query(matchQueryBuilder);
            */

            sourceBuilder.query(QueryBuilders.termQuery("age", 24));
            sourceBuilder.from(0);
            sourceBuilder.size(10);
            sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));

            // Whether to return _source
            //sourceBuilder.fetchSource(false);

            // Which fields to return
            /*
            String[] includeFields = new String[] {"title", "user", "innerObject.*"};
            String[] excludeFields = new String[] {"_type"};
            sourceBuilder.fetchSource(includeFields, excludeFields);
            */

            // Sorting
            //sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC));
            //sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC));

            // Return profiling information
            //sourceBuilder.profile(true);

            // Attach the body to the request
            searchRequest.source(sourceBuilder);

            // Optional settings
            //searchRequest.routing("routing");

            // Highlighting
            /*
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
            highlightTitle.highlighterType("unified");
            highlightBuilder.field(highlightTitle);
            HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
            highlightBuilder.field(highlightUser);
            sourceBuilder.highlighter(highlightBuilder);
            */

            // Aggregations
            /*
            TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
                    .field("company.keyword");
            aggregation.subAggregation(AggregationBuilders.avg("average_age")
                    .field("age"));
            sourceBuilder.aggregation(aggregation);
            */

            // Suggestions
            /*
            SuggestionBuilder termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("user").text("kmichy");
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);
            */

            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);

            // 4. Handle the response
            // Overall status of the search
            RestStatus status = searchResponse.status();
            TimeValue took = searchResponse.getTook();
            Boolean terminatedEarly = searchResponse.isTerminatedEarly();
            boolean timedOut = searchResponse.isTimedOut();

            // Per-shard search information
            int totalShards = searchResponse.getTotalShards();
            int successfulShards = searchResponse.getSuccessfulShards();
            int failedShards = searchResponse.getFailedShards();
            for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
                // failures should be handled here
            }

            // Process the hits
            SearchHits hits = searchResponse.getHits();
            long totalHits = hits.getTotalHits();
            float maxScore = hits.getMaxScore();

            SearchHit[] searchHits = hits.getHits();
            for (SearchHit hit : searchHits) {
                // do something with the SearchHit
                String index = hit.getIndex();
                String type = hit.getType();
                String id = hit.getId();
                float score = hit.getScore();

                // Read the _source field
                String sourceAsString = hit.getSourceAsString();        // as a JSON string
                Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map

                // Read field values from the map
                /*
                String documentTitle = (String) sourceAsMap.get("title");
                List<Object> users = (List<Object>) sourceAsMap.get("user");
                Map<String, Object> innerObject = (Map<String, Object>) sourceAsMap.get("innerObject");
                */
                logger.info("index:" + index + " type:" + type + " id:" + id);
                logger.info(sourceAsString);

                // Read the highlight result
                /*
                Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                HighlightField highlight = highlightFields.get("title");
                Text[] fragments = highlight.fragments();
                String fragmentString = fragments[0].string();
                */
            }

            // Read the aggregation result
            /*
            Aggregations aggregations = searchResponse.getAggregations();
            Terms byCompanyAggregation = aggregations.get("by_company");
            Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
            Avg averageAge = elasticBucket.getAggregations().get("average_age");
            double avg = averageAge.getValue();
            */

            // Read the suggestion result
            /*
            Suggest suggest = searchResponse.getSuggest();
            TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
            for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
                for (TermSuggestion.Entry.Option option : entry) {
                    String suggestText = option.getText().string();
                }
            }
            */

            // Sending the search request asynchronously
            /*
            ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
                @Override
                public void onResponse(SearchResponse getResponse) {
                    // read the result
                }

                @Override
                public void onFailure(Exception e) {
                    // handle the failure
                }
            };
            client.searchAsync(searchRequest, listener);
            */
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
Highlight
import java.io.IOException;
import java.util.Map;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.text.Text;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder;
import org.elasticsearch.search.fetch.subphase.highlight.HighlightField;

public class HighlightDemo {

    private static final Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the search request
            SearchRequest searchRequest = new SearchRequest("hl_test");

            // 2. Build the request body with SearchSourceBuilder
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

            // Build the query
            QueryBuilder matchQueryBuilder = QueryBuilders.matchQuery("title", "lucene solr");
            sourceBuilder.query(matchQueryBuilder);

            // Paging
            /*
            sourceBuilder.from(0);
            sourceBuilder.size(5);
            */

            // Highlighting
            HighlightBuilder highlightBuilder = new HighlightBuilder();
            highlightBuilder.requireFieldMatch(false)
                    .field("title")
                    .field("content")
                    .preTags("<strong>")
                    .postTags("</strong>");

            // Different fields can have different settings, such as different tags
            /*
            HighlightBuilder.Field highlightTitle = new HighlightBuilder.Field("title");
            highlightTitle.preTags("<strong>").postTags("</strong>");
            highlightBuilder.field(highlightTitle);
            HighlightBuilder.Field highlightContent = new HighlightBuilder.Field("content");
            highlightContent.preTags("<b>").postTags("</b>");
            highlightBuilder.field(highlightContent).requireFieldMatch(false);
            */

            sourceBuilder.highlighter(highlightBuilder);
            searchRequest.source(sourceBuilder);

            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);

            // 4. Handle the response
            if (RestStatus.OK.equals(searchResponse.status())) {
                // Process the hits
                SearchHits hits = searchResponse.getHits();
                long totalHits = hits.getTotalHits();

                SearchHit[] searchHits = hits.getHits();
                for (SearchHit hit : searchHits) {
                    String index = hit.getIndex();
                    String type = hit.getType();
                    String id = hit.getId();
                    float score = hit.getScore();

                    // Read the _source field
                    //String sourceAsString = hit.getSourceAsString();      // as a JSON string
                    Map<String, Object> sourceAsMap = hit.getSourceAsMap(); // as a Map

                    // Read field values from the map
                    /*
                    String title = (String) sourceAsMap.get("title");
                    String content = (String) sourceAsMap.get("content");
                    */
                    logger.info("index:" + index + " type:" + type + " id:" + id);
                    logger.info("sourceMap : " + sourceAsMap);

                    // Read the highlight result for each highlighted field
                    Map<String, HighlightField> highlightFields = hit.getHighlightFields();
                    for (String field : new String[] {"title", "content"}) {
                        HighlightField highlight = highlightFields.get(field);
                        if (highlight != null) {
                            Text[] fragments = highlight.fragments(); // a multi-valued field yields several fragments
                            if (fragments != null && fragments.length > 0) {
                                String fragmentString = fragments[0].string();
                                logger.info(field + " highlight : " + fragmentString);
                                // The highlighted string can replace the matching field in sourceAsMap
                                // before the map is returned to the caller:
                                //sourceAsMap.put(field, fragmentString);
                            }
                        }
                    }
                }
            }
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
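The comment in the demo suggests replacing fields in the source map with their highlighted fragments before handing the map back to the caller. A minimal sketch of that merge step, using plain Java maps in place of the client's response types, might look like this. HighlightMergeSketch and merge are hypothetical names, and the highlight map is assumed to hold the fragments for each field as strings.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Overlays highlight fragments onto a copy of the document source. */
final class HighlightMergeSketch {

    /**
     * Returns a copy of source in which every field that has at least one
     * highlight fragment is replaced by its first fragment.
     */
    static Map<String, Object> merge(Map<String, Object> source,
                                     Map<String, List<String>> highlights) {
        Map<String, Object> merged = new LinkedHashMap<>(source);
        for (Map.Entry<String, List<String>> e : highlights.entrySet()) {
            List<String> fragments = e.getValue();
            // Only overwrite fields that exist in the source and actually have a fragment.
            if (fragments != null && !fragments.isEmpty() && merged.containsKey(e.getKey())) {
                merged.put(e.getKey(), fragments.get(0));
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("title", "lucene solr and elasticsearch");
        source.put("content", "some content");

        Map<String, List<String>> highlights = new HashMap<>();
        highlights.put("title", Arrays.asList("<strong>lucene</strong> <strong>solr</strong> and elasticsearch"));

        System.out.println(merge(source, highlights));
    }
}
```

One caveat: a fragment can be an excerpt rather than the whole field value, so overwriting a long field such as content this way may drop text; keeping the highlight under a separate key is a safer choice there.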
Suggest
import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.suggest.Suggest;
import org.elasticsearch.search.suggest.SuggestBuilder;
import org.elasticsearch.search.suggest.SuggestBuilders;
import org.elasticsearch.search.suggest.SuggestionBuilder;
import org.elasticsearch.search.suggest.completion.CompletionSuggestion;
import org.elasticsearch.search.suggest.term.TermSuggestion;

public class SuggestDemo {

    private static final Logger logger = LogManager.getRootLogger();

    public static void termSuggest() {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("mess");

            // 2. Build the request body with SearchSourceBuilder
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.size(0);

            // Term suggestion
            SuggestionBuilder<?> termSuggestionBuilder =
                    SuggestBuilders.termSuggestion("user").text("kmichy");
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);

            searchRequest.source(sourceBuilder);

            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);

            // 4. Handle the response
            if (RestStatus.OK.equals(searchResponse.status())) {
                // Read the suggestion result
                Suggest suggest = searchResponse.getSuggest();
                TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
                for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
                    logger.info("text: " + entry.getText().string());
                    for (TermSuggestion.Entry.Option option : entry) {
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }
            /* Shape of a term suggestion response:
              "suggest": {
                "my-suggestion": [
                  { "text": "tring", "offset": 0, "length": 5,
                    "options": [ { "text": "trying", "score": 0.8, "freq": 1 } ] },
                  { "text": "out", "offset": 6, "length": 3, "options": [] },
                  { "text": "elasticsearch", "offset": 10, "length": 13, "options": [] }
                ]
              }
            */
        } catch (IOException e) {
            logger.error(e);
        }
    }

    public static void completionSuggester() {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("music");

            // 2. Build the request body with SearchSourceBuilder
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.size(0);

            // Auto-completion; the equivalent REST request is:
            /* POST music/_search?pretty
               {
                 "suggest": {
                   "song-suggest": {
                     "prefix": "lucene s",
                     "completion": { "field": "suggest", "skip_duplicates": true }
                   }
                 }
               }
            */
            SuggestionBuilder<?> completionSuggestionBuilder =
                    SuggestBuilders.completionSuggestion("suggest")
                            .prefix("lucene s")
                            .skipDuplicates(true);
            SuggestBuilder suggestBuilder = new SuggestBuilder();
            suggestBuilder.addSuggestion("song-suggest", completionSuggestionBuilder);
            sourceBuilder.suggest(suggestBuilder);

            searchRequest.source(sourceBuilder);

            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);

            // 4. Handle the response
            if (RestStatus.OK.equals(searchResponse.status())) {
                // Read the suggestion result
                Suggest suggest = searchResponse.getSuggest();
                CompletionSuggestion completionSuggestion = suggest.getSuggestion("song-suggest");
                for (CompletionSuggestion.Entry entry : completionSuggestion.getEntries()) {
                    logger.info("text: " + entry.getText().string());
                    for (CompletionSuggestion.Entry.Option option : entry) {
                        String suggestText = option.getText().string();
                        logger.info("   suggest option : " + suggestText);
                    }
                }
            }
        } catch (IOException e) {
            logger.error(e);
        }
    }

    public static void main(String[] args) {
        termSuggest();
        logger.info("--------------------------------------");
        completionSuggester();
    }
}
Aggregation
import java.io.IOException;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.Aggregations;
import org.elasticsearch.search.aggregations.BucketOrder;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket;
import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.elasticsearch.search.aggregations.metrics.avg.Avg;
import org.elasticsearch.search.builder.SearchSourceBuilder;

public class AggregationDemo {

    private static final Logger logger = LogManager.getRootLogger();

    public static void main(String[] args) {
        try (RestHighLevelClient client = InitDemo.getClient()) {

            // 1. Build the search request
            //SearchRequest searchRequest = new SearchRequest();
            SearchRequest searchRequest = new SearchRequest("bank");

            // 2. Build the request body with SearchSourceBuilder
            SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
            sourceBuilder.size(0);

            // Terms aggregation: group by the values of the age field,
            // ordered by the average_balance sub-aggregation, ascending
            TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_age")
                    .field("age")
                    .order(BucketOrder.aggregation("average_balance", true));

            // Compute the average balance of each group
            aggregation.subAggregation(AggregationBuilders.avg("average_balance")
                    .field("balance"));

            sourceBuilder.aggregation(aggregation);
            searchRequest.source(sourceBuilder);

            // 3. Send the request
            SearchResponse searchResponse = client.search(searchRequest);

            // 4. Handle the response
            if (RestStatus.OK.equals(searchResponse.status())) {
                // Read the aggregation result
                Aggregations aggregations = searchResponse.getAggregations();
                Terms byAgeAggregation = aggregations.get("by_age");
                logger.info("aggregation by_age result");
                logger.info("docCountError: " + byAgeAggregation.getDocCountError());
                logger.info("sumOfOtherDocCounts: " + byAgeAggregation.getSumOfOtherDocCounts());
                logger.info("------------------------------------");
                for (Bucket buck : byAgeAggregation.getBuckets()) {
                    logger.info("key: " + buck.getKeyAsNumber());
                    logger.info("docCount: " + buck.getDocCount());
                    logger.info("docCountError: " + buck.getDocCountError());
                    // Read the sub-aggregation
                    Avg averageBalance = buck.getAggregations().get("average_balance");
                    logger.info("average_balance: " + averageBalance.getValue());
                    logger.info("------------------------------------");
                }

                // Fetching a single bucket directly by key
                /*
                Bucket bucket24 = byAgeAggregation.getBucketByKey("24");
                Avg averageBalance = bucket24.getAggregations().get("average_balance");
                double avg = averageBalance.getValue();
                */
            }
        } catch (IOException e) {
            logger.error(e);
        }
    }
}
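To see what the by_age aggregation with its average_balance sub-aggregation computes, the same grouping can be written over an in-memory list. This is only a model of the result shape: TermsAvgSketch, Account and Bucket are hypothetical types, the sample data is made up, and a real terms aggregation runs across shards and returns only a limited number of buckets.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Groups accounts by age and averages the balance per group, ordered by that average ascending. */
final class TermsAvgSketch {

    static final class Account {
        final int age;
        final double balance;

        Account(int age, double balance) {
            this.age = age;
            this.balance = balance;
        }
    }

    static final class Bucket {
        final int key;               // the age shared by the group
        final long docCount;         // number of accounts in the group
        final double averageBalance; // the sub-aggregation value

        Bucket(int key, long docCount, double averageBalance) {
            this.key = key;
            this.docCount = docCount;
            this.averageBalance = averageBalance;
        }
    }

    static List<Bucket> byAge(List<Account> accounts) {
        // age -> {sum of balances, number of accounts}
        Map<Integer, double[]> totals = new TreeMap<>();
        for (Account a : accounts) {
            double[] t = totals.computeIfAbsent(a.age, k -> new double[2]);
            t[0] += a.balance;
            t[1] += 1;
        }
        List<Bucket> buckets = new ArrayList<>();
        for (Map.Entry<Integer, double[]> e : totals.entrySet()) {
            double[] t = e.getValue();
            buckets.add(new Bucket(e.getKey(), (long) t[1], t[0] / t[1]));
        }
        // Same ordering as BucketOrder.aggregation("average_balance", true): ascending by the average.
        buckets.sort(Comparator.comparingDouble((Bucket b) -> b.averageBalance));
        return buckets;
    }

    public static void main(String[] args) {
        List<Account> accounts = Arrays.asList(
                new Account(24, 100.0), new Account(24, 300.0), new Account(30, 50.0));
        for (Bucket b : byAge(accounts)) {
            System.out.println("key: " + b.key + " docCount: " + b.docCount
                    + " average_balance: " + b.averageBalance);
        }
    }
}
```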
The QueryBuilder for each kind of query
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-query-builders.html
The AggregationBuilder for each kind of aggregation
https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-high-aggregation-builders.html
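About the Java Client
The Java client uses TransportClient. All of its operations are asynchronous by nature (you can pass a listener, or get back a Future).
Note: the ES roadmap deprecates TransportClient starting with 7.0 and removes it completely in 8.0, replacing it with the High Level REST Client.
Most of the operation APIs in the High Level REST Client are the same as in the Java client.
Official documentation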
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html
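Compatibility
Use a client version that matches the ES server version.
Java Client Maven integration
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.2.4</version>
</dependency>
The other required JARs are largely the same as for the High Level REST Client.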
Java Client logging
The client uses the log4j2 logging library.
To use a different logging library, bridge to it through slf4j.
Init Client
// on startup
Settings settings = Settings.builder()
        .put("cluster.name", "myClusterName")
        .build(); // settings for connecting to the cluster
TransportClient client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host1"), 9300))
        .addTransportAddress(new TransportAddress(InetAddress.getByName("host2"), 9300)); // transport addresses; note that the port is 9300

// on shutdown
client.close(); // remember to close the client object on shutdown
Settings available when initializing the client
cluster.name: the name of the cluster. It must be set when the cluster name is not the default, elasticsearch.
client.transport.sniff: set to true to sniff the whole cluster and automatically add its nodes to the connection list.
client.transport.ignore_cluster_name: set to true to ignore cluster name validation of connected nodes. (since 0.19.4)
client.transport.ping_timeout: the time to wait for a ping response from a node. Defaults to 5s.
client.transport.nodes_sampler_interval: how often to sample / ping the nodes listed and connected. Defaults to 5s.
Document API
Official documentation: https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-docs.html
Search API
https://www.elastic.co/guide/en/elasticsearch/client/java-api/6.2/java-search.html
Code samples: es-java-client repository
http://code.dongnaoedu.com/3266399810/vip-elasticsearch/tree/master/
Spring Data Elasticsearch documentation
https://docs.spring.io/spring-data/elasticsearch/docs/current/reference/html/
https://github.com/spring-projects/spring-data-elasticsearch