Elasticsearch高级API-通用方法封装

补充点

补充集群安装

在主节点elasticsearch.yml中配置html

cluster.name: ES_books
 node.name: master
 node.master: true
复制代码

子节点中配置java

cluster.name: ES_books                                 集群名称, 和主节点一致
 node.name: slave1
 discovery.zen.ping.unicast.hosts: ["sanq1.com.cn"]     找到主节点
复制代码

重启ES便可node

补充安装IK插件

安装IK分词器插件的版本须要和ES的版本一致,而且将IK插件放到文件夹pluginsgit

wget https://github.com/medcl/elasticsearch-analysis-ik/releases/download/v6.8.2/elasticsearch-analysis-ik-6.8.2.zip
复制代码

完整封装代码查看

X_Util项目下Config模块github

正式开始

在ES6.8.2 中ES官方推荐使用高级API来操做ES, 咱们来具体看下shell

声明:json

配置基于xml的方式app

连接ES

这里采用HttpHost方式来进行连接, 为了方便集群配置, 咱们作以下的操做elasticsearch

  1. 声明实体, 配置地址和端口
public class HostAndPort {
    private String host;
    private int port;       //这里的端口是http访问端口 默认9200

    public HostAndPort(String host, int port) {
        this.host = host;
        this.port = port;
    }
    
    //setting, getting
}
复制代码
  1. 正式配置工具
public class EsCluster implements FactoryBean<RestHighLevelClient>, InitializingBean, DisposableBean {

    //ES地址
    private Set<HostAndPort> sets;
    
    private RestHighLevelClient client;

    public EsCluster() {
    }

    public EsCluster(Set<HostAndPort> sets) {
        this.sets = sets;
    }

    public Set<HostAndPort> getSets() {
        return sets;
    }

    public void setSets(Set<HostAndPort> sets) {
        this.sets = sets;
    }


    @Override
    public void destroy() throws Exception {
        if (this.client != null)
            client.close();
    }

    @Override
    public RestHighLevelClient getObject() throws Exception {
        return this.client;
    }

    @Override
    public Class<?> getObjectType() {
        return EsCluster.class;
    }

    @Override
    public boolean isSingleton() {
        return true;
    }

    @Override
    public void afterPropertiesSet() throws Exception {
        initClient();
    }

    // 这里是实际的连接过程
    private void initClient() {
        List<HttpHost> httpHostList = new ArrayList<>(sets.size());

        sets.stream().forEach(s -> {
            httpHostList.add(new HttpHost(s.getHost(), s.getPort(), "http"));
        });
        client = new RestHighLevelClient(RestClient.builder(httpHostList.toArray(new HttpHost[httpHostList.size()])));
    }
}
复制代码
  1. 连接样例:
<bean id="restClient" class="com.sanq.product.utils.es.factory.EsCluster" destroy-method="destroy">
    <constructor-arg index="0">
        <set>
            <bean class="com.sanq.product.utils.es.config.HostAndPort">
                <constructor-arg index="0" value="192.168.87.134"/>
                <constructor-arg index="1" value="9200" type="int"/>
            </bean>
        </set>
    </constructor-arg>
</bean>
复制代码

方法说明

获取客户端ide

@Resource
private RestHighLevelClient restClient;
复制代码

验证索引是否建立

@Override
public boolean check(String index) throws Exception {
    GetIndexRequest getIndexRequest = new GetIndexRequest(index);
    return restClient.indices().exists(getIndexRequest, RequestOptions.DEFAULT);
}
复制代码

建立索引

/** * 建立索引 * * @param index 索引名 * @param mapping 文档信息 * @return */
@Override
public boolean createIndex(String index, String mapping) throws Exception {
    CreateIndexRequest createIndexRequest = new CreateIndexRequest(index)
            .settings(Settings.builder()
                    .put("index.number_of_shards", 3)       //分片数
                    .put("index.number_of_replicas", 2))    //备份数
            .mapping(mapping, XContentType.JSON);

    CreateIndexResponse createIndexResponse = restClient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    return createIndexResponse.isAcknowledged() && createIndexResponse.isShardsAcknowledged();
}
复制代码

经过ID获取详情 关于泛型请查看完整代码

@Override
public T findById(String index, String type, String id) throws Exception {
    GetRequest getRequest = new GetRequest(index, type, id);
    GetResponse getResponse = restClient.get(getRequest, RequestOptions.DEFAULT);
    return JsonUtil.json2Obj(getResponse.getSourceAsString(), getGenericClass());
}
复制代码

保存数据到ES

//entity中必须包含id字段, 给ES指定ID
@Override
public String save(String index, String type, T entity) throws Exception {

    Map<String, Object> map = bean2Map(entity); //将entity转换成Map

    IndexRequest indexRequest = new IndexRequest(index, type, map.get("id").toString()).source(map).opType(DocWriteRequest.OpType.CREATE);
    IndexResponse indexResponse = restClient.index(indexRequest, RequestOptions.DEFAULT);

    return indexResponse.getId();
}
复制代码

批量保存

@Override
public boolean saveList(String index, String type, List<T> entityList) throws Exception {
    BulkRequest bulkRequest = new BulkRequest();

    entityList.stream().forEach(entity -> {
        try {
            Map<String, Object> map = bean2Map(entity);

            String id = String.valueOf(map.get("id"));

            bulkRequest.add(new IndexRequest(index, type, id).source(JsonUtil.obj2Json(map), XContentType.JSON));
        } catch (Exception e) {
            e.printStackTrace();
        }
    });

    BulkResponse bulk = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    return bulk.status().getStatus() == RestStatus.OK.getStatus();
}
复制代码

修改数据

@Override
public boolean update(String index, String type, T entity) throws Exception {
    Map<String, Object> map = bean2Map(entity);

    UpdateRequest request = new UpdateRequest(index, type, map.get("id").toString());
    request.doc(map);

    UpdateResponse update = restClient.update(request, RequestOptions.DEFAULT);
    return update.status().getStatus() == RestStatus.OK.getStatus();
}
复制代码

删除

@Override
public boolean delete(String index, String type, String id) throws Exception {
    DeleteRequest deleteRequest = new DeleteRequest(index, type, id);

    DeleteResponse delete = restClient.delete(deleteRequest, RequestOptions.DEFAULT);
    return delete.status().getStatus() == RestStatus.OK.getStatus();
}
复制代码

批量删除

@Override
public boolean deleteList(String index, String type, List<String> ids) throws Exception {
    BulkRequest bulkRequest = new BulkRequest();

    ids.stream().forEach(id -> {
        bulkRequest.add(new DeleteRequest(index, type, id));
    });

    BulkResponse bulk = restClient.bulk(bulkRequest, RequestOptions.DEFAULT);
    return bulk.status().getStatus() == RestStatus.OK.getStatus();
}
复制代码

删除索引

@Override
public boolean deleteIndex(String index) throws Exception {
    AcknowledgedResponse delete = restClient.indices().delete(new DeleteIndexRequest(index), RequestOptions.DEFAULT);

    return delete.isAcknowledged();
}
复制代码

查询数据

@Override
public SearchPager<T> findListByPager(String index, String type, T entity, SearchPagination pagination //默认展现条数, scrollId ) throws Exception {
    if (!StringUtil.isEmpty(pagination.getScrollId())) {
        return getScrollPager(pagination);
    }

    Map<String, Object> map = bean2Map(entity);

    SearchRequest searchRequest = new SearchRequest(index).types(type);

    //构造搜索条件
    SearchSourceBuilder sourceBuilder = getSearchRequest(map);

    //查询条数
    sourceBuilder.size(pagination.getPageSize());
    //排序 能够提取出来指定排序
    sourceBuilder.sort("id", SortOrder.ASC);

    searchRequest.source(sourceBuilder);
    
    //这里是指定scrollId的过时时间
    searchRequest.scroll(TimeValue.timeValueMillis(5L));

    SearchResponse searchResponse = restClient.search(searchRequest, RequestOptions.DEFAULT);
    pagination.setScrollId(searchResponse.getScrollId());

    List<T> data = getListData(searchResponse.getHits());

    return new SearchPager<T>(pagination, data);
}

/**将查询出来的数据 封装成List*/
private List<T> getListData(SearchHits hits) {
    List<T> data = new ArrayList<>();
    for (SearchHit hit : hits.getHits()) {
        if (getGenericClass() == null) data.add((T) hit.getSourceAsMap());
        else data.add(JsonUtil.json2Obj(hit.getSourceAsString(), getGenericClass()));
    }
    return data;
}

/**经过scrollId获取分页数据*/
private SearchPager<T> getScrollPager(SearchPagination pagination) throws Exception {
    
    SearchScrollRequest scrollRequest = new SearchScrollRequest(pagination.getScrollId());
    scrollRequest.scroll(TimeValue.timeValueMillis(5L));
    SearchResponse searchScrollResponse = restClient.scroll(scrollRequest, RequestOptions.DEFAULT);
        
    // 生成了新的scrollId, 这里将旧的scrollId进行删除
    clearScrollId(pagination.getScrollId());

    //设置新的scrollId
    pagination.setScrollId(searchScrollResponse.getScrollId());

    List<T> data = getListData(searchScrollResponse.getHits());

    return new SearchPager<T>(pagination, data);
}

/** * 清除以前的scrollId * @param scrollId */
private void clearScrollId(String scrollId) throws Exception {
    ClearScrollRequest request = new ClearScrollRequest();
    request.addScrollId(scrollId);
    restClient.clearScroll(request, RequestOptions.DEFAULT);
}

private SearchSourceBuilder getSearchRequest(Map<String, Object> map) {
        SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();

    BoolQueryBuilder boolQueryBuilder = new BoolQueryBuilder();

    map.entrySet().stream().forEach(entry -> {
        Object value = entry.getValue();
        if (value instanceof Integer ||
                value instanceof Long ||
                value instanceof Float ||
                value instanceof Double ||
                value instanceof Boolean
        ) {
            boolQueryBuilder.must().add(QueryBuilders.termQuery(entry.getKey(), value));
        } else
            boolQueryBuilder.must().add(QueryBuilders.matchQuery(entry.getKey(), value));
    });

    sourceBuilder.query(boolQueryBuilder);

    return sourceBuilder;
}
复制代码

这里的分页方式只适用于上一页下一页, 不适合跳页。

聚合查询

关于聚合查询 尚未想到怎样作成通用的方式来使用,由于聚合查询能够嵌套的一层又一层 这里给出一个例子。

  1. 经过IP去重查询
{
    "size": 0,
    "aggs": {
       "ip_card": {
    		"cardinality": {
    			"field": "ip"
    		}
    	}
    }
}
复制代码
//实现
CardinalityAggregationBuilder cardinalityAggregationBuilder = AggregationBuilders.cardinality("ip_card").field("ip");

SearchResponse searchResponse = getSearchResponse(index, cardinalityAggregationBuilder);
if (searchResponse.status().getStatus() == RestStatus.OK.getStatus()) {
    StringUtil.toInteger(searchResponse.getHits().getTotalHits());  //总数
    StringUtil.toInteger(((ParsedCardinality) searchResponse.getAggregations().get("ip_card")).getValue());    //去重数
}

/**查询条件*/
private SearchResponse getSearchResponse(String index, AggregationBuilder aggregationBuilder) throws Exception {
    SearchRequest searchRequest = new SearchRequest(index).types(EsIndexs.Indexs._DOC);
    SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder().aggregation(aggregationBuilder);
    searchSourceBuilder.size(0);

    searchRequest.source(searchSourceBuilder);

    return super.getClient().search(searchRequest, RequestOptions.DEFAULT);
}
复制代码
  1. 根据省份,城市IP去重
{
	"size": 0,
	"aggs": {
		"prov_terms": {
			"terms": {
				"field": "provName"
			},
			"aggs": {
				"city_terms":{
					"terms": {
						"field": "cityName"
					},
					"aggs": {
						"ip_card": {
							"cardinality": {
								"field": "ip"
							}
						}
					}
				}
			}
		}
	}
}
复制代码
//实现
@Override
public List<AreaReportVo> getAreaReport(String index) {
    String ipCard = "ip_card",
            provTerms = "prov_terms",
            cityTerms = "city_terms";
    try {
        if (!super.check(index)) {
            this.createIndex();
        }

        TermsAggregationBuilder termsAggregationBuilder =
                AggregationBuilders.terms(provTerms).field("provName")
                        .subAggregation(
                                AggregationBuilders.terms(cityTerms).field("cityName")
                                        .subAggregation(
                                                AggregationBuilders.cardinality(ipCard).field("ip")
                                        )
                        );

        //和上面是同样的
        SearchResponse searchResponse = getSearchResponse(index, termsAggregationBuilder);

        List<AreaReportVo> areaReportVoList = null;
        if (searchResponse.status().getStatus() == RestStatus.OK.getStatus()) {
            List<? extends Terms.Bucket> buckets = ((ParsedStringTerms) searchResponse.getAggregations().get(provTerms)).getBuckets(); 

            AreaReportVo areaReportVo;
            for (int i = 0, size = buckets.size(); i < size; i++) {
                String provName = buckets.get(i).getKeyAsString();  //省份

                List<? extends Terms.Bucket> cityBuckets = ((ParsedStringTerms) buckets.get(i).getAggregations().get(cityTerms)).getBuckets();
                for (int j = 0, j_size = cityBuckets.size(); j < j_size; j++) {
                    areaReportVo = new AreaReportVo();

                    areaReportVo.setProvName(provName);
                    areaReportVo.setCityName(cityBuckets.get(j).getKeyAsString());          //城市
                    areaReportVo.setViews(StringUtil.toInteger(buckets.get(j).getDocCount()));
                    areaReportVo.setIpViews(StringUtil.toInteger(((ParsedCardinality) cityBuckets.get(j).getAggregations().get(ipCard)).getValue()));

                    areaReportVoList.add(areaReportVo);
                }
            }
        }
        return areaReportVoList;

    } catch (Exception e) {
        e.printStackTrace();
    }
    return Collections.emptyList();
}
复制代码

到这里就算是完成了通用方法的封装, 你们能够亲自体验下。

文档

  1. 官方文档
  2. 封装完整代码, 该项目持续更新
相关文章
相关标签/搜索