原文html
shard allocation
意思是分片分配
, 是一个将分片分配到节点的过程; 可能发生该操做的过程包括:java
initial recovery
)replica allocation
)rebalance
)来源node
分片的分配操做, 是由 master
角色的节点来决定何时移动分片, 以及移动到哪一个节点上, 以达到集群的均衡;web
说明json
本文基于 Elasticsearch 7.4.0 版本服务器
Allocation
触发条件index
索引node
节点的新增或删除reroute
命令replica
副本数量具体对应源码解释:来源网络
序号 | 调用函数 | 说明 |
---|---|---|
1 | AllocationService.applyStartedShards | Shard 启动状态修改 |
2 | AllocationService.applyFailedShards | Shard 失效状态修改 |
3 | AllocationService.deassociateDeadNodes | Node 节点离开集群 |
4 | AllocationService.reroute(AllocationCommands) | 执行 relocation 命令 |
5 | TransportClusterUpdateSettingsAction.masterOperation | 集群配置修改操做 |
6 | MetaDataCreateIndexService.onlyCreateIndex | 建立新索引 index 请求 |
7 | MetaDataDeleteIndexService.deleteIndexs | 删除索引 index 操做 |
8 | MetaDataIndexStateService.closeIndex | 关闭 index 操做 |
9 | MetaDataIndexStateService.openIndex | 打开 index操做 |
10 | NodeJoinController.JoinTaskExecutor | 经过集群发现的节点加入集群 |
11 | GatewayService.GatewayRecoveryListener | 经过 GatewayRecovery 恢复的节点加入集群 |
12 | LocalAllocateDangledIndices.submitStateUpdateTask | 恢复磁盘内存而在 MateDate 内不存在的 index |
13 | RestoreService.restoreSnapshot | 从 snapshot 中恢复的 index |
Rebalance
的触发条件在 rebalance
以前会通过 2.3.2
中介绍的全部策略里实现的 canRebalance
方法, 所有经过后才会执行下面的 Rebalance
过程;并发
Rebalance
过程是经过调用 balanceByWeights()
方法, 计算 shard 所在的每一个 node 的 weight
值,app
其中:负载均衡
numAdditionalShards
通常为 0, 调用 weightShardAdded
, weightShardRemoved
方法时分别取值为 1
和 -1
;cluster.routing.allocation.balance.shard
系统动态配置项, 默认值为 0.45f
;cluster.routing.allocation.balance.index
系统动态配置项, 默认值为 0.55f
;源码以下:
private static class WeightFunction { private final float indexBalance; private final float shardBalance; private final float theta0; private final float theta1; WeightFunction(float indexBalance, float shardBalance) { float sum = indexBalance + shardBalance; if (sum <= 0.0f) { throw new IllegalArgumentException("Balance factors must sum to a value > 0 but was: " + sum); } theta0 = shardBalance / sum; theta1 = indexBalance / sum; this.indexBalance = indexBalance; this.shardBalance = shardBalance; } float weight(Balancer balancer, ModelNode node, String index) { final float weightShard = node.numShards() - balancer.avgShardsPerNode(); final float weightIndex = node.numShards(index) - balancer.avgShardsPerNode(index); return theta0 * weightShard + theta1 * weightIndex; } }
分片分配就是把一个分片分配到集群中某个节点的过程, 其中分配决策包含了两个方面:
Elasticsearch 主要经过两个基础组件来完成分片分配这个过程的: allocator
和 deciders
;
allocator
寻找最优的节点来分配分片;deciders
负责判断并决定是否要进行分配; allocator
负责找出拥有分片数量最少的节点列表, 按分片数量递增排序, 分片数量较少的会被优先选择; 对于新建索引, allocator
的目标是以更为均衡的方式把新索引的分片分配到集群的节点中;
deciders
依次遍历 allocator
给出的节点列表, 判断是否要把分片分配给该节点, 好比是否知足分配过滤规则, 分片是否将超出节点磁盘容量阈值等等;
allocator
对于主分片, 只容许把主分片指定在已经拥有该分片完整数据的节点上; 对于副本分片, 则是先判断其余节点上是否已有该分片的数据的拷贝, 若是有这样的节点, allocator
则优先把分片分配到这其中一个节点上;
PrimaryShardAllocator
找到拥有某 Shard
最新数据(主分片)的节点;ReplicaShardAllocator
找到磁盘上拥有这个 Shard
数据(副本分片)的节点;BalancedShardsAllocator
找到拥有最少 Shard
个数的节点;public class BalancedShardsAllocator implements ShardsAllocator { public static final Setting<Float> INDEX_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.index", 0.55f, 0.0f, Property.Dynamic, Property.NodeScope); public static final Setting<Float> SHARD_BALANCE_FACTOR_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.shard", 0.45f, 0.0f, Property.Dynamic, Property.NodeScope); public static final Setting<Float> THRESHOLD_SETTING = Setting.floatSetting("cluster.routing.allocation.balance.threshold", 1.0f, 0.0f, Property.Dynamic, Property.NodeScope); private volatile WeightFunction weightFunction; private volatile float threshold; }
Deciders
决策期基础组件的抽象类为 AllocationDecider
:
public abstract class AllocationDecider { public Decision canRebalance(ShardRouting shardRouting, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canAllocate(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canRemain(ShardRouting shardRouting, RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canAllocate(ShardRouting shardRouting, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canAllocate(IndexMetadata indexMetadata, RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canAllocate(RoutingNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision shouldAutoExpandToNode(IndexMetadata indexMetadata, DiscoveryNode node, RoutingAllocation allocation) { return Decision.ALWAYS; } public Decision canRebalance(RoutingAllocation allocation) { return Decision.ALWAYS; } }
ES 7.4.0 中的 Decider
决策器包括如下所示, 他们均实现上面的 AllocationDecider
抽象类, 并重写 canRebalance
, canAllocate
, canRemain
, canForceAllocatePrimary
等方法;
决策器比较多, 大体分类以下, 并列举决策器对应的配置项:
SameShardAllocationDecider
: 避免主副分片分配到同一个节点;
AwarenessAllocationDecider
: 感知分配器, 感知服务器, 机架等, 尽可能分散存储 Shard;
对应的配置参数有:
cluster.routing.allocation.awareness.attributes: rack_id
cluster.routing.allocation.awareness.attributes: zone
ShardsLimitAllocationDecider
: 同一个节点上容许存在同一个 index
的 shard
数目;
index.routing.allocation.total_shards_per_node
: 表示该索引每一个节点上容许最多的shard
数量; 默认值=-1, 表示无限制;
cluster.routing.allocation.total_shards_per_node
:cluster
级别, 表示集群范围内每一个节点上容许最多的shard
数量, 默认值=-1, 表示无限制;
index
级别会覆盖cluster
级别;
ThrottlingAllocationDecider
: recovery
阶段的限速配置, 避免过多的 recovering allocation
致使该节点的负载太高;
cluster.routing.allocation.node_initial_primaries_recoveries
: 当前节点在进行主分片恢复时的数量, 默认值=4;
cluster.routing.allocation.node_concurrent_incoming_recoveries
: 默认值=2, 一般是其余节点上的副本 shard 恢复到该节点上;
cluster.routing.allocation.node_concurrent_outgoing_recoveries
: 默认值=2, 一般是当前节点上的主分片 shard 恢复副本分片到其余节点上;
cluster.routing.allocation.node_concurrent_recoveries
: 统一配置上面两个配置项;
ConcurrentRebalanceAllocationDecider
: rebalace
并发控制, 表示集群同时容许进行 rebalance
操做的并发数量;
cluster.routing.allocation.cluster_concurrent_rebalance
, 默认值=2经过检查
RoutingNodes
类中维护的reloadingShard
计数器, 看是否超过配置的并发数;
DiskThresholdDecider
: 根据节点的磁盘剩余量来决定是否分配到该节点上;
cluster.routing.allocation.disk.threshold_enabled
, 默认值=true;
cluster.routing.allocation.disk.watermark.low
: 默认值=85%, 达到这个值后, 新索引的分片不会分配到该节点上;
cluster.routing.allocation.disk.watermark.high
: 默认值=90%, 达到这个值后, 会触发已分配到该节点上的Shard
会rebalance
到其余节点上去;
RebalanceOnlyWhenActiveAllocationDecider
: 全部 Shard
都处于 active
状态下才能够执行 rebalance
操做;
FilterAllocationDecider
: 经过接口动态设置的过滤器; cluster
级别会覆盖 index
级别;
index.routing.allocation.require.{attribute}
index.routing.allocation.include.{attribute}
index.routing.allocation.exclude.{attribute}
cluster.routing.allocation.require.{attribute}
cluster.routing.allocation.include.{attribute}
cluster.routing.allocation.exclude.{attribute}
- require 表示必须知足, include 表示能够分配到指定节点, exclude 表示不容许分配到指定节点;
- {attribute} 还有 ES 内置的几个选择, _name, _ip, _host;
ReplicaAfterPrimaryActiveAllocationDecider
: 保证只在主分片分配完成后(active
状态)才开始分配副本分片;
ClusterRebalanceAllocationDecider
: 经过集群中 active
的 shard
状态来决定是否能够执行 rebalance
;
cluster.routing.allocation.allow_rebalance
indices_all_active
(默认): 当集群全部的节点分配完成, 才能够执行rebalance
操做;
indices_primaries_active
: 只要全部主分片分配完成, 才能够执行rebalance
操做;
always
: 任何状况下都容许 rebalance 操做;
MaxRetryAllocationDecider
: 防止 shard 在失败次数达到上限后继续分配;
index.allocation.max_retries
: 设置分配的最大失败重试次数, 默认值=5;
EnableAllocationDecider
: 设置容许分配的分片类型; index
级别配置会覆盖 cluster
级别配置;
all
(默认): 容许全部类型的分片;
primaries
: 仅容许主分片;
new_primaries
: 仅容许新建索引的主分片;
none
: 禁止分片分配操做;
NodeVersionAllocationDecider
: 检查分片所在 Node 的版本是否高于目标 Node 的 ES 版本;
SnapshotInProgressAllocationDecider
: 决定 snapshot
期间是否容许 allocation
, 由于 snapshot
只会发生在主分片上, 因此该配置只会限制主分片的 allocation
;
cluster.routing.allocation.snapshot.relocation_enabled
接下来介绍一下在 Elasticsearch 中涉及到 Allocation
和 Rebalance
的相关配置项;
控制分片的分配和恢复;
配置 | 默认值 | 说明 |
---|---|---|
cluster.routing.allocation.enable | all | 启用或禁用针对特定类型分片的分配; 1. all : 容许分配全部类型的分片; 2. primaries : 只容许分配主分片(primary shard ); 3. new_primaries : 只容许分配新索引的主分片(primary shard );4. none : 禁用分片分配;该设置不会影响重启节点时本地主分片的恢复; |
cluster.routing.allocation.node_concurrent_incoming_recoveries | 2 | 一个节点容许并发的传入分片(incoming shard )数量 |
cluster.routing.allocation.node_concurrent_outgoing_recoveries | 2 | 一个节点容许并发的传出分片(incoming shard )数量 |
cluster.routing.allocation.node_concurrent_recoveries | 上面二者的合并配置 | |
cluster.routing.allocation.node_initial_primaries_recoveries | 4 | 单个节点上同时初始化的主分片数量 |
cluster.routing.allocation.same_shard.host | false | 是否执行检查, 以防止基于host name 和host address , 在单个主机上分配同一分片的多个实例; 该设置仅用于在同一台计算机上启动多个节点的状况; |
控制集群之间的分片平衡;
配置 | 默认值 | 说明 |
---|---|---|
cluster.routing.rebalance.enable | all | 启用或禁用针对特定类型分片的rebalancing ;1. all : 容许rebalancing 全部类型的分片;2. primaries : 只容许rebalancing 主分片;3. replicas : 只容许rebalancing 副本分片;4. none : 禁用rebalancing ; |
cluster.routing.allocation.allow_rebalance | indices_all_active | 指定什么时候容许执行rebalancing ;1. always : 老是容许;2. indices_primaries_active : 当集群中全部主分片已分配时才容许rebalancing ;3. indices_all_active : 当集群中全部分片(包括主分片和副本分片)都已分配时才容许rebalancing ; |
cluster.routing.allocation.cluster_concurrent_rebalance | 2 | 指定整个集群中容许同时在节点间移动的分片数量; 该配置仅控制因为集群不平衡引发的并发分片分配数量, 对分配过滤(allocation filtering )或强制感知(forced awareness )的分片分配不作限制; |
如下配置用于决定每一个分片的存放位置; 当rebalancing
操做再也不使任何节点的权重超过balance.threshold
时, 集群即达到平衡;
配置 | 默认值 | 说明 |
---|---|---|
cluster.routing.allocation.balance.shard | 0.45f | 定义节点上分配的分片总数的权重因子; 提高该值会致使集群中全部节点趋向于分片数量相等; |
cluster.routing.allocation.balance.index | 0.55f | 定义节点上分配的每一个索引的分片数量的权重因子; 提高该值会致使集群中全部节点上每一个索引的分片数量趋向于相等; |
cluster.routing.allocation.balance.threshold | 1.0f | 定义应当执行操做的最小优化值(非负浮点数); 提高该值会致使集群在优化分片平衡方面不太积极; |
如下配置控制每一个索引中的分片分配;
配置须要分两步:
elasticsearch.yml
配置文件中添加自定义节点属性, 好比以 small
, medium
, big
区分节点类型, 则配置文件中可添加:node.attr.size: medium
或者在启动 Elasticsearch 服务时, 在命令行里添加 ./bin/elasticsearch -Enode.attr.size=medium
;
mapping
时, 添加index.routing.allocation.include/exclude/require.size: medium
的过滤配置便可;PUT <index_name>/_settings { "index.routing.allocation.include.size": "medium" }
能够配置多个自定义节点属性, 而且必须同时知足索引里配置的多个过滤条件;
其中 {attribute}
能够是上面提到的自定义节点属性, ES 本身也有一些内置的节点属性:
attribute | 说明 |
---|---|
_name | 经过节点名称进行匹配 |
_host_ip | 经过节点 IP 地址进行匹配 |
_publish_ip | 经过节点的发布 IP 地址进行匹配 |
_ip | 经过 _host_ip 或 _publish_ip 进行匹配 |
_host | 经过节点的hostname 进行匹配 |
_id | 经过节点的 id 进行匹配 |
其中 {values}
能够是单个值, 也能够是逗号分隔的多个值, 也可使用通配符 *
进行模糊匹配;
当某个节点因为突发缘由, 好比网络中断, 人为操做重启等, 须要暂时离开集群时, 集群会马上新建副本分片以替换丢失的副本, 而后在剩余的全部节点之间进行rebalancing
, 这样致使在短期内该突发节点又恢复过来后, 原先的副本就没法再使用, 集群会将刚才新建的副本分片再拷贝回到该节点上; 这样就会形成没必要要的资源浪费, 以及节点分片rebalancing
带来的波动;
可使用 index.unassigned.node_left.delayed_timeout
动态设置来延迟因为节点离开而致使未分配的副本分片的分配问题; 该配置默认值 1m
;
PUT _all/_settings { "settings": { "index.unassigned.node_left.delayed_timeout": "5m" } }
修改为以上配置后, 若是在 5m
内, 该节点能够恢复从新加入集群, 则集群会自动恢复该节点的副本分片分配, 恢复速度很快;
注意
- 此设置不影响将副本分片升级为主分片;
- 此设置不影响以前未分配的副本分片;
- 在整个集群从新启动后, 该延迟分配不会生效;
索引分片恢复的优先级按照:
index.priority
配置, 值越大优先级越高;index
索引的建立日期, 越新的索引优先级越高;index
索引的名称;配置 | 默认值 | 说明 |
---|---|---|
index.routing.allocation.total_shards_per_node | unbounded(-1) | 指定单个节点上最多分配的分片数量, 包括主分片和副本分片;(具体某个索引) |
cluster.routing.allocation.total_shards_per_node | unbounded(-1) | 指定单个节点上最多分配的分片数量, 包括主分片和副本分片;(与索引无关, 全局设置) |
这些配置是硬性配置, 可能会致使一些分片没法分配, 须要慎重配置;