本节全部的代码基于1.13.4版本。node
以前在分析controller-manager中说到,controller对于每一个controller的控制格式基本一致,都是以start***Controller
的方式封装成一个独立的方法,NodeController也不例外。在1.13.4的版本中,Node的控制器分红了两种(很早以前的版本只有一种),分别是NodeIpamController与NodeLifecycleController。其中,NodeIpamController主要处理Node的IPAM地址相关,NodeLifecycleController处理Node的整个生命周期,本文主要分析NodeLifecycleController。
api
startNodeLifecycleController
方法开始它的生命周期的管理流程。主要关注两个方法:
NewNodeLifecycleController与
Run。NewNodeLifecycleController负责建立资源对象,Run负责启动,完成任务的执行。
NewNodeLifecycleController主要完成如下任务:
一、根据给定的配置构造Controller大结构体,完成部分参数的配置任务;
二、为podInformer
、nodeInformer
、leaseInformer
以及daemonSetInformer
配置相应的回调方法,包括AddFunc
、UpdateFunc
以及DeleteFunc
。这样,当相应的Node发生变化时,关联的controller可以及时监听到,并调用相应的处理方法;
三、返回构造完的结构体。
在配置的时候有几个须要注意的变量,后面会常常用到。 缓存
TaintNodeNotReady
和
TaintNodeUnreachable
污点的方式替换以前的直接驱逐Pod的方式,经过流控删除Pod。主要为了防止Pod在某一时间点忽然被大量驱逐;
Run方法主要包含如下方法,每一个方法都是以单独的goroutine运行:
一、go nc.taintManager.Run(stopCh)
:TaintManager,主要完成Pod的驱逐任务;
二、doNoScheduleTaintingPassWorker
:完成NoSchedule的污点更新任务;
三、doNoExecuteTaintingPass
、doEvictionPass
:完成NoExecute的污点更新任务;
四、monitorNodeHealth
:检查Node的状态,而且处理Node的增删改查等任务,同时也会处理Pod的驱逐工做。
代码以下网络
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
klog.Infof("Starting node controller")
defer klog.Infof("Shutting down node controller")
if !controller.WaitForCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
return
}
if nc.runTaintManager {
go nc.taintManager.Run(stopCh)
}
if nc.taintNodeByCondition {
// Close node update queue to cleanup go routine.
defer nc.nodeUpdateQueue.ShutDown()
// Start workers to update NoSchedule taint for nodes.
for i := 0; i < scheduler.UpdateWorkerSize; i++ {
// Thanks to "workqueue", each worker just need to get item from queue, because
// the item is flagged when got from queue: if new event come, the new item will
// be re-queued until "Done", so no more than one worker handle the same item and
// no event missed.
go wait.Until(nc.doNoScheduleTaintingPassWorker, time.Second, stopCh)
}
}
if nc.useTaintBasedEvictions {
// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
} else {
// Managing eviction of nodes:
// When we delete pods off a node, if the node was not empty at the time we then
// queue an eviction watcher. If we hit an error, retry deletion.
go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
}
// Incorporate the results of node health signal pushed from kubelet to master.
go wait.Until(func() {
if err := nc.monitorNodeHealth(); err != nil {
klog.Errorf("Error monitoring node health: %v", err)
}
}, nc.nodeMonitorPeriod, stopCh)
<-stopCh
}
复制代码
NodeLifecycleController的执行过程主要就是各个goroutine对应的任务,一一分析。app
TaintManager经过Run方法开始启动。在Run方法内,主要作了几个工做:
一、初始化nodeUpdateChannels
和podUpdateChannels
,大小为8个channel,后面能够并行处理;
二、启动两个goroutine,分别监听nodeUpdateQueue和podUpdateQueue的消息;
三、并行启动8个工做任务,处理监听到的nodeUpdate和podUpdate的消息。
async
// Run starts NoExecuteTaintManager which will run in loop until `stopCh` is closed.
func (tc *NoExecuteTaintManager) Run(stopCh <-chan struct{}) {
klog.V(0).Infof("Starting NoExecuteTaintManager")
for i := 0; i < UpdateWorkerSize; i++ {
tc.nodeUpdateChannels = append(tc.nodeUpdateChannels, make(chan nodeUpdateItem, NodeUpdateChannelSize))
tc.podUpdateChannels = append(tc.podUpdateChannels, make(chan podUpdateItem, podUpdateChannelSize))
}
// Functions that are responsible for taking work items out of the workqueues and putting them
// into channels.
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.nodeUpdateQueue.Get()
if shutdown {
break
}
nodeUpdate := item.(nodeUpdateItem)
hash := hash(nodeUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.nodeUpdateQueue.Done(item)
return
case tc.nodeUpdateChannels[hash] <- nodeUpdate:
// tc.nodeUpdateQueue.Done is called by the nodeUpdateChannels worker
}
}
}(stopCh)
go func(stopCh <-chan struct{}) {
for {
item, shutdown := tc.podUpdateQueue.Get()
if shutdown {
break
}
podUpdate := item.(podUpdateItem)
hash := hash(podUpdate.nodeName, UpdateWorkerSize)
select {
case <-stopCh:
tc.podUpdateQueue.Done(item)
return
case tc.podUpdateChannels[hash] <- podUpdate:
// tc.podUpdateQueue.Done is called by the podUpdateChannels worker
}
}
}(stopCh)
wg := sync.WaitGroup{}
wg.Add(UpdateWorkerSize)
for i := 0; i < UpdateWorkerSize; i++ {
go tc.worker(i, wg.Done, stopCh)
}
wg.Wait()
}
复制代码
在并行启动的work任务中,优先处理nodeUpdate的事件,等到nodeUpdate处理完成以后,再去处理podUpdate。处理nodeUpdate的方法对应handleNodeUpdate
,podUpdate对应handlePodUpdate
。
handleNodeUpdate
主要的做用就是经过监听到的nodeName获取node信息,经过node信息获取该node上对应的taints。而后对该node上全部的pod,依次执行processPodOnNode
方法。方法以下:ide
func (tc *NoExecuteTaintManager) handleNodeUpdate(nodeUpdate nodeUpdateItem) {
node, err := tc.getNode(nodeUpdate.nodeName)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
klog.V(4).Infof("Noticed node deletion: %#v", nodeUpdate.nodeName)
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
delete(tc.taintedNodes, nodeUpdate.nodeName)
return
}
utilruntime.HandleError(fmt.Errorf("cannot get node %s: %v", nodeUpdate.nodeName, err))
return
}
// Create or Update
klog.V(4).Infof("Noticed node update: %#v", nodeUpdate)
taints := getNoExecuteTaints(node.Spec.Taints)
func() {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
klog.V(4).Infof("Updating known taints on node %v: %v", node.Name, taints)
if len(taints) == 0 {
delete(tc.taintedNodes, node.Name)
} else {
tc.taintedNodes[node.Name] = taints
}
}()
pods, err := getPodsAssignedToNode(tc.client, node.Name)
if err != nil {
klog.Errorf(err.Error())
return
}
if len(pods) == 0 {
return
}
// Short circuit, to make this controller a bit faster.
if len(taints) == 0 {
klog.V(4).Infof("All taints were removed from the Node %v. Cancelling all evictions...", node.Name)
for i := range pods {
tc.cancelWorkWithEvent(types.NamespacedName{Namespace: pods[i].Namespace, Name: pods[i].Name})
}
return
}
now := time.Now()
for i := range pods {
pod := &pods[i]
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
tc.processPodOnNode(podNamespacedName, node.Name, pod.Spec.Tolerations, taints, now)
}
}
复制代码
handlePodUpdate
经过获取到单一的pod信息与node信息,也是最终执行processPodOnNode
方法。方法以下:oop
func (tc *NoExecuteTaintManager) handlePodUpdate(podUpdate podUpdateItem) {
pod, err := tc.getPod(podUpdate.podName, podUpdate.podNamespace)
if err != nil {
if apierrors.IsNotFound(err) {
// Delete
podNamespacedName := types.NamespacedName{Namespace: podUpdate.podNamespace, Name: podUpdate.podName}
klog.V(4).Infof("Noticed pod deletion: %#v", podNamespacedName)
tc.cancelWorkWithEvent(podNamespacedName)
return
}
utilruntime.HandleError(fmt.Errorf("could not get pod %s/%s: %v", podUpdate.podName, podUpdate.podNamespace, err))
return
}
// We key the workqueue and shard workers by nodeName. If we don't match the current state we should not be the one processing the current object.
if pod.Spec.NodeName != podUpdate.nodeName {
return
}
// Create or Update
podNamespacedName := types.NamespacedName{Namespace: pod.Namespace, Name: pod.Name}
klog.V(4).Infof("Noticed pod update: %#v", podNamespacedName)
nodeName := pod.Spec.NodeName
if nodeName == "" {
return
}
taints, ok := func() ([]v1.Taint, bool) {
tc.taintedNodesLock.Lock()
defer tc.taintedNodesLock.Unlock()
taints, ok := tc.taintedNodes[nodeName]
return taints, ok
}()
// It's possible that Node was deleted, or Taints were removed before, which triggered
// eviction cancelling if it was needed.
if !ok {
return
}
tc.processPodOnNode(podNamespacedName, nodeName, pod.Spec.Tolerations, taints, time.Now())
}
复制代码
processPodOnNode
方法主要将须要删除的Pod按照预约好的格式添加到taintEvictionQueue
,该queue内的任务都是设置好定时任务时间的,在相应的时间内调用deletePodHandler
方法去删除pod,该方法位于pkg/controller/nodelifecycle/scheduler/taint_manager.go
下。方法以下:ui
func deletePodHandler(c clientset.Interface, emitEventFunc func(types.NamespacedName)) func(args *WorkArgs) error {
return func(args *WorkArgs) error {
ns := args.NamespacedName.Namespace
name := args.NamespacedName.Name
klog.V(0).Infof("NoExecuteTaintManager is deleting Pod: %v", args.NamespacedName.String())
if emitEventFunc != nil {
emitEventFunc(args.NamespacedName)
}
var err error
for i := 0; i < retries; i++ {
err = c.CoreV1().Pods(ns).Delete(name, &metav1.DeleteOptions{})
if err == nil {
break
}
time.Sleep(10 * time.Millisecond)
}
return err
}
}
复制代码
因此,TaintManager的主要做用就是将须要驱逐的Pod配置好定时删除的任务,而后从相应的Node上一一删除。this
当开启taintNodeByCondition特性的时候,则会调用doNoScheduleTaintingPassWorker
去对Node作NoSchedule的污点更新。调用的是doNoScheduleTaintingPass
方法。方法以下:
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
node, err := nc.nodeLister.Get(nodeName)
if err != nil {
// If node not found, just ignore it.
if apierrors.IsNotFound(err) {
return nil
}
return err
}
// Map node's condition to Taints.
var taints []v1.Taint
for _, condition := range node.Status.Conditions {
if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
if taintKey, found := taintMap[condition.Status]; found {
taints = append(taints, v1.Taint{
Key: taintKey,
Effect: v1.TaintEffectNoSchedule,
})
}
}
}
if node.Spec.Unschedulable {
// If unschedulable, append related taint.
taints = append(taints, v1.Taint{
Key: schedulerapi.TaintNodeUnschedulable,
Effect: v1.TaintEffectNoSchedule,
})
}
// Get exist taints of node.
nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
// only NoSchedule taints are candidates to be compared with "taints" later
if t.Effect != v1.TaintEffectNoSchedule {
return false
}
// Find unschedulable taint of node.
if t.Key == schedulerapi.TaintNodeUnschedulable {
return true
}
// Find node condition taints of node.
_, found := taintKeyToNodeConditionMap[t.Key]
return found
})
taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
// If nothing to add not delete, return true directly.
if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
return nil
}
if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
return fmt.Errorf("failed to swap taints of node %+v", node)
}
return nil
}
复制代码
doNoScheduleTaintingPass
主要作了如下工做:
一、根据nodeName获取node信息;
二、根据node.Status.Conditions字段,判断node是否须要添加NoSchedule污点,判断的标准以下:
SwapNodeControllerTaint
对Node进行污点的状态更新。
doNoExecuteTaintingPass
和doEvictionPass
二者只会执行其一。
doNoExecuteTaintingPass
方法为Node添加
NoExecute污点;而
doEvictionPass
则是直接判断哪些Pod须要驱逐,直接去作删除工做。
doNoExecuteTaintingPass
方法中,经过获取
zoneNoExecuteTainter内的数据对Node状态进行判断,若是须要则添加上NoExecute污点,并调用
SwapNodeControllerTaint
方法更新该Node上的污点。zoneNoExecuteTainter的信息是经过
monitorNodeHealth
方法获取到的,后面再分析。
doNoExecuteTaintingPass
的方法以下:
doEvictionPass
则是直接经过获取
zonePodEvictor内的数据,判断哪些Pod须要被驱除,则直接调用Pod的DELETE接口,完成Pod的驱逐任务。zonePodEvictor的信息也是经过
monitorNodeHealth
方法获取到的。
doEvictionPass
方法以下:
doNoExecuteTaintingPass
只是对Node打上污点,而
doEvictionPass
则是完成了最终的删除工做。
doEvictionPass
的这种方式会致使某一个时间段内,大量的Pod须要被删除,会产生很大的流量;而
doNoExecuteTaintingPass
经过给Node打上污点,让TaintManager去作最终的Pod删除工做,TaintManager的删除任务是分时间段定时执行的,因此不会产生这种大流量的问题。所以建议开启这个特性,在kube-controller-manager的启动参数加上
--feature-gates=TaintBasedEvictions=true
便可。
前面几个goroutine的任务主要围绕着Taint来展开,而monitorNodeHealth
则是定时更新Node的信息,并产生数据的来源。
monitorNodeHealth
的主要任务能够分为如下步骤:
一、获取全部的Node信息,按照哪些是新增的、哪些是须要删除的以及哪些是须要从新规划的返回节点的相应信息;
tryUpdateNodeHealth
方法;
handleDisruption
方法。
首先经过List接口获取全部的Node信息,经过classifyNodes
完成Node的划分。classifyNodes
规则划分很简单,比对knownNodeSet
和allNodes
,能够理解为knownNodeSet
为上一次的数据,allNodes
为新的数据,则:
一、若是在allNodes
存在,在knownNodeSet
不存在,为新增的Node;
二、若是在knownNodeSet
存在,在allNodes
不存在,为删除的Node;
三、若是在knownNodeSet
和allNodes
都存在,可是没有zone states,为newZoneRepresentatives的Node。每一个Node都要归属于一个Zone。
在步骤1完成节点的划分以后,步骤2针对每种类型的节点作相应的处理操做。
一、待新增的Node,将其加入到knownNodeSet内缓存,经过addPodEvictorForNewZone
为其归属一个Zone,经过useTaintBasedEvictions的开关控制,判断是标记Node为Reachable或是取消Pod的驱逐工做。总之就是表示这个Node能够开始正常使用了;
二、待删除的Node,将其从knownNodeSet内删除;
三、未划分Zone的Node,将其添加到Zone缓存中去。
对获取到的全部的Node,调用PollImmediate
方法,每20ms,重试5次,去更新Node的状态,主要调用了tryUpdateNodeHealth
方法。tryUpdateNodeHealth
方法值中,主要关注observedReadyCondition
和currentReadyCondition
。能够理解为observedReadyCondition
表示上一次的Node状态,currentReadyCondition
表示当前的Node状态。如下的多重if-else都是根据这两个值来操做的。
整个大的语句从currentReadyCondition
不为空开始,分如下几种状况:
一、observedReadyCondition
的值为False,即Node未Ready,给Node打上node.kubernetes.io/not-ready:NoExecute的污点或是直接驱逐Node上的Pod;
二、observedReadyCondition
的值为Unknown,给Node打上node.kubernetes.io/unreachable:NoExecute的污点或是直接驱除Node上的Pod;
三、observedReadyCondition
的值为True,表示Node是正常工做的状态,标记Node为Reachable或是中止驱逐Pod的操做;
四、currentReadyCondition
不为True而observedReadyCondition
为True,表示Node处于Not Ready的状态,标记Node为Not Ready,并更新Node上的Pod状态;
五、currentReadyCondition
不为True而且配置了cloudprovider,作删除Node的操做。
整个大的循环主要的任务就是对Node的状态进行判断,作Node的污点标记或是驱逐相关操做。zoneNoExecuteTainter
和zonePodEvictor
两个数据集的信息都是在此作相应的更新的。
最终调用handleDisruption
作网络中断的一些相关处理操做。
中断主要有如下几种状态:
handleDisruption
中,经过
allAreFullyDisrupted
和
allWasFullyDisrupted
标记如今的zone状态和以前缓存的zone状态,分别表示最新的结果和上一次的结果信息。而后作三种处理操做:
fullDisruption
,即全中断,表示全部Node都处于Not Ready的状态,此时恢复正常的驱逐速率,并中止作驱逐操做;
partialDisruption
,即部分中断,表示部分Node处于Not Ready的状态,此时设置用户定义的驱逐速率;
normal
,恢复到默认的驱逐速率。