1、简介html
ElasticSearch和Solr都是基于Lucene的搜索引擎,不过ElasticSearch天生支持分布式,而Solr是4.0版本后的SolrCloud才是分布式版本,Solr的分布式支持须要ZooKeeper的支持。java
这里有一个详细的ElasticSearch和Solr的对比:http://solr-vs-elasticsearch.com/node
2、基本用法git
集群(Cluster): ES是一个分布式的搜索引擎,通常由多台物理机组成。这些物理机,经过配置一个相同的cluster name,互相发现,把本身组织成一个集群。github
节点(Node):同一个集群中的一个Elasticsearch主机。sql
Node类型:数据库
1)data node: 存储index数据。Data nodes hold data and perform data related operations such as CRUD, search, and aggregations.json
2)client node: 不存储index,处理转发客户端请求到Data Node。api
3)master node: 不存储index,集群管理,如管理路由信息(routing infomation),判断node是否available,当有node出现或消失时重定位分片(shards),当有node failure时协调恢复。(全部的master node会选举出一个master leader node)缓存
详情参考:https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-node.html
主分片(Primary shard):索引(下文介绍)的一个物理子集。同一个索引在物理上能够切多个分片,分布到不一样的节点上。分片的实现是Lucene 中的索引。
注意:ES中一个索引的分片个数是创建索引时就要指定的,创建后不可再改变。因此开始建一个索引时,就要预计数据规模,将分片的个数分配在一个合理的范围。
副本分片(Replica shard):每一个主分片能够有一个或者多个副本,个数是用户本身配置的。ES会尽可能将同一索引的不一样分片分布到不一样的节点上,提升容错性。对一个索引,只要不是全部shards所在的机器都挂了,就还能用。
索引(Index):逻辑概念,一个可检索的文档对象的集合。相似与DB中的database概念。同一个集群中可创建多个索引。好比,生产环境常见的一种方法,对每月产生的数据建索引,以保证单个索引的量级可控。
类型(Type):索引的下一级概念,大概至关于数据库中的table。同一个索引里能够包含多个 Type。
文档(Document):即搜索引擎中的文档概念,也是ES中一个能够被检索的基本单位,至关于数据库中的row,一条记录。
字段(Field):至关于数据库中的column。ES中,每一个文档,实际上是以json形式存储的。而一个文档能够被视为多个字段的集合。好比一篇文章,可能包括了主题、摘要、正文、做者、时间等信息,每一个信息都是一个字段,最后被整合成一个json串,落地到磁盘。
映射(Mapping):至关于数据库中的schema,用来约束字段的类型,不过 Elasticsearch 的 mapping 能够不显示地指定、自动根据文档数据建立。
Elasticsearch集群能够包含多个索引(indices),每个索引能够包含多个类型(types),每个类型包含多个文档(documents),而后每一个文档包含多个字段(Fields),这种面向文档型的储存,也算是NoSQL的一种吧。
ES比传统关系型数据库,对一些概念上的理解:
Relational DB -> Databases -> Tables -> Rows -> Columns Elasticsearch -> Indices -> Types -> Documents -> Fields
从建立一个Client到添加、删除、查询等基本用法:
一、建立Client
public ElasticSearchService(String ipAddress, int port) { client = new TransportClient() .addTransportAddress(new InetSocketTransportAddress(ipAddress, port)); }
这里是一个TransportClient。
ES下两种客户端对比:
TransportClient:轻量级的Client,使用Netty线程池,Socket链接到ES集群。自己不加入到集群,只做为请求的处理。
Node Client:客户端节点自己也是ES节点,加入到集群,和其余ElasticSearch节点同样。频繁的开启和关闭这类Node Clients会在集群中产生“噪音”。
二、建立/删除Index和Type信息
// 建立索引 public void createIndex() { client.admin().indices().create(new CreateIndexRequest(IndexName)) .actionGet(); } // 清除全部索引 public void deleteIndex() { IndicesExistsResponse indicesExistsResponse = client.admin().indices() .exists(new IndicesExistsRequest(new String[] { IndexName })) .actionGet(); if (indicesExistsResponse.isExists()) { client.admin().indices().delete(new DeleteIndexRequest(IndexName)) .actionGet(); } } // 删除Index下的某个Type public void deleteType(){ client.prepareDelete().setIndex(IndexName).setType(TypeName).execute().actionGet(); } // 定义索引的映射类型 public void defineIndexTypeMapping() { try { XContentBuilder mapBuilder = XContentFactory.jsonBuilder(); mapBuilder.startObject() .startObject(TypeName) .startObject("_all").field("enabled", false).endObject() .startObject("properties") .startObject(IDFieldName).field("type", "long").endObject() .startObject(SeqNumFieldName).field("type", "long").endObject() .startObject(IMSIFieldName).field("type", "string").field("index", "not_analyzed").endObject() .startObject(IMEIFieldName).field("type", "string").field("index", "not_analyzed").endObject() .startObject(DeviceIDFieldName).field("type", "string").field("index", "not_analyzed").endObject() .startObject(OwnAreaFieldName).field("type", "string").field("index", "not_analyzed").endObject() .startObject(TeleOperFieldName).field("type", "string").field("index", "not_analyzed").endObject() .startObject(TimeFieldName).field("type", "date").field("store", "yes").endObject() .endObject() .endObject() .endObject(); PutMappingRequest putMappingRequest = Requests .putMappingRequest(IndexName).type(TypeName) .source(mapBuilder); client.admin().indices().putMapping(putMappingRequest).actionGet(); } catch (IOException e) { log.error(e.toString()); }
这里自定义了某个Type的索引映射(Mapping):
1)默认ES会自动处理数据类型的映射:针对整型映射为long,浮点数为double,字符串映射为string,时间为date,true或false为boolean。
2)字段的默认配置是indexed,但不是stored的,也就是 field("index", "yes").field("store", "no")。
3)这里Disable了“_all”字段,_all字段会把全部的字段用空格链接,而后用“analyzed”的方式index这个字段,这个字段能够被search,可是不能被retrieve。
4)针对string,ES默认会作“analyzed”处理,即先作分词、去掉stop words等处理再index。若是你须要把一个字符串作为总体被索引到,须要把这个字段这样设置:field("index", "not_analyzed")。
5)默认_source字段是enabled,_source字段存储了原始Json字符串(original JSON document body that was passed at index time)。
详情参考:
https://www.elastic.co/guide/en/elasticsearch/guide/current/mapping-intro.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-store.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-all-field.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-source-field.html
三、索引数据
// 批量索引数据 public void indexHotSpotDataList(List<Hotspotdata> dataList) { if (dataList != null) { int size = dataList.size(); if (size > 0) { BulkRequestBuilder bulkRequest = client.prepareBulk(); for (int i = 0; i < size; ++i) { Hotspotdata data = dataList.get(i); String jsonSource = getIndexDataFromHotspotData(data); if (jsonSource != null) { bulkRequest.add(client .prepareIndex(IndexName, TypeName, data.getId().toString()) .setRefresh(true).setSource(jsonSource)); } } BulkResponse bulkResponse = bulkRequest.execute().actionGet(); if (bulkResponse.hasFailures()) { Iterator<BulkItemResponse> iter = bulkResponse.iterator(); while (iter.hasNext()) { BulkItemResponse itemResponse = iter.next(); if (itemResponse.isFailed()) { log.error(itemResponse.getFailureMessage()); } } } } } } // 索引数据 public boolean indexHotspotData(Hotspotdata data) { String jsonSource = getIndexDataFromHotspotData(data); if (jsonSource != null) { IndexRequestBuilder requestBuilder = client.prepareIndex(IndexName, TypeName).setRefresh(true); requestBuilder.setSource(jsonSource) .execute().actionGet(); return true; } return false; } // 获得索引字符串 public String getIndexDataFromHotspotData(Hotspotdata data) { String jsonString = null; if (data != null) { try { XContentBuilder jsonBuilder = XContentFactory.jsonBuilder(); jsonBuilder.startObject().field(IDFieldName, data.getId()) .field(SeqNumFieldName, data.getSeqNum()) .field(IMSIFieldName, data.getImsi()) .field(IMEIFieldName, data.getImei()) .field(DeviceIDFieldName, data.getDeviceID()) .field(OwnAreaFieldName, data.getOwnArea()) .field(TeleOperFieldName, data.getTeleOper()) .field(TimeFieldName, data.getCollectTime()) .endObject(); jsonString = jsonBuilder.string(); } catch (IOException e) { log.equals(e); } } return jsonString; }
ES支持批量和单个数据索引。
四、查询获取数据
// 获取少许数据100个 private List<Integer> getSearchData(QueryBuilder queryBuilder) { List<Integer> ids = new ArrayList<>(); SearchResponse searchResponse = client.prepareSearch(IndexName) .setTypes(TypeName).setQuery(queryBuilder).setSize(100) .execute().actionGet(); SearchHits searchHits = searchResponse.getHits(); for (SearchHit searchHit : searchHits) { Integer id = (Integer) searchHit.getSource().get("id"); ids.add(id); } return ids; } // 获取大量数据 private List<Integer> getSearchDataByScrolls(QueryBuilder queryBuilder) { List<Integer> ids = new ArrayList<>(); // 一次获取100000数据 SearchResponse scrollResp = client.prepareSearch(IndexName) .setSearchType(SearchType.SCAN).setScroll(new TimeValue(60000)) .setQuery(queryBuilder).setSize(100000).execute().actionGet(); while (true) { for (SearchHit searchHit : scrollResp.getHits().getHits()) { Integer id = (Integer) searchHit.getSource().get(IDFieldName); ids.add(id); } scrollResp = client.prepareSearchScroll(scrollResp.getScrollId()) .setScroll(new TimeValue(600000)).execute().actionGet(); if (scrollResp.getHits().getHits().length == 0) { break; } } return ids; }
这里的QueryBuilder是一个查询条件,ES支持分页查询获取数据,也能够一次性获取大量数据,须要使用Scroll Search。
五、聚合(Aggregation Facet)查询
// 获得某段时间内设备列表上每一个设备的数据分布状况<设备ID,数量> public Map<String, String> getDeviceDistributedInfo(String startTime, String endTime, List<String> deviceList) { Map<String, String> resultsMap = new HashMap<>(); QueryBuilder deviceQueryBuilder = getDeviceQueryBuilder(deviceList); QueryBuilder rangeBuilder = getDateRangeQueryBuilder(startTime, endTime); QueryBuilder queryBuilder = QueryBuilders.boolQuery() .must(deviceQueryBuilder).must(rangeBuilder); TermsBuilder termsBuilder = AggregationBuilders.terms("DeviceIDAgg").size(Integer.MAX_VALUE) .field(DeviceIDFieldName); SearchResponse searchResponse = client.prepareSearch(IndexName) .setQuery(queryBuilder).addAggregation(termsBuilder) .execute().actionGet(); Terms terms = searchResponse.getAggregations().get("DeviceIDAgg"); if (terms != null) { for (Terms.Bucket entry : terms.getBuckets()) { resultsMap.put(entry.getKey(), String.valueOf(entry.getDocCount())); } } return resultsMap; }
Aggregation查询能够查询相似统计分析这样的功能:如某个月的数据分布状况,某类数据的最大、最小、总和、平均值等。
详情参考:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/java-aggs.html
3、集群配置
配置文件elasticsearch.yml
集群名和节点名:
#cluster.name: elasticsearch
#node.name: "Franz Kafka"
是否参与master选举和是否存储数据
#node.master: true
#node.data: true
分片数和副本数
#index.number_of_shards: 5
#index.number_of_replicas: 1
容许其余网络访问:
network.host: 0
master选举最少的节点数,这个必定要设置为整个集群节点个数的一半加1,即N/2+1
#discovery.zen.minimum_master_nodes: 1
discovery ping的超时时间,拥塞网络,网络状态不佳的状况下设置高一点
#discovery.zen.ping.timeout: 3s
注意,分布式系统整个集群节点个数N要为奇数个!!
如何避免ElasticSearch发生脑裂(brain split):http://blog.trifork.com/2013/10/24/how-to-avoid-the-split-brain-problem-in-elasticsearch/
即便集群节点个数为奇数,minimum_master_nodes为整个集群节点个数一半加1,也难以免脑裂的发生,详情看讨论:https://github.com/elastic/elasticsearch/issues/2488
4、经常使用查询
curl -X<REST Verb> <Node>:<Port>/<Index>/<Type>/<ID>
Index info:
curl -XGET 'localhost:9200'
curl -XGET 'localhost:9200/_stats?pretty'
curl -XGET 'localhost:9200/{index}/_stats?pretty'
curl -XGET 'localhost:9200/_cluster/health?level=indices&pretty=true'
curl -XGET 'localhost:9200/{index}?pretty'
curl -XGET 'localhost:9200/_cat/indices?v'
curl -XGET 'localhost:9200/{index}/_mapping/{type}?pretty'
Mapping info:
curl -XGET 'localhost:9200/subscriber/_mapping/subscriber?pretty'
Index search:
curl -XGET 'localhost:9200/subscriber/subscriber/_search?pretty'
Search by ID:
curl -XGET 'localhost:9200/subscriber/subscriber/5000?pretty'
Search by field:
curl -XGET 'localhost:9200/subscriber/subscriber/_search?q=ipAddress:63.141.15.45&&pretty'
Delete index:
curl -XDELETE 'localhost:9200/subscriber?pretty'
Delete document by ID:
curl -XDELETE 'localhost:9200/subscriber/subscriber/5000?pretty'
Delete document by query:
curl -XDELETE 'localhost:9200/subscriber/subscriber/_query?q=ipAddress:63.141.15.45&&pretty'
5、基本原理
一、ES写数据原理
每一个doc,经过以下公式决定写到哪一个分片上:
shard= hash(routing) % number_of_primary_shards
Routing 是一个可变值,默认是文档的 _id ,也能够自定义一个routing规则。
默认状况下,primary shard在写操做前,须要肯定大多数(a quorum, or majority)的shard copies是可用的。这样是为了防止在有网络分区(network partition)的状况下把数据写到了错误的分区。
A quorum是由如下公式决定:
int( (primary + number_of_replicas) / 2 ) + 1,number_of_replicas是在index settings中指定的复制个数。
肯定一致性的值有:one (只有primary shard),all (the primary and all replicas),或者是默认的quorum。
若是没有足够可用的shard copies,elasticsearch会等待直到超时,默认等待一分钟。
二、ES读数据原理
Elasticsearch中的查询主要分为两类,Get请求:经过ID查询特定Doc;Search请求:经过Query查询匹配Doc。
全部的搜索系统通常都是两阶段查询,第一阶段查询到匹配的DocID,第二阶段再查询DocID对应的完整文档,这种在Elasticsearch中称为query_then_fetch,还有一种是一阶段查询的时候就返回完整Doc,在Elasticsearch中称做query_and_fetch,通常第二种适用于只须要查询一个Shard的请求。
除了一阶段,两阶段外,还有一种三阶段查询的状况。搜索里面有一种算分逻辑是根据TF(Term Frequency)和DF(Document Frequency)计算基础分,可是Elasticsearch中查询的时候,是在每一个Shard中独立查询的,每一个Shard中的TF和DF也是独立的,虽然在写入的时候经过_routing保证Doc分布均匀,可是无法保证TF和DF均匀,那么就有会致使局部的TF和DF不许的状况出现,这个时候基于TF、DF的算分就不许。为了解决这个问题,Elasticsearch中引入了DFS查询,好比DFS_query_then_fetch,会先收集全部Shard中的TF和DF值,而后将这些值带入请求中,再次执行query_then_fetch,这样算分的时候TF和DF就是准确的,相似的有DFS_query_and_fetch。这种查询的优点是算分更加精准,可是效率会变差。另外一种选择是用BM25代替TF/DF模型。
在新版本Elasticsearch中,用户无法指定DFS_query_and_fetch和query_and_fetch,这两种只能被Elasticsearch系统改写。
6、Elasticsearch插件
一、elasticsearch-head是一个elasticsearch的集群管理工具:./elasticsearch-1.7.1/bin/plugin -install mobz/elasticsearch-head
github地址:https://github.com/mobz/elasticsearch-head
二、elasticsearch-sql:使用SQL语法查询elasticsearch:./bin/plugin -u https://github.com/NLPchina/elasticsearch-sql/releases/download/1.3.5/elasticsearch-sql-1.3.5.zip --install sql
github地址:https://github.com/NLPchina/elasticsearch-sql
三、elasticsearch-bigdesk是elasticsearch的一个集群监控工具,能够经过它来查看ES集群的各类状态。
安装:./bin/plugin -install lukas-vlcek/bigdesk
访问:http://192.103.101.203:9200/_plugin/bigdesk/
github地址:https://github.com/hlstudio/bigdesk
四、elasticsearch-servicewrapper插件是ElasticSearch的服务化插件
https://github.com/elasticsearch/elasticsearch-servicewrapper
DEPRECATED: The service wrapper is deprecated and not maintained. 该项目已再也不维护。
例子代码在GitHub上:https://github.com/luxiaoxun/Code4Java
参考:
https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/index.html
https://www.elastic.co/guide/en/elasticsearch/guide/current/distrib-write.html
http://stackoverflow.com/questions/10213009/solr-vs-elasticsearch