让Elasticsearch飞起来!百亿级实时查询优化实战

最近的一个项目是风控过程数据实时统计分析和聚合的一个 OLAP 分析监控平台,日流量峰值在 10 到 12 亿上下,每一年数据约 4000 亿条,占用空间大概 200T。html


image.png

什么是时序索引?其主要特色体如今以下两个方面:node

  • 存,以时间为轴,数据只有增长,没有变动,而且必须包含 timestamp(日期时间,名称随意)字段。android

    其做用和意义要大于数据的 id 字段,常见的数据好比咱们一般要记录的操做日志、用户行为日志、或股市行情数据、服务器 CPU、内存、网络的使用率等。算法

  • 取,必定是以时间范围为第一过滤条件,而后是其余查询条件,好比近一天、一周、本月等等,而后在这个范围内进行二次过滤。数据库

    好比性别或地域等,查询结果中比较关注的是每条数据和 timestamp 字段具体发生的时间点,而非 id。api


此类数据通常用于 OLAP、监控分析等场景。服务器


集群部署规划网络


image.png

咱们都知道在 Elasticsearch(下称 ES)集群中有两个主要角色:Master Node 和 Data Node,其余如 Tribe Node 等节点可根据业务须要另行设立。架构


为了让集群有更好的性能表现,咱们应该对这两个角色有一个更好的规划,在 Nodes 之间作读取分离,保证集群的稳定性和快速响应,在大规模的数据存储和查询的压力之下可以坦然面对,各自愉快的协做。并发


Master Nodes


Master Node,整个集群的管理者,负有对 index 的管理、shards 的分配,以及整个集群拓扑信息的管理等功能。


众所周知,Master Node 能够经过 Data Node 兼任,可是,若是对群集规模和稳定要求很高的话,就要职责分离,Master Node 推荐独立,它的状态关乎整个集群的存活。


Master 的配置:

node.master: true 
node.datafalse 
node.ingest: false


这样 Master 不参与 I、O,从数据的搜索和索引操做中解脱出来,专门负责集群的管理工做,所以 Master Node 的节点配置能够相对低一些。


另外防止 ES 集群 split brain(脑裂),合理配置 discovery.zen.minimum_master_nodes 参数,官方推荐 master-eligible nodes / 2 + 1 向下取整的个数。


这个参数决定选举 Master 的 Node 个数,过小容易发生“脑裂”,可能会出现多个 Master,太大 Master 将没法选举。


更多 Master 选举相关内容请参考:

https://www.elastic.co/guide/en/elasticsearch/reference/5.3/modules-discovery-zen.html#master-election


Data Nodes


Data Node 是数据的承载者,对索引的数据存储、查询、聚合等操做提供支持。


这些操做严重消耗系统的 CPU、内存、IO 等资源,所以,应该把最好的资源分配给 Data Node,由于它们是真正干累活的角色,一样 Data Node 也不兼任 Master 的功能。


Data 的配置:

node.master: false 
node.datatrue 
node.ingest: false


Coordinating Only Nodes

 


ES 自己是一个分布式的计算集群,每一个 Node 均可以响应用户的请求,包括 Master Node、Data Node,它们都有完整的 Cluster State 信息。


正如咱们知道的同样,在某个 Node 收到用户请求的时候,会将请求转发到集群中全部索引相关的 Node 上,以后将每一个 Node 的计算结果合并后返回给请求方。


咱们暂且将这个 Node 称为查询节点,整个过程跟分布式数据库原理相似。那问题来了,这个查询节点若是在并发和数据量比较大的状况下,因为数据的聚合可能会让内存和网络出现瓶颈。


所以,在职责分离指导思想的前提下,这些操做咱们也应该从这些角色中剥离出来,官方称它是 Coordinating Nodes,只负责路由用户的请求,包括读、写等操做,对内存、网络和 CPU 要求比较高。


本质上,Coordinating Only Nodes 能够笼统的理解为是一个负载均衡器,或者反向代理,只负责读,自己不写数据,它的配置是:

node.masterfalse 
node.datafalse 
node.ingestfalse 
search.remote.connectfalse


增长 Coordinating Nodes 的数量能够提升 API 请求响应的性能,咱们也能够针对不一样量级的 Index 分配独立的 Coordinating Nodes 来知足请求性能。


那是否是越多越好呢?在必定范围内是确定的,但凡事有个度,过了负做用就会突显,太多的话会给集群增长负担。


在作 Master 选举的时候会先确保全部 Node 的 Cluster State 是一致的,同步的时候会等待每一个 Node 的 Acknowledgement 确认,因此适量分配可让集群畅快的工做。


search.remote.connect 是禁用跨集群查询,防止在进行集群之间查询时发生二次路由:

https://www.elastic.co/guide/en/elasticsearch/reference/current/modules-cross-cluster-search.html


Routing


相似于分布式数据库中的分片原则,将符合规则的数据存储到同一分片。ES 经过哈希算法来决定数据存储于哪一个 Shard:

shard_num = hash(_routing) % num_primary_shards


其中 hash(_routing) 得出一个数字,而后除以主 Shards 的数量获得一个余数,余数的范围是 0 到 number_of_primary_shards - 1,这个数字就是文档所在的 Shard。


Routing 默认是 id 值,固然能够自定义,合理指定 Routing 可以大幅提高查询效率,Routing 支持 GET、Delete、Update、Post、Put 等操做。


如:

PUT my_index/my_type/1?routing=user1
{
  "title""This is a document"
}

GET my_index/my_type/1?routing=user1


不指定 Routing 的查询过程:


简单的来讲,一个查询请求过来之后会查询每一个 Shard,而后作结果聚合,总的时间大概就是全部 Shard 查询所消耗的时间之和。


指定 Routing 之后:


会根据 Routing 查询特定的一个或多个 Shard,这样就大大减小了查询时间,提升了查询效率。


固然,如何设置 Routing 是一个难点,须要一点技巧,要根据业务特色合理组合 Routing 的值,来划分 Shard 的存储,最终保持数据量相对均衡。


能够组合几个维度作为 Routing ,有点相似于 Hbase Key,例如不一样的业务线加不一样的类别,不一样的城市和不一样的类型等等,如:

  • _search?routing=beijing:按城市。

  • _search?routing=beijing_user123:按城市和用户。

  • _search?routing=beijing_android,shanghai_android:按城市和手机类型等。


数据不均衡?假如你的业务在北京、上海的数据远远大于其余二三线城市的数据。


再例如咱们的业务场景,A 业务线的数据量级远远大于 B 业务线,有时候很难经过 Routing 指定一个值保证数据在全部 Shards 上均匀分布,会让部分 Shard 变的愈来愈大,影响查询性能,怎么办?


一种解决办法是单独为这些数据量大的渠道建立独立的 Index,如:

http://localhost:9200/shanghai,beijing,other/_search?routing=android


这样能够根据须要在不一样 Index 之间查询,然而每一个 Index 中 Shards 的数据能够作到相对均衡。


另外一种办法是指定 Index 参数 index.routing_partition_size,来解决最终可能产生群集不均衡的问题,指定这个参数后新的算法以下:

shard_num = (hash(_routing) + hash(_id) % routing_partition_size) % num_primary_shards


index.routing_partition_size 应具备大于 1 且小于 index.number_of_shards 的值。


最终数据会在 routing_partition_size 几个 Shard 上均匀存储,是哪一个 Shard 取决于 hash(_id) % routing_partition_size 的计算结果。


指定参数 index.routing_partition_size 后,索引中的 Mappings 必须指定 _routing 为 "required": true,另外 Mappings 不支持 parent-child 父子关系。


不少状况下,指定 Routing 后会大幅提高查询性能,毕竟查询的 Shard 只有那么几个,可是如何设置 Routing 是个难题,可根据业务特性巧妙组合。


索引拆分


Index 经过横向扩展 Shards 实现分布式存储,这样能够解决 Index 大数据存储的问题。


但在一个 Index 变的愈来愈大,单个 Shard 也愈来愈大,查询和存储的速度也愈来愈慢。


更重要的是一个 Index 实际上是有存储上限的(除非你设置足够多的 Shards 和机器),如官方声明单个 Shard 的文档数不能超过 20 亿(受限于 Lucene index,每一个 Shard 是一个 Lucene index)。


考虑到 I、O,针对 Index 每一个 Node 的 Shards 数最好不超过 3 个,那面对这样一个庞大的 Index,咱们是采用更多的 Shards,仍是更多的 Index,咱们如何选择?


Index 的 Shards 总量也不宜太多,更多的 Shards 会带来更多的 I、O 开销,其实答案就已经很明确,除非你能接受长时间的查询等待。


Index 拆分的思路很简单,时序索引有一个好处就是只有增长,没有变动,按时间累积,自然对索引的拆分友好支持,能够按照时间和数据量作任意时间段的拆分。


ES 提供的 Rollover Api + Index Template 能够很是便捷和友好的实现 Index 的拆分工做,把单个 index docs 数量控制在百亿内,也就是一个 Index 默认 5 个 Shards 左右便可,保证查询的即时响应。


简单介绍一下 Rollover API 和 Index Template 这两个东西,如何实现 index 的拆分。


Index Template


咱们知道 ES 能够为同一目的或同一类索引建立一个 Index Template,以后建立的索引只要符合匹配规则就会套用这个 Template,没必要每次指定 Settings 和 Mappings 等属性。


一个 Index 能够被多个 Template 匹配,那 Settings 和 Mappings 就是多个 Template 合并后的结果。


有冲突经过 Template 的属性"order" : 0 从低到高覆盖(这部分听说会在 ES6 中会作调整,更好的解决 Template 匹配冲突问题)。


示例:

PUT _template/template_1
{
    "index_patterns" : ["log-*"],
    "order" : 0,
    "settings" : {
        "number_of_shards" : 5
    },
    "aliases" : {
        "alias1" : {}
    }
}


Rollover Index



Rollover Index 能够将现有的索引经过必定的规则,如数据量和时间,索引的命名必须是 logs-000001 这种格式,并指定 aliases,示例:

PUT /logs-000001 
{
  "aliases": {
    "logs_write": {}
  }
}

# Add > 1000 documents to logs-000001

POST /logs_write/_rollover 
{
  "conditions": {
    "max_age":   "7d",
    "max_docs":  1000
  }
}


先建立索引并指定别名 logs_write,插入 1000 条数据,而后请求 _rollover api 并指定拆分规则。


若是索引中的数据大于规则中指定的数据量或者时间过期,新的索引将被建立,索引名称为 logs-000002,并根据规则套用 Index Template,同时别名 logs_write 也将被变动到 logs-000002。


注意事项:

  • 索引命名规则必须如同:logs-000001。

  • 索引必须指定 aliases。

  • Rollover Index API 调用时才去检查索引是否超出指定规则,不会自动触发,须要手动调用,能够经过 Curator 实现自动化。

  • 若是符合条件会建立新的索引,老索引的数据不会发生变化,若是你已经插入 2000 条,拆分后仍是 2000 条。

  • 插入数据时必定要用别名,不然你可能一直在往一个索引里追加数据。


技巧是按日期滚动索引:

PUT /<logs-{now/d}-1>
{
  "aliases": {
    "logs_write": {}
  }
}


假如生成的索引名为 logs-2017.04.13-1,若是你在当天执行 Rollover 会生成 logs-2017.04.13-000001,第二天的话是 logs-2017.04.14-000001。


这样就会按日期进行切割索引,那若是你想查询 3 天内的数据能够经过日期规则来匹配索引名,如:

GET /<logs-{now/d}-*>,<logs-{now/d-1d}-*>,<logs-{now/d-2d}-*>/_search


到此,咱们就能够经过 Index Template 和 Rollover API 实现对 Index 的任意拆分,并按照须要进行任意时间段的合并查询,这样只要你的时间跨度不是很大,查询速度通常能够控制在毫秒级,存储性能也不会遇到瓶颈。


Hot-Warm 架构


image.png

冷热架构,为了保证大规模时序索引实时数据分析的时效性,能够根据资源配置不一样将 Data Nodes 进行分类造成分层或分组架构。


一部分支持新数据的读写,另外一部分仅支持历史数据的存储,存放一些查询发生机率较低的数据。


即 Hot-Warm 架构,对 CPU,磁盘、内存等硬件资源合理的规划和利用,达到性能和效率的最大化。


咱们能够经过 ES 的 Shard Allocation Filtering 来实现 Hot-Warm 的架构。


实现思路以下:

  • 将 Data Node 根据不一样的资源配比打上标签,如:Host、Warm。

  • 定义 2 个时序索引的 Index Template,包括 Hot Template 和 Warm Template,Hot Template 能够多分配一些 Shard 和拥有更好资源的 Hot Node。

  • 用 Hot Template 建立一个 Active Index 名为 active-logs-1,别名 active-logs,支持索引切割。

    插入必定数据后,经过 roller over api 将 active-logs 切割,并将切割前的 Index 移动到 Warm Nodes 上,如 active-logs-1,并阻止写入。

  • 经过 Shrinking API 收缩索引 active-logs-1 为 inactive-logs-1,原 Shard 为 5,适当收缩到 2 或 3,能够在 Warm Template 中指定,减小检索的 Shard,使查询更快。

  • 经过 force-merging api 合并 inactive-logs-1 索引每一个 Shard 的 Segment,节省存储空间。

  • 删除 active-logs-1。


Hot,Warm Nodes


Hot Nodes


拥有最好资源的 Data Nodes,如更高性能的 CPU、SSD 磁盘、内存等资源,这些特殊的 Nodes 支持索引全部的读、写操做。


若是你计划以 100 亿为单位来切割 Index,那至少须要三个这样的 Data Nodes,Index 的 Shard 数为 5,每一个 Shard 支持 20 亿 Documents 数据的存储。


为这类 Data Nodes 打上标签,以便咱们在 Template 中识别和指定,启动参数以下:

./bin/elasticsearch -Enode.attr.box_type=hot


或者配置文件:

node.attr.box_typehot


Warm Nodes


存储只读数据,而且查询量较少,但用于存储长多时间历史数据的 Data Nodes,这类 Nodes 相对 Hot Nodes 较差的硬件配置,根据需求配置稍差的 CPU、机械磁盘和其余硬件资源,至于数量根据须要保留多长时间的数据来配比,一样须要打上标签,方法跟 Hot Nodes 同样,指定为 Warm,box_type: warm。


Hot,Warm Template


Hot Template


咱们能够经过指定参数"routing.allocation.include.box_type": "hot",让全部符合命名规则索引的 Shard 都将被分配到 Hot Nodes 上:

PUT _template/active-logs
{
  "template""active-logs-*",
  "settings": {
    "number_of_shards":   5,
    "number_of_replicas": 1,
    "routing.allocation.include.box_type""hot",
    "routing.allocation.total_shards_per_node": 2
  },
  "aliases": {
    "active-logs":  {}
  }
}


Warm Template


一样符合命名规则索引的 Shard 会被分配到 Warm Nodes 上,咱们指定了更少的 Shards 数量和复本数。


注意,这里的复本数为 0,和 best_compression 级别的压缩,方便作迁移等操做,以及进行一些数据的压缩:

PUT _template/inactive-logs
{
  "template""inactive-logs-*",
  "settings": {
    "number_of_shards":   1,
    "number_of_replicas": 0,
    "routing.allocation.include.box_type""warm",
    "codec""best_compression"
  }
}


假如咱们已经建立了索引 active-logs-1 ,固然你能够经过 _bulk API 快速写入测试的数据,而后参考上文中介绍的 Rollover API 进行切割。


Shrink Index


Rollover API 切割之后,active-logs-1 将变成一个冷索引,咱们将它移动到 Warm Nodes 上。


先将索引置为只读状态,拒绝任何写入操做,而后修改 index.routing.allocation.include.box_type 属性,ES 会自动移动全部 Shards 到目标 Data Nodes 上:

PUT active-logs-1/_settings
{
  "index.blocks.write"true,
  "index.routing.allocation.include.box_type""warm"
}


Cluster Health API 能够查看迁移状态,完成后进行收缩操做,其实至关于复制出来一个新的索引,旧的索引还存在。

POST active-logs-1/_shrink/inactive-logs-1


咱们能够经过 Head 插件查看整个集群索引的变化状况。关于 Shard 的分配请参考:

https://www.elastic.co/guide/en/elasticsearch/reference/current/shard-allocation-filtering.html


Forcemerge


到目前为止咱们已经实现了索引的冷热分离,和索引的收缩,咱们知道每一个 Shard 下面由多个 Segment 组成,那 inactive-logs-1 的 Shard 数是 1,但 Segment 仍是多个。


这类索引不会在接受写入操做,为了节约空间和改善查询性能,经过 Forcemerge API 将 Segment 适量合并:

PUT inactive-logs-1/_settings
"number_of_replicas": 1 }


ES 的 Forcemerge 过程是先建立新的 Segment 删除旧的,因此旧 Segment 的压缩方式 best_compression 不会在新的 Segment 中生效,新的 Segment 仍是默认的压缩方式。


如今 inactive-logs-1 的复本仍是 0,若是有须要的话,能够分配新的复本数:

PUT inactive-logs-1/_settings
"number_of_replicas": 1 }


最后删除 active-logs-1,由于咱们已经为它作了一个查询复本 inactive-logs-1。


DELETE active-logs-1


走到这里,咱们就已经实现了 Index 的 Hot-Warm 架构,根据业务特色将一段时间的数据放在 Hot Nodes,更多的历史数据存储于 Warm Nodes。


其余优化方案


索引隔离


image.png

在多条业务线的索引共用一个 ES 集群时会发生流量被独吃独占的状况,由于你们都共用相同的集群资源,流量大的索引会占用大部分计算资源而流量小的也会被拖慢,得不到即时响应,或者说业务流量大的索引能够按天拆分,几个流量小的索引能够按周或月拆分。


这种状况下咱们能够将规模大的索引和其余相对小规模的索引独立存储,分开查询或合并查询。


除了 Master Nodes 之外,Data Nodes 和 Coordinating Nodes 均可以独立使用(其实若是每一个索引的量都特别大也应该采用这种架构),还有一个好处是对方的某个 Node 挂掉,本身不受影响。


一样利用 ES 支持 Shard Allocation Filtering 功能来实现索引的资源独立分配,先将 Nodes 进行打标签,划分区域,相似于 Hot-Warm 架构:

node.attr.zone=zone_a、node.attr.zone=zone_b


或者:

node.attr.zone =zone_hot_a、node.attr.zone=zone_hot_b


等打标签的方式来区别对应不一样的索引,而后在各自的 Index Template 中指定不一样的 node.attr.zone 便可。


如"index.routing.allocation.include.zone" : "zone_a,zone_hot_a",

或者排除法"index.routing.allocation.exclude.size": "zone_hot_b"

分配到 zone_hot_b 之外的全部 Data Nodes 上。


更多用法能够参考 Hot-Warm 架构,或 shard-allocation-filtering:

https://www.elastic.co/guide/en/elasticsearch/reference/current/shard-allocation-filtering.html


跨数据中心


image.png

若是你的业务遍及全国各地,四海八荒,若是你数据要存储到多个机房,若是你的 Index 有几万个甚至更多( Index 特别多,集群庞大会致使 Cluster State 信息量特别大,由于 Cluster State 包含了全部 Shard、Index、Node 等全部相关信息,它存储在每一个 Node 上,这些数据发生变化都会实时同步到全部 Node 上,当这个数据很大的时候会对集群的性能形成影响)。


这些状况下咱们会考虑部署多个 ES Cluster,那咱们将如何解决跨集群查询的问题呢?


目前 ES 针对跨集群操做提供了两种方案 Tribe Node 和 Cross Cluster Search。


Tribe Node


image.png

须要一个独立的 Node 节点,加入其余 ES Cluster,用法有点相似于 Coordinating Only Node。


所不一样的是 Tribe 是针对多个 ES 集群之间的全部节点,Tribe Node 收到请求广播到相关集群中全部节点,将结果合并处理后返回。


表面上看起来 Tribe Node 将多个集群串连成了一个总体,遇到同名 Index 发生冲突,会选择其中一个 Index,也能够指定:

tribe:
  on_conflictprefer_t1
  t1:
    cluster.nameus-cluster
    discovery.zen.ping.multicast.enabledfalse
    discovery.zen.ping.unicast.hosts['usm1','usm2','usm3']
  t2:
    cluster.nameeu-cluster
    discovery.zen.ping.multicast.enabledfalse
    discovery.zen.ping.unicast.hosts['eum1','eum2','eum3']


Cross Cluster Search



Cross Cluster Search 可让集群中的任意一个节点联合查询其余集群中的数据, 经过配置 elasticsearch.yml 或者 API 来启用这个功能,API 示例:

PUT _cluster/settings
{
  "persistent": {
    "search": {
      "remote": {
        "cluster_one": {
          "seeds": [
            "127.0.0.1:9300"
          ]
    ...
        }
      }
    }
  }
}


提交之后整个集群全部节点都会生效,均可以作为代理去作跨集群联合查询,不过咱们最好仍是经过 Coordinating Only Nodes 去发起请求。

POST /cluster_one:decision,decision/_search
{
    "match_all": {}
}


对集群 cluster_one 和本集群中名为 Decision 的索引联合查询。


目前这个功能还在测试阶段,但将来可能会取代 Tribe Node,之间的最大的差别是 Tribe Node 须要设置独立的节点,而 Cross Cluster Search 不须要,集群中的任意一个节点均可以兼任。


好比能够用咱们的 Coordinating Only Nodes 作为联合查询节点,另外一个优势是配置是动态的,不须要重启节点。


实际上能够理解为是一个 ES 集群之间特定的动态代理工具,支持全部操做,包括 Index 的建立和修改,而且经过 Namespace 对 Index 进行隔离,也解决了 Tribe Node 之 Index 名称冲突的问题。


小结


咱们在文中介绍了几种方案用来解决时序索引的海量数据存储和查询的问题,根据业务特色和使用场景来单独或组合使用可以发挥出意想不到的效果。


特别是 Nodes 之间的读写分离、索引拆分、Hot-Warm 等方案的组合应用对索引的查询和存储性能有显著的提高。


另外 Routing 在新版本中增长了 routing_partition_size,解决了 Shard 难以均衡的问题。


若是你的索引 Mapping 中没有 parent-child 关联关系能够考虑使用,对查询的性能提高很是有效。