Eviction management is one of the kubelet's self-protection mechanisms: when resources run short, it uses thresholds and policies to proactively evict some Pods so that the system's resource usage stays within a reasonable range. The policies themselves are already well documented in the official docs; this post only looks at how they are designed in the code.
The overall eviction flow collects statistics on current resource usage together with the list of currently active Pods, then combines them with the configured resource thresholds to decide whether any Pods need to be evicted.
The eviction manager uses an observer pattern to support eviction handling for special resources such as memory. In some situations the OOM killer can fire even though the current resource usage has not reached the threshold; for such cases the corresponding ThresholdNotifier also tries to synchronize state and decide whether to proactively evict some Pods (this feature does not appear to be enabled yet). Likewise, if a resource has its own dedicated threshold monitoring, then when a later eviction is triggered by that resource, its priority is lowered.
The eviction manager contains quite a few data structures internally; here we only look at the key ones in the overall implementation.
The kubelet makes eviction decisions based on the resources currently available on the node. A signal names a concrete resource that can trigger eviction, such as available memory; during an eviction decision the value of each signal is computed from the statistics.
type Signal string

const (
    // other signals omitted
    SignalMemoryAvailable Signal = "memory.available"
    SignalNodeFsAvailable Signal = "nodefs.available"
)
A Signal identifies a kind of resource, while a Threshold describes the threshold for a resource.
type Threshold struct {
    Signal      Signal            // the resource signal this threshold watches
    Operator    ThresholdOperator // comparison operator, e.g. LessThan
    Value       ThresholdValue    // the threshold value (absolute quantity or percentage)
    GracePeriod time.Duration     // how long the threshold must hold before eviction (soft thresholds)
    MinReclaim  *ThresholdValue   // minimum amount to reclaim once the threshold is crossed
}
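For orientation, here is a sketch of what the hard threshold memory.available<100Mi would look like as one of these values. The field names come from the struct above; the literal construction is illustrative, not lifted from the source:

// illustrative only: the Threshold corresponding to --eviction-hard=memory.available<100Mi
var memoryHard = evictionapi.Threshold{
    Signal:   evictionapi.SignalMemoryAvailable,
    Operator: evictionapi.OpLessThan,
    Value: evictionapi.ThresholdValue{
        // a threshold can be an absolute quantity or a percentage of capacity
        Quantity: resource.NewQuantity(100*1024*1024, resource.BinarySI),
    },
    // GracePeriod left zero: this is a hard threshold, evicted immediately
}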
The resource preparation stage gathers everything the later decision needs: first the list of currently active Pods (the eviction candidates are picked from it later), then the current statistics and the configured thresholds.
thresholds := m.config.Thresholds

// get the active pods
activePods := podFunc()

updateStats := true
// get the usage statistics
summary, err := m.summaryProvider.Get(updateStats)
The notifiers previously registered for dedicated resources are then given the fresh summary so they can run their own checks.
if m.clock.Since(m.thresholdsLastUpdated) > notifierRefreshInterval {
    m.thresholdsLastUpdated = m.clock.Now()
    for _, notifier := range m.thresholdNotifiers {
        // push the updated summary to the notifier
        if err := notifier.UpdateThreshold(summary); err != nil {
            klog.Warningf("eviction manager: failed to update %s: %v", notifier.Description(), err)
        }
    }
}
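For reference, the contract that UpdateThreshold belongs to is a small interface, roughly as defined in pkg/kubelet/eviction/types.go:

// ThresholdNotifier notifies the manager when a threshold-crossing event occurs
type ThresholdNotifier interface {
    // Start begins the notifier's watch loop
    Start()
    // UpdateThreshold recalculates the threshold from the latest stats summary
    UpdateThreshold(summary *statsapi.Summary) error
    // Description returns a human-readable description of the notifier
    Description() string
}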
The first step is to derive, from the statistics, the actually observed usage for each known signal.
observations, statsFunc := makeSignalObservations(summary)
func makeSignalObservations(summary *statsapi.Summary) (signalObservations, statsFunc) {
    statsFunc := cachedStatsFunc(summary.Pods)
    result := signalObservations{}
    if memory := summary.Node.Memory; memory != nil && memory.AvailableBytes != nil && memory.WorkingSetBytes != nil {
        result[evictionapi.SignalMemoryAvailable] = signalObservation{
            available: resource.NewQuantity(int64(*memory.AvailableBytes), resource.BinarySI),
            capacity:  resource.NewQuantity(int64(*memory.AvailableBytes+*memory.WorkingSetBytes), resource.BinarySI),
            time:      memory.Time,
        }
    }
    // most other signals omitted
    return result, statsFunc
}
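A quick numeric illustration (made-up numbers, not from the source): if the node reports AvailableBytes of 1GiB and WorkingSetBytes of 3GiB, the observation records available = 1Gi and capacity = 1Gi + 3Gi = 4Gi; that is, memory capacity is reconstructed as available plus working set.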
Next, the current observations are checked against all configured thresholds to find the ones that have been triggered. The final false argument means this is the first check, so enforced minimum reclaim is not applied yet.
thresholds = thresholdsMet(thresholds, observations, false)
Threshold checking takes the observed capacity for the signal, computes the quantity the threshold corresponds to, and compares it against the actually observed availability; if availability falls below it, the threshold is considered met. The enforceMinReclaim flag controls enforced minimum reclaim, which is implemented simply by adding the minimum-reclaim amount on top of the threshold quantity.
func thresholdsMet(thresholds []evictionapi.Threshold, observations signalObservations, enforceMinReclaim bool) []evictionapi.Threshold {
    results := []evictionapi.Threshold{}
    for i := range thresholds {
        threshold := thresholds[i]
        // look up the observation for this threshold's signal
        observed, found := observations[threshold.Signal]
        if !found {
            klog.Warningf("eviction manager: no observation found for eviction signal %v", threshold.Signal)
            continue
        }
        // determine whether the threshold is met
        thresholdMet := false
        // compute the quantity this threshold corresponds to
        quantity := evictionapi.GetThresholdQuantity(threshold.Value, observed.capacity)
        // if enforceMinReclaim is specified, compare against value - minreclaim
        if enforceMinReclaim && threshold.MinReclaim != nil {
            // enforced minimum reclaim just adds the min-reclaim amount on top of the threshold
            quantity.Add(*evictionapi.GetThresholdQuantity(*threshold.MinReclaim, observed.capacity))
        }
        thresholdResult := quantity.Cmp(*observed.available)
        switch threshold.Operator {
        case evictionapi.OpLessThan:
            thresholdMet = thresholdResult > 0
        }
        if thresholdMet {
            results = append(results, threshold)
        }
    }
    return results
}
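To make the enforceMinReclaim arithmetic concrete, here is a minimal self-contained sketch; it uses plain int64 byte counts instead of resource.Quantity, an assumed simplification:

package main

import "fmt"

// sketch of the OpLessThan comparison in thresholdsMet, simplified to int64 byte counts
func thresholdMet(thresholdBytes, minReclaimBytes, availableBytes int64, enforceMinReclaim bool) bool {
    quantity := thresholdBytes
    if enforceMinReclaim {
        // enforced minimum reclaim raises the bar by the min-reclaim amount
        quantity += minReclaimBytes
    }
    // OpLessThan: the threshold is met when availability drops below the quantity
    return availableBytes < quantity
}

func main() {
    const Mi = int64(1) << 20
    // memory.available<100Mi with minReclaim=50Mi, and 120Mi currently available
    fmt.Println(thresholdMet(100*Mi, 50*Mi, 120*Mi, false)) // false: 120Mi is not below 100Mi
    fmt.Println(thresholdMet(100*Mi, 50*Mi, 120*Mi, true))  // true: 120Mi is below 100Mi+50Mi
}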
After each round, the eviction manager stores all thresholds triggered in that round in its thresholdsMet field. In the next round it checks whether those resources have since recovered: the previous thresholds are re-evaluated against the current observations (this time with enforced minimum reclaim), and any that are still unsatisfied are merged into this round's set.
if len(m.thresholdsMet) > 0 {
    // merge in thresholds that have not yet been resolved
    thresholdsNotYetResolved := thresholdsMet(m.thresholdsMet, observations, true)
    thresholds = mergeThresholds(thresholds, thresholdsNotYetResolved)
}
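mergeThresholds itself is straightforward; roughly, it appends the carried-over thresholds that are not already present in this round's set (hasThreshold checks whether an equivalent threshold already exists):

func mergeThresholds(inputsA []evictionapi.Threshold, inputsB []evictionapi.Threshold) []evictionapi.Threshold {
    results := inputsA
    for _, threshold := range inputsB {
        // skip thresholds already triggered in this round
        if !hasThreshold(results, threshold) {
            results = append(results, threshold)
        }
    }
    return results
}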
When thresholds have been triggered, node conditions are derived from their signal types. If a resource type is under pressure, the kubelet's admission control may refuse to admit new Pods at creation time.
nodeConditions := nodeConditions(thresholds)
if len(nodeConditions) > 0 {
    klog.V(3).Infof("eviction manager: node conditions - observed: %v", nodeConditions)
}

nodeConditionsLastObservedAt := nodeConditionsLastObservedAt(nodeConditions, m.nodeConditionsLastObservedAt, now)
nodeConditions = nodeConditionsObservedSince(nodeConditionsLastObservedAt, m.config.PressureTransitionPeriod, now)
if len(nodeConditions) > 0 {
    klog.V(3).Infof("eviction manager: node conditions - transition period not met: %v", nodeConditions)
}
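The signal-to-condition mapping that nodeConditions relies on looks roughly like this (a sketch; the full table in eviction/helpers.go covers more signals):

var signalToNodeCondition = map[evictionapi.Signal]v1.NodeConditionType{
    evictionapi.SignalMemoryAvailable:  v1.NodeMemoryPressure,
    evictionapi.SignalNodeFsAvailable:  v1.NodeDiskPressure,
    evictionapi.SignalImageFsAvailable: v1.NodeDiskPressure,
}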
// update internal state
m.Lock()
m.nodeConditions = nodeConditions
m.thresholdsFirstObservedAt = thresholdsFirstObservedAt
m.nodeConditionsLastObservedAt = nodeConditionsLastObservedAt
m.thresholdsMet = thresholds

// determine the set of thresholds whose stats have been updated since the last sync
thresholds = thresholdsUpdatedStats(thresholds, observations, m.lastObservations)
debugLogThresholdsWithObservation("thresholds - updated stats", thresholds, observations)

m.lastObservations = observations
m.Unlock()
For local ephemeral storage, the statistics cover emptyDir volumes, Pod ephemeral storage, and container logs; Pods that exceed their local-storage limits are evicted.
if utilfeature.DefaultFeatureGate.Enabled(features.LocalStorageCapacityIsolation) {
    if evictedPods := m.localStorageEviction(summary, activePods); len(evictedPods) > 0 {
        return evictedPods
    }
}
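To make the shape of those checks concrete, a minimal sketch (a hypothetical helper, not the source's structure): eviction is warranted when an emptyDir volume outgrows its declared sizeLimit, or the Pod's total ephemeral storage exceeds its limit.

// hypothetical simplification of the per-pod checks in localStorageEviction
func podViolatesLocalStorage(emptyDirUsed, emptyDirSizeLimit, podEphemeralUsed, podEphemeralLimit int64) bool {
    // an emptyDir volume that outgrew its declared sizeLimit
    if emptyDirSizeLimit > 0 && emptyDirUsed > emptyDirSizeLimit {
        return true
    }
    // the pod's total ephemeral storage (including logs) past its limit
    return podEphemeralLimit > 0 && podEphemeralUsed > podEphemeralLimit
}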
Next it tries to find, among the triggered thresholds, one whose resource can actually be reclaimed; if none is found, nothing is evicted.
thresholdToReclaim, resourceToReclaim, foundAny := getReclaimableThreshold(thresholds)
if !foundAny {
    return nil
}
The active Pods are then ranked for eviction according to the resource type in question.
// rank the thresholds by eviction priority
sort.Sort(byEvictionPriority(thresholds))
rank, ok := m.signalToRankFunc[thresholdToReclaim.Signal]
if !ok {
    klog.Errorf("eviction manager: no ranking function for signal %s", thresholdToReclaim.Signal)
    return nil
}

// the only candidates viable for eviction are those pods that had anything running.
if len(activePods) == 0 {
    klog.Errorf("eviction manager: eviction thresholds have been met, but no pods are active to evict")
    return nil
}

// rank the running pods for eviction for the specified resource
rank(activePods, statsFunc)
The ranked Pods are then walked in order; at most one Pod is evicted per round, and the loop returns as soon as one eviction succeeds.
for i := range activePods {
    pod := activePods[i]
    gracePeriodOverride := int64(0)
    if !isHardEvictionThreshold(thresholdToReclaim) {
        gracePeriodOverride = m.config.MaxPodGracePeriodSeconds
    }
    message, annotations := evictionMessage(resourceToReclaim, pod, statsFunc)
    // evict the pod
    if m.evictPod(pod, gracePeriodOverride, message, annotations) {
        metrics.Evictions.WithLabelValues(string(thresholdToReclaim.Signal)).Inc()
        return []*v1.Pod{pod}
    }
}
The ranking considers three factors: whether usage exceeds the request, Pod priority, and by how much usage exceeds the request.
The first comparator looks at each Pod's actual usage versus the request in its resource spec, and ranks the Pod that exceeds its memory request ahead of one that does not.
func exceedMemoryRequests(stats statsFunc) cmpFunc {
    return func(p1, p2 *v1.Pod) int {
        p1Stats, p1Found := stats(p1)
        p2Stats, p2Found := stats(p2)
        if !p1Found || !p2Found {
            return cmpBool(!p1Found, !p2Found)
        }
        p1Memory := memoryUsage(p1Stats.Memory)
        p2Memory := memoryUsage(p2Stats.Memory)
        p1ExceedsRequests := p1Memory.Cmp(v1resource.GetResourceRequestQuantity(p1, v1.ResourceMemory)) == 1
        p2ExceedsRequests := p2Memory.Cmp(v1resource.GetResourceRequestQuantity(p2, v1.ResourceMemory)) == 1
        // prioritize evicting the pod which exceeds its requests
        return cmpBool(p1ExceedsRequests, p2ExceedsRequests)
    }
}
The next comparator compares actual memory usage above the request: each Pod's usage minus its request, evicting the one with the larger overage first.
func memory(stats statsFunc) cmpFunc {
    return func(p1, p2 *v1.Pod) int {
        p1Stats, p1Found := stats(p1)
        p2Stats, p2Found := stats(p2)
        if !p1Found || !p2Found {
            // prioritize evicting the pod for which no stats were found
            return cmpBool(!p1Found, !p2Found)
        }

        // adjust p1, p2 usage relative to the request (if any)
        p1Memory := memoryUsage(p1Stats.Memory)
        p1Request := v1resource.GetResourceRequestQuantity(p1, v1.ResourceMemory)
        p1Memory.Sub(p1Request)

        p2Memory := memoryUsage(p2Stats.Memory)
        p2Request := v1resource.GetResourceRequestQuantity(p2, v1.ResourceMemory)
        p2Memory.Sub(p2Request)

        // prioritize evicting the pod which has the larger consumption of memory
        return p2Memory.Cmp(*p1Memory)
    }
}
Finally, Pod priority is compared directly:

func priority(p1, p2 *v1.Pod) int {
    priority1 := pod.GetPodPriority(p1)
    priority2 := pod.GetPodPriority(p2)
    if priority1 == priority2 {
        return 0
    }
    if priority1 > priority2 {
        return 1
    }
    return -1
}
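These three comparators are chained together; for memory pressure the ranking function registered in signalToRankFunc looks roughly like this (from eviction/helpers.go):

// rankMemoryPressure orders the input pods for eviction in response to memory pressure:
// request-exceeders first, then lower priority, then larger usage above request
func rankMemoryPressure(pods []*v1.Pod, stats statsFunc) {
    orderedBy(exceedMemoryRequests(stats), priority, memory(stats)).Sort(pods)
}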
The memoryThresholdNotifier implementation for memory is not covered here; I am saving it for the next post, since it involves some Unix local-IPC machinery that is quite fun. See you in the next one.
Kubernetes source-code reading e-book: https://www.yuque.com/baxiaoshi/tyado3
> WeChat ID: baxiaoshi2020; follow the official account for more source-code analysis articles
> More articles at www.sreguide.com