1.首先将es中全部的操作封装成一个EsSearchManager,并且使用单例模式,提供一个实例供外部调用。
EsSearchManager esSearchManager = EsSearchManager.getInstance();github
getInstance()的返回值是这个类的实例,构造函数中也完成了对client的创建。
/**
 * Returns the shared {@code EsSearchManager}, creating it on first use.
 *
 * <p>The null check is repeated inside the synchronized block so that threads which raced
 * past the first check do not each construct their own manager.
 * NOTE(review): the {@code esSearchManager} field is declared outside this snippet; it
 * should be declared {@code volatile} for this lazy initialisation to be safely published.
 *
 * @return the single manager instance
 */
public static EsSearchManager getInstance() {
    if (esSearchManager == null) {
        synchronized (EsSearchManager.class) {
            if (esSearchManager == null) {
                esSearchManager = new EsSearchManager();
            }
        }
    }
    return esSearchManager;
}

/** Private so that instances are only created through {@link #getInstance()}; obtains the client eagerly. */
private EsSearchManager() {
    getClient();
}

/**
 * Returns the client, initialising it on first use.
 *
 * <p>An initialisation failure is logged (with its stack trace) and not rethrown, so this
 * method returns {@code null} when {@link #init()} has failed.
 *
 * @return the client, or {@code null} if it could not be initialised
 */
private Client getClient() {
    if (client == null) {
        try {
            init();
        } catch (Exception e) {
            // Pass the exception itself so the stack trace is not lost.
            LOG.error(e.getMessage(), e);
        }
    }
    return client;
}

/**
 * Fetches the client from {@code EsClient} and stores it in the {@code client} field.
 *
 * @throws Exception if {@code EsClient.getClient()} fails
 */
private void init() throws Exception {
    client = EsClient.getClient();
}
2.获得了client就可以创建索引、存入数据等一系列的索引数据操作了。
根据索引和类型创建索引。
/** * 根据索引和类型建立索引 * @param indexName * @param type * @return * @throws Exception */ public Boolean buildIndex(String indexName,String type) throws Exception { IndicesExistsResponse response = getClient().admin().indices() .prepareExists(indexName,type).execute().actionGet(); Boolean flag = true; ResourceBundle rb = ResourceBundle.getBundle("commons"); String replicas = rb.getString("replicas"); String shards = rb.getString("shards"); String refreshInterval = rb.getString("refreshInterval"); if (!response.isExists()) { //须要将配置放置到配置文件中 Settings settings = Settings.settingsBuilder() .put("number_of_replicas", Integer.parseInt(replicas)) .put("number_of_shards", Integer.parseInt(shards)) .put("index.translog.flush_threshold_ops", 10000000) .put("refresh_interval", refreshInterval) .put("index.codec", "best_compression").build(); CreateIndexResponse createIndxeResponse = getClient().admin().indices() .prepareCreate(indexName).setSettings(settings).addMapping(type).execute() .actionGet(); flag = createIndxeResponse.isAcknowledged(); LOG.info("返回值" + flag); } return flag; }
/**
 * Indexes a single JSON document into the given index and type.
 *
 * @param indexName target index
 * @param type target mapping type
 * @param json document source as a JSON string
 * @throws Exception if the index request fails
 */
public void buildDocument(String indexName, String type, String json) throws Exception {
    getClient().prepareIndex(indexName, type).setSource(json).execute().actionGet();
}

/**
 * Indexes every map in {@code list} as one document using a single bulk request.
 *
 * <p>Each map is converted with {@code generateJson} before being added to the bulk request.
 * Item-level failures reported by the bulk response are logged, not thrown.
 *
 * @param indexName target index
 * @param type target mapping type
 * @param list documents to index; a {@code null} or empty list is a no-op
 * @throws Exception if building or executing the bulk request fails
 */
public void buildList2Documents(String indexName, String type,
        List<Map<String, Object>> list) throws Exception {
    // A bulk request without any action is rejected by request validation, so skip it.
    if (list == null || list.isEmpty()) {
        return;
    }

    BulkRequestBuilder bulkRequest = getClient().prepareBulk();
    for (Map<String, Object> map : list) {
        bulkRequest.add(getClient().prepareIndex(indexName, type)
                .setSource(this.generateJson(map)));
    }

    BulkResponse bulkIndexResponse = bulkRequest.execute().actionGet();
    if (bulkIndexResponse.hasFailures()) {
        LOG.error(bulkIndexResponse.buildFailureMessage());
    }
}
如何判断索引是否存在、如何删除索引?下面就是判断和删除索引的方法。
/**
 * Checks whether an index with the given name exists.
 *
 * @param indexName name of the index to look up
 * @return whether the exists-response reports the index as present
 * @throws Exception if the request fails
 */
public Boolean existsIndex(String indexName) throws Exception {
    IndicesExistsResponse existsResponse = getClient()
            .admin()
            .indices()
            .prepareExists(indexName)
            .execute()
            .actionGet();
    boolean present = existsResponse.isExists();
    return present;
}

/**
 * Deletes the index with the given name if it exists.
 *
 * @param indexName name of the index to delete
 * @return {@code true} when the index does not exist; otherwise whether the delete request
 *         was acknowledged
 * @throws Exception if the existence check or the delete request fails
 */
public Boolean deleteIndex(String indexName) throws Exception {
    // Nothing to delete: report success without issuing a delete request.
    if (!existsIndex(indexName)) {
        return true;
    }
    DeleteIndexResponse deleteResponse = getClient()
            .admin()
            .indices()
            .prepareDelete(indexName)
            .execute()
            .actionGet();
    return deleteResponse.isAcknowledged();
}
根据docId删除某一条索引记录。
** * 根据索引Id删除文档 * @param indexName * @param type * @param docId * @throws NumberFormatException * @throws UnknownHostException */ public void deleteDocument(String indexName, String type, String docId) throws Exception { getClient().prepareDelete(indexName, type, docId).execute().actionGet(); }
3.上面给出了一些索引的基本操作,下面来讲讲es中如何根据索引去查询数据。
``` /** 下面这个方法就是构建全文检索,根据关键词全文检索 * 全文检索查询,多条件类型查询 */ public PageEntity<JSONObject> queryFulltext(List<String> keywords, List<String> indexs, List<String> types, List<String> fieldNames, List<String> allColumns, int pagenum, int pagesize) throws Exception { BoolQueryBuilder qb = buildFullText(keywords, types, fieldNames); if (qb == null) { LOG.info("queryFull Text == null"); return null; } LOG.info("Fulltext begin"); long begin = System.currentTimeMillis(); PageEntity<JSONObject> result = execute(qb, fieldNames,allColumns, indexs, types, pagenum, pagesize); long end = System.currentTimeMillis(); LOG.info("query Fulltext end cost:[{}]ms", end - begin); return result; } /** * 构造多字段 全文搜索查询条件 * [@param]keywords * [@param types * [@param] fieldNames * [@return * @throws Exception */ private BoolQueryBuilder buildFullText(List<String> keywords, List<String> types, List<String> fieldNames) throws Exception { BoolQueryBuilder qb = null; if (keywords != null && keywords.size() > 0 && fieldNames != null && fieldNames.size() > 0 && types != null && types.size() > 0) { qb = QueryBuilders.boolQuery(); for (String keyword : keywords) { QueryBuilder mustCondition = QueryBuilders.multiMatchQuery( keyword, fieldNames.toArray(new String[0])); qb.must(mustCondition); } } return qb; }
上面的方法实现了全文检索,只要某个字段或者某几个字段实现了分词,就可以实现类似数据库中的模糊匹配查询。下面介绍term查询,它只针对某些特殊字段,完全匹配才能够查询到,这些字段往往都是不需要分词的。
/**
 * Runs a paged term query (the search text is not analysed; intended for values such as
 * person or place names that must match exactly).
 *
 * @param keywords terms to look up
 * @param indexs indices to search
 * @param types mapping types to search
 * @param fieldnames fields the terms are matched against; also used as the returned columns
 *        when {@code allColumns} is null or empty
 * @param dateFieldnames date fields passed on to {@code buildTermQuery}
 * @param allColumns columns to copy into each result object, or null/empty to use {@code fieldnames}
 * @param startTime start time passed on to {@code buildTermQuery}
 * @param endTime end time passed on to {@code buildTermQuery}
 * @param pagenum 1-based page number
 * @param pagesize number of hits per page
 * @return the requested page; an empty page when {@code buildTermQuery} returns {@code null}
 * @throws Exception if building the query or the search request fails
 */
public PageEntity<JSONObject> queryWithTerm(List<String> keywords, List<String> indexs,
        List<String> types, List<String> fieldnames, List<String> dateFieldnames,
        List<String> allColumns, Long startTime, Long endTime,
        int pagenum, int pagesize) throws Exception {
    BoolQueryBuilder qb = buildTermQuery(keywords, types, fieldnames, dateFieldnames,
            startTime, endTime);
    if (qb == null) {
        LOG.info("queryTerm() QueryBuilder == null");
        return new PageEntity<JSONObject>();
    }

    LOG.info("query begin");
    long begin = System.currentTimeMillis();
    PageEntity<JSONObject> result =
            execute(qb, fieldnames, allColumns, indexs, types, pagenum, pagesize);
    long end = System.currentTimeMillis();
    LOG.info("query end cost:[{}]ms", end - begin);
    return result;
}

/**
 * Executes a query with from/size paging and converts the hits into a page of JSON objects.
 *
 * @param qb query to execute
 * @param fieldnames columns copied from each hit when {@code allColumns} is null or empty
 * @param allColumns columns copied from each hit; takes precedence over {@code fieldnames}
 * @param indexs indices to search
 * @param types mapping types to search
 * @param pagenum 1-based page number; the search offset is {@code (pagenum - 1) * pagesize}
 * @param pagesize number of hits per page
 * @return the page contents together with page size, current page number and total hit count
 * @throws Exception if the search request fails (the error is logged and rethrown)
 */
private PageEntity<JSONObject> execute(QueryBuilder qb, List<String> fieldnames,
        List<String> allColumns, List<String> indexs, List<String> types,
        int pagenum, int pagesize) throws Exception {
    PageEntity<JSONObject> page = new PageEntity<JSONObject>();
    String[] typeArry = types.toArray(new String[0]);
    String[] indexArry = indexs.toArray(new String[0]);
    int startnum = (pagenum - 1) * pagesize;

    SearchResponse response = null;
    try {
        response = getClient().prepareSearch(indexArry).setTypes(typeArry)
                .setQuery(qb).setFrom(startnum).setSize(pagesize)
                .execute().actionGet();
    } catch (Exception e) {
        LOG.error("query error", e);
        throw e;
    }
    if (response == null) {
        return page;
    }

    SearchHits hits = response.getHits();
    // Resolve the total once, null-safely, before it is logged or stored.
    long totalHits = (hits == null) ? 0L : hits.totalHits();
    LOG.info("execute hit:" + totalHits);

    // Choose the output columns once instead of reassigning the parameter per hit.
    List<String> columns = (allColumns != null && allColumns.size() > 0) ? allColumns : fieldnames;
    if (columns == null) {
        columns = new ArrayList<String>();
    }

    List<JSONObject> resultString = new ArrayList<JSONObject>();
    if (hits != null && totalHits > 0) {
        for (SearchHit hit : hits) {
            Map<String, Object> source = hit.getSource();
            JSONObject obj = new JSONObject();
            for (String column : columns) {
                // A hit without a source yields null values rather than an NPE.
                obj.put(column, source == null ? null : source.get(column));
            }
            resultString.add(obj);
        }
    }

    page.setContents(resultString);
    page.setPageSize(pagesize);
    page.setCurrentPageNo(pagenum);
    page.setTotalCount(totalHits);
    return page;
}
4.下面的示例代码演示如何调用上述代码操作es。
创建索引及其类型(注意:Elasticsearch的索引名必须全部小写,并且要与后面写入、查询时使用的索引名和类型名保持一致)。
EsSearchManager esSearchManager = EsSearchManager.getInstance();
// Index names must be lowercase, and the index/type created here must be the same ones
// that are written to and searched below.
esSearchManager.buildIndex("testindex", "testtypes");

// Index some data.
List<Map<String, Object>> list = new ArrayList<>();
Map<String, Object> map = new HashMap<>();
map.put("fieldA", 100);
map.put("fieldB", 22);
map.put("fieldC", "hoge");
map.put("fieldD", "huga");
list.add(map);
esSearchManager.buildList2Documents("testindex", "testtypes", list);

// Search arguments shared by both queries below.
List<String> keywords = new ArrayList<>();
keywords.add("huga");
List<String> types = new ArrayList<>();
types.add("testtypes");
List<String> indexs = new ArrayList<>();
indexs.add("testindex");
List<String> fieldNames = new ArrayList<>();
fieldNames.add("fieldD");

// Exact (non-analysed) term lookup on one field.
PageEntity<JSONObject> pg = esSearchManager.queryWithTerm(keywords, indexs,
        types, fieldNames, null, null, null, null, 1, 10);

// Full-text search on the same field. The lists above are reused, because declaring them
// a second time in the same scope does not compile.
PageEntity<JSONObject> pg1 = esSearchManager.queryFulltext(keywords,
        indexs, types, fieldNames,
        null, 1, 10);
以上总结了部分es基本操作API和调用demo,详细的代码请查看github地址 https://github.com/winstonelei/BigDataTools ,其中包括了一些大数据组件的基本操作,包含hbase、hadoop、es、hive等。