1、说明
Elasticsearch提供了两个Java REST Client版本:
一、java low level rest client:
低级别的REST客户端,通过HTTP与集群交互,用户需自己编组请求JSON串,以及解析响应JSON串。兼容所有Elasticsearch版本。
特点:需通过maven引入
使用介绍:https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/java-rest-low.html
二、java high level rest client:
高级别的REST客户端,基于低级别的REST客户端,增加了编组请求JSON串、解析响应JSON串等相关API。使用的版本需要和ES服务端保持一致,否则会有版本兼容问题。
从6.0.0开始加入,目的是以Java面向对象的方式进行请求、响应处理。
每个API支持同步、异步两种方式:同步方法直接返回一个结果对象;异步方法以async为后缀,通过listener参数来通知结果。高级Java REST客户端依赖Elasticsearch core project。
兼容性说明:
依赖jdk1.8和Elasticsearch core project
2、Java Low Level Rest Client的使用(注:下面的示例代码实际使用的是TransportClient,并非REST客户端)
版本:
Elasticsearch 6.3.1
pom文件:
<!-- Elasticsearch TransportClient and its dependencies.
     Keep the two Elasticsearch artifact versions in sync with the server (6.3.1). -->
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>transport</artifactId>
    <version>6.3.1</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>6.3.1</version>
</dependency>
<!-- log4j2 is required at runtime by the transport client -->
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-api</artifactId>
    <version>2.7</version>
</dependency>
<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.7</version>
</dependency>
<!-- json-lib provides net.sf.json.JSONObject used by the JSON indexing example -->
<dependency>
    <groupId>net.sf.json-lib</groupId>
    <artifactId>json-lib</artifactId>
    <version>0.9</version>
</dependency>
1、构建elasticsearch client工具类
import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.transport.client.PreBuiltTransportClient; import java.net.InetAddress; /** * @Author: xiaolaotou * @Date: 2019/4/19 */ /** * 构建elasticsrarch client */ public class ClientUtil { private static TransportClient client; public TransportClient CreateClient() throws Exception { // 先构建client System.out.println("11111111111"); Settings settings=Settings.builder() .put("cluster.name","elasticsearch1") .put("client.transport.ignore_cluster_name", true) //若是集群名不对,也能链接 .build(); //建立Client TransportClient client = new PreBuiltTransportClient(settings) .addTransportAddress( new TransportAddress( InetAddress.getByName( "192.168.200.100"), 9300)); return client; } }
2、测试类
import net.sf.json.JSONObject; import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchType; import org.elasticsearch.action.update.UpdateRequest; import org.elasticsearch.action.update.UpdateResponse; import org.elasticsearch.client.Requests; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.index.query.QueryBuilders; import org.elasticsearch.search.SearchHits; import java.io.BufferedWriter; import java.io.File; import java.io.FileWriter; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; import java.util.Map; import java.util.Set; import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder; /** * @Author: xiaolaotou * @Date: 2019/4/19 * ElasticSearch 6.3.1 */ public class Test { private static TransportClient client; static { try { client = new ClientUtil().CreateClient(); } catch (Exception e) { e.printStackTrace(); } } public static void main(String[] args) throws Exception { //建立索引 // createEmployee(); //根据inde,type,id查询一个document的data // FindIndex(); // CreateJsonIndex(); //批量导入 // BulkCreateIndex(); //批量导出 // OutData(); //建立带ik分词的index // CreateIndexIkTest(); //更新索引 // UpdateIndex(); // createIndex2(); // Search(); get(); } /** * 建立索引,普通格式 * * @throws Exception */ public static void createEmployee() throws Exception { IndexResponse response = client.prepareIndex("student", "doc", "1") .setSource(jsonBuilder() .startObject() .field("name", "jack") .field("age", 27) .field("position", 
"technique") .field("country", "china") .field("join_date", "2017-01-01") .field("salary", 10000) .endObject()) .get(); System.out.println("建立成功!"); }
/**
 * Fetches the document at index "student", type "doc", id "1"
 * and prints its source JSON.
 *
 * @throws Exception on transport failure
 */
public static void FindIndex() throws Exception {
    GetResponse response = client.prepareGet("student", "doc", "1").get();
    System.out.println(response.getSourceAsString());
}
/**
 * Indexes a document built from a json-lib JSONObject
 * (index "studentjson", type "doc", id "1") and prints the index name.
 *
 * @throws IOException on serialization failure
 */
public static void CreateJsonIndex() throws IOException {
    JSONObject doc = new JSONObject();
    doc.put("user", "小明");
    doc.put("title", "Java Engineer");
    doc.put("desc", "web 开发");
    IndexResponse response = client.prepareIndex("studentjson", "doc", "1")
            .setSource(doc, XContentType.JSON)
            .get();
    System.out.println(response.getIndex());
}
/** * elasticsearch批量导入 */ public static void BulkCreateIndex() { BulkRequestBuilder builder = client.prepareBulk(); for (int i = 0; i < 100000; i++) { HashMap<String, Object> map = new HashMap<>(); map.put("recordtime", "11"); map.put("area", "22"); map.put("usertype", "33"); map.put("count", 44); builder.add(client.prepareIndex("bulktest", "1").setSource(map)); //每10000条提交一次 if (i % 10000 == 0) { builder.execute().actionGet(); builder = client.prepareBulk(); } } }
/** * 批量导出 */ public static void OutData() throws IOException { SearchResponse response = client.prepareSearch("bulktest").setTypes("1") .setQuery(QueryBuilders.matchAllQuery()) .setSize(10000).setScroll(new TimeValue(600000)) .setSearchType(SearchType.DEFAULT).execute().actionGet(); // setScroll(new TimeValue(600000)) 设置滚动的时间 String scrollid = response.getScrollId(); //把导出的结果以JSON的格式写到文件里 //每次返回数据10000条。一直循环查询知道全部的数据都被查询出来 while (true) { SearchResponse response2 = client.prepareSearchScroll(scrollid).setScroll(new TimeValue(1000000)) .execute().actionGet(); SearchHits searchHit = response2.getHits(); //再次查询不到数据时跳出循环 if (searchHit.getHits().length == 0) { break; } System.out.println("查询数量 :" + searchHit.getHits().length); for (int i = 0; i < searchHit.getHits().length; i++) { String json = searchHit.getHits()[i].getSourceAsString(); putData(json); } System.out.println("查询结束"); } }
public static void putData(String json) throws IOException { String str = json + "\n"; //写入本地文件 String fileTxt = "D:\\data.txt"; File file = new File(fileTxt); if (!file.getParentFile().exists()) { file.getParentFile().mkdirs(); } if (!file.exists()) { file.createNewFile(); FileWriter fw = new FileWriter(file, true); BufferedWriter bw = new BufferedWriter(fw); System.out.println("写入完成啦啊"); bw.write(String.valueOf(str)); bw.flush(); bw.close(); fw.close(); } else { FileWriter fw = new FileWriter(file, true); BufferedWriter bw = new BufferedWriter(fw); System.out.println("追加写入完成啦啦"); bw.write(String.valueOf(str)); bw.flush(); bw.close(); fw.close(); } } /** * 建立索引,并给某些字段指定ik分词器,之后向该索引中查询时,就会用ik分词 */ public static void CreateIndexIkTest() throws Exception { //建立映射 XContentBuilder mapping = XContentFactory.jsonBuilder() .startObject() .startObject("properties") //title:字段名, type:文本类型 analyzer :分词器类型 .startObject("title").field("type", "text").field("analyzer", "ik_smart").endObject() //该字段添加的内容,查询时将会使用ik_smart分词 .startObject("content").field("type", "text").field("analyzer", "ik_max_word").endObject() .endObject() .endObject(); //index:索引名 type:类型名(能够本身定义) PutMappingRequest putmap = Requests.putMappingRequest("index").type("type").source(mapping); //建立索引 client.admin().indices().prepareCreate("index").execute().actionGet(); //为索引添加映射 client.admin().indices().putMapping(putmap).actionGet(); //调用下面的方法为建立的索引添加内容 CreateIndex1(); } //这个方法是为上一步建立的索引中添加内容,包括id,id不能重复 public static void CreateIndex1() throws IOException { IndexResponse response = client.prepareIndex("index", "type", "1") //索引,类型,id .setSource(jsonBuilder() .startObject() .field("title", "title") //字段,值 .field("content", "content") .endObject() ).get(); }
/** * 更新索引 */ //更新索引,更新刚才建立的索引,若是id相同将会覆盖掉刚才的内容 public static void UpdateIndex() throws Exception { //每次添加id应该不一样,至关于数据表中的主键,相同的话将会进行覆盖 UpdateResponse response=client.update(new UpdateRequest("index","type","1") .doc(XContentFactory.jsonBuilder() .startObject() .field("title","中华人民共和国国歌,国歌是最好听的歌") .field("content","中华人民共和国国歌,国歌是最好听的歌") .endObject() )).get(); } //再插入一条数据 public static void createIndex2() throws IOException { IndexResponse response = client.prepareIndex("index", "type", "2") .setSource(jsonBuilder() .startObject() .field("title", "中华民族是伟大的民族") .field("content", "中华民族是伟大的民族") .endObject() ).get(); } /** * 下面使用index索引下的2个document进行查询 */ public static void Search(){ SearchResponse response1 = client.prepareSearch( "index") //指定多个索引 .setTypes("type") //指定类型 .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.matchQuery("title", "中华人民共和国国歌")) // Query // .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // Filter .setFrom(0).setSize(60).setExplain(true) .get(); long totalHits1= response1.getHits().totalHits; //命中个数 System.out.println("response1======="+totalHits1); SearchResponse response2 = client.prepareSearch( "index") //指定多个索引 .setTypes("type") //指定类型 .setSearchType(SearchType.QUERY_THEN_FETCH) .setQuery(QueryBuilders.matchQuery("content", "中华人民共和国国歌")) // Query // .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18)) // Filter .setFrom(0).setSize(60).setExplain(true) .get(); long totalHits2 = response2.getHits().totalHits; //命中个数 System.out.println("response2========="+totalHits2); } /** * GET操做 */ public static void get() { GetResponse response = client.prepareGet("index", "type", "2").get(); Map<String, Object> source = response.getSource(); Set<String> strings = source.keySet(); Iterator<String> iterator = strings.iterator(); while (iterator.hasNext()) { System.out.println(source.get(iterator.next())); } } }