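Over the previous 33 articles, every request to ES was issued as a RESTful call from Kibana, and no Java or Python client code was shown. RESTful requests are the most direct way to use and understand the core features of ES, but real projects almost always issue requests and process the results from Java or Python. With the core features understood, the Java API is straightforward to pick up; for APIs not covered here, consult the documentation.
This article walks through some examples of developing against the Elasticsearch client API, mainly in Java, covering the most commonly used core APIs.
Taking a Maven project as an example, add the following dependencies: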
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.12.1</version>
</dependency>
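// cluster.name must match the name of the cluster being connected to
Settings settings = Settings.builder()
        .put("cluster.name", "elasticsearch")
        .build();
// 9300 is the transport port; in 6.x the address class is TransportAddress
TransportClient client = new PreBuiltTransportClient(settings)
        .addTransportAddress(new TransportAddress(InetAddress.getByName("localhost"), 9300));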
If the cluster has many nodes, specifying an IP and port for every node is impractical. Instead, enable the client's automatic node sniffing, as follows:
// Setting client.transport.sniff to true enables automatic discovery of cluster nodes
Settings settings = Settings.builder()
        .put("client.transport.sniff", true)
        .put("cluster.name", "elasticsearch")
        .build();
// Only one node needs to be specified
TransportClient client = new PreBuiltTransportClient(settings);
client.addTransportAddress(new TransportAddress(InetAddress.getByName("192.168.17.137"), 9300));
The most basic CRUD code, which works as an introductory demo:
/**
 * Create an employee record (index one document).
 * @param client
 */
private static void createEmployee(TransportClient client) throws Exception {
    IndexResponse response = client.prepareIndex("company", "employee", "1")
            .setSource(XContentFactory.jsonBuilder()
                    .startObject()
                    .field("name", "jack")
                    .field("age", 27)
                    .field("position", "technique")
                    .field("country", "china")
                    .field("join_date", "2017-01-01")
                    .field("salary", 10000)
                    .endObject())
            .get();
    System.out.println(response.getResult());
}

/**
 * Fetch an employee record.
 * @param client
 * @throws Exception
 */
private static void getEmployee(TransportClient client) throws Exception {
    GetResponse response = client.prepareGet("company", "employee", "1").get();
    System.out.println(response.getSourceAsString());
}

/**
 * Update an employee record (partial update of one field).
 * @param client
 * @throws Exception
 */
private static void updateEmployee(TransportClient client) throws Exception {
    UpdateResponse response = client.prepareUpdate("company", "employee", "1")
            .setDoc(XContentFactory.jsonBuilder()
                    .startObject()
                    .field("position", "technique manager")
                    .endObject())
            .get();
    System.out.println(response.getResult());
}

/**
 * Delete an employee record.
 * @param client
 * @throws Exception
 */
private static void deleteEmployee(TransportClient client) throws Exception {
    DeleteResponse response = client.prepareDelete("company", "employee", "1").get();
    System.out.println(response.getResult());
}
The search we previously ran as a RESTful request is now implemented in Java. The original RESTful example:
GET /company/employee/_search
{
  "query": {
    "bool": {
      "must": [
        { "match": { "position": "technique" } }
      ],
      "filter": {
        "range": { "age": { "gte": 30, "lte": 40 } }
      }
    }
  },
  "from": 0,
  "size": 1
}
It is equivalent to the following Java code:
SearchResponse response = client.prepareSearch("company")
        .setTypes("employee")
        .setQuery(QueryBuilders.boolQuery()
                // must clause: full-text match on position
                .must(QueryBuilders.matchQuery("position", "technique"))
                // filter clause: age between 30 and 40, inclusive
                .filter(QueryBuilders.rangeQuery("age").gte(30).lte(40)))
        .setFrom(0)
        .setSize(1)
        .get();
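The `from` and `size` values in the request above control paging. As a rough sketch, a 1-based page number can be turned into the `from` offset that would be handed to `setFrom`. The `Paging` class and its `offsetFor` method are hypothetical names introduced here for illustration; they are not part of the Elasticsearch client API.

```java
// Hypothetical helper for illustration; not part of the Elasticsearch client API.
final class Paging {
    private Paging() {}

    /** Returns the zero-based "from" offset for a 1-based page number. */
    static int offsetFor(int page, int size) {
        if (page < 1 || size < 1) {
            throw new IllegalArgumentException("page and size must be >= 1");
        }
        // multiplyExact throws ArithmeticException instead of silently overflowing
        return Math.multiplyExact(page - 1, size);
    }

    public static void main(String[] args) {
        // Page 3 with 10 hits per page starts at offset 20
        System.out.println(Paging.offsetFor(3, 10));
    }
}
```

The result would be used as `.setFrom(Paging.offsetFor(page, size)).setSize(size)`. For very large result sets, the scroll API is usually a better fit than deep `from`/`size` paging.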
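Aggregation queries are a little more work: both building the request and parsing the response follow the actual structure of the returned result. Take the following query as an example: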
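Requirement: group employees by country, then by join year within each country, and compute the average salary of each group.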
The RESTful request is as follows:
GET /company/employee/_search
{
  "size": 0,
  "aggs": {
    "group_by_country": {
      "terms": { "field": "country" },
      "aggs": {
        "group_by_join_date": {
          "date_histogram": { "field": "join_date", "interval": "year" },
          "aggs": {
            "avg_salary": {
              "avg": { "field": "salary" }
            }
          }
        }
      }
    }
  }
}
The same request written in Java:
// Aggregation and field names match the RESTful request, so the response can be read by the same names
SearchResponse searchResponse = client.prepareSearch("company")
        .setTypes("employee")
        .setSize(0)
        .addAggregation(
                AggregationBuilders.terms("group_by_country").field("country")
                        .subAggregation(AggregationBuilders.dateHistogram("group_by_join_date")
                                .field("join_date")
                                .dateHistogramInterval(DateHistogramInterval.YEAR)
                                .subAggregation(AggregationBuilders.avg("avg_salary").field("salary"))))
        .execute().actionGet();
The response then has to be unwrapped one level at a time:
Map<String, Aggregation> aggrMap = searchResponse.getAggregations().asMap();

// Level 1: one bucket per country
StringTerms groupByCountry = (StringTerms) aggrMap.get("group_by_country");
for (Terms.Bucket groupByCountryBucket : groupByCountry.getBuckets()) {
    System.out.println(groupByCountryBucket.getKey() + "\t" + groupByCountryBucket.getDocCount());

    // Level 2: one bucket per join year inside the country bucket
    Histogram groupByJoinDate = (Histogram) groupByCountryBucket.getAggregations().asMap().get("group_by_join_date");
    for (Histogram.Bucket groupByJoinDateBucket : groupByJoinDate.getBuckets()) {
        System.out.println(groupByJoinDateBucket.getKey() + "\t" + groupByJoinDateBucket.getDocCount());

        // Level 3: the average salary metric inside the year bucket
        Avg avgSalary = (Avg) groupByJoinDateBucket.getAggregations().asMap().get("avg_salary");
        System.out.println(avgSalary.getValue());
    }
}
client.close();
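The nesting that the Java code unwraps mirrors the JSON returned by the RESTful request: buckets of `group_by_country`, each holding buckets of `group_by_join_date`, each holding an `avg_salary` value. The sketch below walks the same shape using plain nested maps as a stand-in for that JSON. The `AggWalk` class, its method names, and the sample values (modelled on the single employee document created earlier) are assumptions for illustration, not real cluster output or client API.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the aggregation JSON; no Elasticsearch classes are used.
final class AggWalk {
    private AggWalk() {}

    /** Walks group_by_country -> group_by_join_date -> avg_salary; returns null if no bucket matches. */
    @SuppressWarnings("unchecked")
    static Double avgSalary(Map<String, Object> aggregations, String country, String dateKey) {
        Map<String, Object> byCountry = (Map<String, Object>) aggregations.get("group_by_country");
        for (Map<String, Object> countryBucket : (List<Map<String, Object>>) byCountry.get("buckets")) {
            if (!country.equals(countryBucket.get("key"))) {
                continue;
            }
            Map<String, Object> byDate = (Map<String, Object>) countryBucket.get("group_by_join_date");
            for (Map<String, Object> dateBucket : (List<Map<String, Object>>) byDate.get("buckets")) {
                if (!dateKey.equals(dateBucket.get("key_as_string"))) {
                    continue;
                }
                Map<String, Object> avg = (Map<String, Object>) dateBucket.get("avg_salary");
                return ((Number) avg.get("value")).doubleValue();
            }
        }
        return null;
    }

    /** Builds made-up sample data in the shape of the "aggregations" section of the response. */
    static Map<String, Object> sample() {
        Map<String, Object> avg = new HashMap<>();
        avg.put("value", 10000.0);

        Map<String, Object> dateBucket = new HashMap<>();
        dateBucket.put("key_as_string", "2017-01-01T00:00:00.000Z");
        dateBucket.put("doc_count", 1);
        dateBucket.put("avg_salary", avg);

        Map<String, Object> byDate = new HashMap<>();
        byDate.put("buckets", Collections.singletonList(dateBucket));

        Map<String, Object> countryBucket = new HashMap<>();
        countryBucket.put("key", "china");
        countryBucket.put("doc_count", 1);
        countryBucket.put("group_by_join_date", byDate);

        Map<String, Object> byCountry = new HashMap<>();
        byCountry.put("buckets", Collections.singletonList(countryBucket));

        Map<String, Object> aggregations = new HashMap<>();
        aggregations.put("group_by_country", byCountry);
        return aggregations;
    }

    public static void main(String[] args) {
        System.out.println(avgSalary(sample(), "china", "2017-01-01T00:00:00.000Z"));
    }
}
```

Each cast in the sketch corresponds to one of the typed casts (`StringTerms`, `Histogram`, `Avg`) in the client code: the caller has to know which aggregation type sits at each level.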
/**
 * Upsert: update the price if document 2 exists, otherwise index the full document.
 */
private static void upsert(TransportClient transport) {
    try {
        // Document to insert when the id does not exist yet
        IndexRequest index = new IndexRequest("book_shop", "books", "2").source(
                XContentFactory.jsonBuilder().startObject()
                        .field("name", "mysql从入门到删库跑路")
                        .field("tags", "mysql")
                        .field("price", 32.8)
                        .endObject());
        // Partial update applied when the id already exists
        UpdateRequest update = new UpdateRequest("book_shop", "books", "2")
                .doc(XContentFactory.jsonBuilder()
                        .startObject()
                        .field("price", 31.8)
                        .endObject())
                .upsert(index);
        UpdateResponse response = transport.update(update).get();
        System.out.println(response.getVersion());
    } catch (IOException | InterruptedException | ExecutionException e) {
        e.printStackTrace();
    }
}
/**
 * Multi-get: fetch several documents by id in one request.
 */
public static void mget(TransportClient transport) {
    MultiGetResponse res = transport.prepareMultiGet()
            .add("book_shop", "books", "1")
            .add("book_shop", "books", "2")
            .get();
    for (MultiGetItemResponse item : res.getResponses()) {
        System.out.println(item.getResponse());
    }
}
/**
 * Bulk: send two index operations and one update in a single request.
 */
public static void bulk(TransportClient transport) {
    try {
        BulkRequestBuilder bulk = transport.prepareBulk();
        bulk.add(transport.prepareIndex("book_shop", "books", "3").setSource(
                XContentFactory.jsonBuilder().startObject()
                        .field("name", "设计模式从入门到拷贝代码")
                        .field("tags", "设计模式")
                        .field("price", 55.9)
                        .endObject()));
        bulk.add(transport.prepareIndex("book_shop", "books", "4").setSource(
                XContentFactory.jsonBuilder().startObject()
                        .field("name", "架构设计从入门到google搜索")
                        .field("tags", "架构设计")
                        .field("price", 68.9)
                        .endObject()));
        bulk.add(transport.prepareUpdate("book_shop", "books", "1").setDoc(
                XContentFactory.jsonBuilder().startObject()
                        .field("price", 32.8)
                        .endObject()));
        BulkResponse bulkRes = bulk.get();
        // hasFailures() is true if any single item in the batch failed
        if (bulkRes.hasFailures()) {
            System.out.println("Error...");
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
}
/**
 * Scroll: read the whole index in batches of one hit, keeping each scroll context alive for 60 seconds.
 */
public static void scroll(TransportClient client) {
    SearchResponse bookShop = client.prepareSearch("book_shop")
            .setScroll(new TimeValue(60000))
            .setSize(1)
            .get();
    int batchCnt = 0;
    do {
        // Keep reading with the scroll id until a batch comes back empty
        for (SearchHit hit : bookShop.getHits().getHits()) {
            System.out.println("batchCnt:" + ++batchCnt);
            System.out.println(hit.getSourceAsString());
        }
        bookShop = client.prepareSearchScroll(bookShop.getScrollId())
                .setScroll(new TimeValue(60000))
                .execute().actionGet();
    } while (bookShop.getHits().getHits().length != 0);
}
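The control flow of the scroll loop is simple: process a batch, ask for the next one, and stop as soon as a batch comes back empty. The sketch below models only that flow with an in-memory iterator of batches standing in for successive scroll responses. `ScrollLoopSketch` and `drain` are hypothetical names, and no Elasticsearch calls are made.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// In-memory model of the scroll loop; each List<String> stands in for one batch of hits.
final class ScrollLoopSketch {
    private ScrollLoopSketch() {}

    /** Collects hits batch by batch and stops at the first empty batch (or when no batches remain). */
    static List<String> drain(Iterator<List<String>> batches) {
        List<String> all = new ArrayList<>();
        while (batches.hasNext()) {
            List<String> batch = batches.next();
            if (batch.isEmpty()) {
                // An empty batch signals that the scroll is exhausted
                break;
            }
            all.addAll(batch);
        }
        return all;
    }

    public static void main(String[] args) {
        List<List<String>> batches = Arrays.asList(
                Arrays.asList("doc-1"),
                Arrays.asList("doc-2"),
                Collections.<String>emptyList());
        System.out.println(drain(batches.iterator()));
    }
}
```

Unlike the `do`/`while` form, this variant checks for an empty batch before processing it, so an empty first batch would not trigger a further request.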
/**
 * Search template: run the stored template "page_query_by_tags" with from, size and tags parameters.
 */
public static void searchTemplates(TransportClient client) {
    Map<String, Object> params = new HashMap<>(10);
    params.put("from", 0);
    params.put("size", 10);
    params.put("tags", "java");

    SearchTemplateResponse str = new SearchTemplateRequestBuilder(client)
            .setScript("page_query_by_tags")
            .setScriptType(ScriptType.STORED)
            .setScriptParams(params)
            .setRequest(new SearchRequest())
            .get();

    for (SearchHit hit : str.getResponse().getHits().getHits()) {
        System.out.println(hit.getSourceAsString());
    }
}
/**
 * Other query types: term, multi_match, common terms, prefix, and a bool combination.
 * Each result is read at index 0, so every query is assumed to return at least one hit.
 */
public static void otherSearch(TransportClient client) {
    SearchResponse response1 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.termQuery("tags", "java")).get();
    SearchResponse response2 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.multiMatchQuery("32.8", "price", "tags")).get();
    SearchResponse response3 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.commonTermsQuery("name", "入门")).get();
    SearchResponse response4 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.prefixQuery("name", "java")).get();

    System.out.println(response1.getHits().getHits()[0].getSourceAsString());
    System.out.println(response2.getHits().getHits()[0].getSourceAsString());
    System.out.println(response3.getHits().getHits()[0].getSourceAsString());
    System.out.println(response4.getHits().getHits()[0].getSourceAsString());

    // Combining several conditions in one bool query
    SearchResponse response5 = client.prepareSearch("book_shop")
            .setQuery(QueryBuilders.boolQuery()
                    .must(QueryBuilders.termQuery("tags", "java"))
                    .mustNot(QueryBuilders.matchQuery("name", "跑路"))
                    .should(QueryBuilders.matchQuery("name", "入门"))
                    .filter(QueryBuilders.rangeQuery("price").gte(23).lte(55)))
            .get();
    System.out.println(response5.getHits().getHits()[0].getSourceAsString());
}
/**
 * Geo queries: bounding box, polygon and distance. Only the distance query (query3) is executed;
 * query1 and query2 show how the other two builders are constructed.
 */
public static void geo(TransportClient client) {
    // Bounding box: top, left, bottom, right
    GeoBoundingBoxQueryBuilder query1 = QueryBuilders.geoBoundingBoxQuery("location")
            .setCorners(23, 112, 21, 114);

    // Polygon defined by three points
    List<GeoPoint> points = new ArrayList<>();
    points.add(new GeoPoint(23, 115));
    points.add(new GeoPoint(25, 113));
    points.add(new GeoPoint(21, 112));
    GeoPolygonQueryBuilder query2 = QueryBuilders.geoPolygonQuery("location", points);

    // Everything within 500 meters of the given point
    GeoDistanceQueryBuilder query3 = QueryBuilders.geoDistanceQuery("location")
            .point(22.523375, 113.911231)
            .distance(500, DistanceUnit.METERS);

    SearchResponse response = client.prepareSearch("location").setQuery(query3).get();
    for (SearchHit hit : response.getHits().getHits()) {
        System.out.println(hit.getSourceAsString());
    }
}
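The demos above only need a quick read-through. If you are already building a project on ES, rely on the official API documentation instead: https://www.elastic.co/guide/... It contains detailed API descriptions and usage demos.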