SchedulerCache是kubernetes scheduler中负责本地数据缓存的核心数据结构, 其实现了Cache接口,负责存储从apiserver获取的数据,提供给Scheduler调度器获取Node的信息,而后由调度算法的决策pod的最终node节点,其中Snapshot和节点打散算法很是值得借鉴node
SchedulerCache的数据从apiserver经过网络感知,其数据的同步一致性主要是经过kubernetes中的Reflector组件来负责保证,SchedulerCache自己就是一个单纯数据的存储算法
当scheduler获取一个待调度的pod,则须要从Cache中获取当前集群中的快照数据(当前此时集群中node的统计信息), 用于后续调度流程中使用api
节点打散主要是指的调度器调度的时候,在知足调度需求的状况下,为了保证pod均匀分配到全部的node节点上,一般会按照逐个zone逐个node节点进行分配,从而让pod节点打散在整个集群中缓存
Scheduler进行完成调度流程的决策以后,为pod选择了一个node节点,此时还未进行后续的Bind操做,但实际上资源已经分配给该pod, 此时会先更新到本地缓存(),而后再等待apiserver进行数据的广播而且最终被kubelet来进行实际的调度安全
但若是由于某些缘由致使pod后续的事件都没有被监听到,则须要将对应的pod资源进行删除,并删除对node资源的占用网络
在scheduler cache中pod会一个内部的状态机:initial、Assumed、Expired、Added、Delete,实际上全部的操做都是围绕着该状态机在进行,状态以下:
Initial: 初始化完成从apiserver监听到(也多是监听到一个已经完成分配的pod)
Assumed: 在scheduler中完成分配最终完成bind操做的pod(未实际分配)
Added: 首先监听到事件多是一个已经完成实际调度的pod(即从initial到Added),其次多是通过调度决策后,被实际调度(从Assumed到Added),最后则是后续pod的更新(Update), Added语义上其实就是往Cache中添加一个Pod状态
Deleted: 某个pod被监听到删除事件,只有被Added过的数据才能够被Deleted
Expired: Assumed pod通过一段时间后没有感知到真正的分配事件被删除数据结构
type schedulerCache struct { stop <-chan struct{} ttl time.Duration period time.Duration // 保证数据的安全 mu sync.RWMutex // 存储假定pod的信息集合,通过scheduler调度后假定pod被调度到某些节点,进行本地临时存储 // 主要是为了进行node资源的占用,能够经过key在podStats查找到假定的pod信息 assumedPods map[string]bool // pod的状态 podStates map[string]*podState // 存储node的映射 nodes map[string]*nodeInfoListItem csiNodes map[string]*storagev1beta1.CSINode // node信息的链表,按照最近更新时间来进行链接 headNode *nodeInfoListItem // 存储node、zone的映射信息 nodeTree *NodeTree // 镜像信息 imageStates map[string]*imageState }
Snapshot数据结构主要负责存储当前集群中的node信息,而且经过Generation记录当前更新的最后一个周期app
type Snapshot struct { NodeInfoMap map[string]*NodeInfo Generation int64 }
建立主要位于kubernetes/pkg/scheduler/core/generic_scheduler.go,实际上就是建立一个空的snapshot对象spa
nodeInfoSnapshot: framework.NodeInfoSnapshot(),
数据的更新则是经过snapshot方法来调用Cache的更新接口来进行更新设计
func (g *genericScheduler) snapshot() error { // Used for all fit and priority funcs. return g.cache.UpdateNodeInfoSnapshot(g.nodeInfoSnapshot) }
随着集群中node和pod的数量的增长,若是每次都全量获取snapshot则会严重影响调度器的调度效率,在Cache中经过一个双向链表和node的递增计数(etcd实现)来实现增量更新
func (cache *schedulerCache) UpdateNodeInfoSnapshot(nodeSnapshot *schedulernodeinfo.Snapshot) error { cache.mu.Lock() defer cache.mu.Unlock() balancedVolumesEnabled := utilfeature.DefaultFeatureGate.Enabled(features.BalanceAttachedNodeVolumes) // 获取当前snapshot的Genration snapshotGeneration := nodeSnapshot.Generation // 遍历双向链表,更新snapshot信息 for node := cache.headNode; node != nil; node = node.next { if node.info.GetGeneration() <= snapshotGeneration { //全部node信息都更新完毕 break } if balancedVolumesEnabled && node.info.TransientInfo != nil { // Transient scheduler info is reset here. node.info.TransientInfo.ResetTransientSchedulerInfo() } if np := node.info.Node(); np != nil { nodeSnapshot.NodeInfoMap[np.Name] = node.info.Clone() } } // 更新snapshot的genration if cache.headNode != nil { nodeSnapshot.Generation = cache.headNode.info.GetGeneration() } // 若是snapshot里面包含过时的pod信息则进行清理工做 if len(nodeSnapshot.NodeInfoMap) > len(cache.nodes) { for name := range nodeSnapshot.NodeInfoMap { if _, ok := cache.nodes[name]; !ok { delete(nodeSnapshot.NodeInfoMap, name) } } } return nil }
nodeTree主要负责节点的打散,用于让pod均匀分配在多个zone中的node节点上
type NodeTree struct { tree map[string]*nodeArray // 存储zone和zone下面的node信息 zones []string // 存储zones zoneIndex int numNodes int mu sync.RWMutex }
其中zones和zoneIndex主要用于后面的节点打散算法使用,实现按zone逐个分配
nodeArray负责存储一个zone下面的全部node节点,而且经过lastIndex记录当前zone分配的节点索引
type nodeArray struct { nodes []string lastIndex int }
添加node其实很简单,只须要获取对应node的zone信息,而后加入对应zone的nodeArray中
func (nt *NodeTree) addNode(n *v1.Node) { // 获取zone zone := utilnode.GetZoneKey(n) if na, ok := nt.tree[zone]; ok { for _, nodeName := range na.nodes { if nodeName == n.Name { klog.Warningf("node %q already exist in the NodeTree", n.Name) return } } // 吧节点加入到zone中 na.nodes = append(na.nodes, n.Name) } else { // 新加入zone nt.zones = append(nt.zones, zone) nt.tree[zone] = &nodeArray{nodes: []string{n.Name}, lastIndex: 0} } klog.V(2).Infof("Added node %q in group %q to NodeTree", n.Name, zone) nt.numNodes++ }
数据打散算法很简单,首先咱们存储了zone和nodeArray的信息,而后咱们只须要经过两个索引zoneIndex和nodeIndex就能够实现节点的打散操做, 只有当当前集群中全部zone里面的全部节点都进行一轮分配后,而后重建分配索引
func (nt *NodeTree) Next() string { nt.mu.Lock() defer nt.mu.Unlock() if len(nt.zones) == 0 { return "" } // 记录分配完全部node的zone的计数,用于进行状态重置 // 好比有3个zone: 则当numExhaustedZones=3的时候,就会从新从头开始进行分配 numExhaustedZones := 0 for { if nt.zoneIndex >= len(nt.zones) { nt.zoneIndex = 0 } // 按照zone索引来进行逐个zone分配 zone := nt.zones[nt.zoneIndex] nt.zoneIndex++ // 返回当前zone下面的next节点,若是exhausted为True则代表当前zone全部的节点,在这一轮调度中都已经分配了一次 // 就须要从下个zone继续获取节点 nodeName, exhausted := nt.tree[zone].next() if exhausted { numExhaustedZones++ // 全部的zone下面的node都被分配了一次,这里进行重置,从头开始继续分配 if numExhaustedZones >= len(nt.zones) { // all zones are exhausted. we should reset. nt.resetExhausted() } } else { return nodeName } } }
重建索引则是将全部nodeArray的索引和当前zoneIndex进行归零
func (nt *NodeTree) resetExhausted() {// 重置索引 for _, na := range nt.tree { na.lastIndex = 0 } nt.zoneIndex = 0 }
Cache要定时将以前在通过本地scheduler分配完成后的假设的pod的信息进行清理,若是这些pod在给定时间内仍然没有感知到对应的pod真正的添加事件则就这些pod删除
assumedPods map[string]bool
默认每30s进行清理一次
func (cache *schedulerCache) run() { go wait.Until(cache.cleanupExpiredAssumedPods, cache.period, cache.stop) }
清理逻辑主要是针对那些已经完成绑定的pod来进行,若是一个pod完成了在scheduler里面的全部操做后,会有一个过时时间,当前是30s,若是超过该时间即deadline小于当前的时间就删除该pod
// cleanupAssumedPods exists for making test deterministic by taking time as input argument. func (cache *schedulerCache) cleanupAssumedPods(now time.Time) { cache.mu.Lock() defer cache.mu.Unlock() // The size of assumedPods should be small for key := range cache.assumedPods { ps, ok := cache.podStates[key] if !ok { panic("Key found in assumed set but not in podStates. Potentially a logical error.") } // 未完成绑定的pod不会被进行清理 if !ps.bindingFinished { klog.V(3).Infof("Couldn't expire cache for pod %v/%v. Binding is still in progress.", ps.pod.Namespace, ps.pod.Name) continue } // 在完成bind以后会设定一个过时时间,目前是30s,若是deadline即bind时间+30s小于当前时间就过时删除 if now.After(*ps.deadline) { klog.Warningf("Pod %s/%s expired", ps.pod.Namespace, ps.pod.Name) if err := cache.expirePod(key, ps); err != nil { klog.Errorf("ExpirePod failed for %s: %v", key, err) } } } }
清理pod主要分为以下几个部分:
1.对应pod假定分配node的信息
2.清理映射的podState信息
func (cache *schedulerCache) expirePod(key string, ps *podState) error { if err := cache.removePod(ps.pod); err != nil { return err } delete(cache.assumedPods, key) delete(cache.podStates, key) return nil }
核心数据结构数据流如上所示,其核心是经过nodes、headNode实现一个Snapshot为调度器提供当前系统资源的快照,并经过nodeTree进行node节点的打散,最后内部经过一个pod的状态机来进行系统内部的pod资源状态的转换,并经过后台的定时任务来保证通过通过Reflector获取的数据的最终一致性(删除那些通过bind的可是却没被实际调度或者事件丢失的pod), 借助这些其实一个最基础的工业级调度器的本地cache功能就实现了