In Kubernetes, scheduling means placing a Pod onto a suitable Node so that the kubelet on that Node can then run it. The K8s scheduler is the component responsible for scheduling pods.
This article walks through part of the scheduler's workflow by reading the source code.
Based on Kubernetes v1.19.11.
The K8s scheduler's main data structure is Scheduler. The related code flow falls into two parts:

- cmd/kube-scheduler: the scheduler's entry point, which reads the configuration, then initializes and starts the scheduler.
- pkg/scheduler: the core scheduling code.

// pkg/scheduler/scheduler.go

// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
	// It is expected that changes made via SchedulerCache will be observed
	// by NodeLister and Algorithm.
	SchedulerCache internalcache.Cache

	Algorithm core.ScheduleAlgorithm

	// NextPod should be a function that blocks until the next pod
	// is available. We don't use a channel for this, because scheduling
	// a pod may take some amount of time and we don't want pods to get
	// stale while they sit in a channel.
	NextPod func() *framework.QueuedPodInfo

	// Error is called if there is an error. It is passed the pod in
	// question, and the error
	Error func(*framework.QueuedPodInfo, error)

	// Close this to shut down the scheduler.
	StopEverything <-chan struct{}

	// SchedulingQueue holds pods to be scheduled
	SchedulingQueue internalqueue.SchedulingQueue

	// Profiles are the scheduling profiles.
	Profiles profile.Map

	scheduledPodsHasSynced func() bool

	client clientset.Interface
}
- SchedulerCache: holds the podStates and nodeInfos needed for scheduling.
- Algorithm: the scheduler runs its scheduling logic through this object's Schedule method.
- SchedulingQueue: the scheduling queue.
- Profiles: the scheduling profiles (scheduler configuration).

Interface
// pkg/scheduler/internal/queue/scheduling_queue.go

// SchedulingQueue is an interface for a queue to store pods waiting to be scheduled.
// The interface follows a pattern similar to cache.FIFO and cache.Heap and
// makes it easy to use those data structures as a SchedulingQueue.
type SchedulingQueue interface {
	framework.PodNominator
	Add(pod *v1.Pod) error
	// AddUnschedulableIfNotPresent adds an unschedulable pod back to scheduling queue.
	// The podSchedulingCycle represents the current scheduling cycle number which can be
	// returned by calling SchedulingCycle().
	AddUnschedulableIfNotPresent(pod *framework.QueuedPodInfo, podSchedulingCycle int64) error
	// SchedulingCycle returns the current number of scheduling cycle which is
	// cached by scheduling queue. Normally, incrementing this number whenever
	// a pod is popped (e.g. called Pop()) is enough.
	SchedulingCycle() int64
	// Pop removes the head of the queue and returns it. It blocks if the
	// queue is empty and waits until a new item is added to the queue.
	Pop() (*framework.QueuedPodInfo, error)
	Update(oldPod, newPod *v1.Pod) error
	Delete(pod *v1.Pod) error
	MoveAllToActiveOrBackoffQueue(event string)
	AssignedPodAdded(pod *v1.Pod)
	AssignedPodUpdated(pod *v1.Pod)
	PendingPods() []*v1.Pod
	// Close closes the SchedulingQueue so that the goroutine which is
	// waiting to pop items can exit gracefully.
	Close()
	// NumUnschedulablePods returns the number of unschedulable pods exist in the SchedulingQueue.
	NumUnschedulablePods() int
	// Run starts the goroutines managing the queue.
	Run()
}
Implementation
// PriorityQueue implements a scheduling queue.
// The head of PriorityQueue is the highest priority pending pod. This structure
// has three sub queues. One sub-queue holds pods that are being considered for
// scheduling. This is called activeQ and is a Heap. Another queue holds
// pods that are already tried and are determined to be unschedulable. The latter
// is called unschedulableQ. The third queue holds pods that are moved from
// unschedulable queues and will be moved to active queue when backoff are completed.
type PriorityQueue struct {
	// PodNominator abstracts the operations to maintain nominated Pods.
	framework.PodNominator

	stop  chan struct{}
	clock util.Clock

	// pod initial backoff duration.
	podInitialBackoffDuration time.Duration
	// pod maximum backoff duration.
	podMaxBackoffDuration time.Duration

	lock sync.RWMutex
	cond sync.Cond

	// activeQ is heap structure that scheduler actively looks at to find pods to
	// schedule. Head of heap is the highest priority pod.
	activeQ *heap.Heap
	// podBackoffQ is a heap ordered by backoff expiry. Pods which have completed backoff
	// are popped from this heap before the scheduler looks at activeQ
	podBackoffQ *heap.Heap
	// unschedulableQ holds pods that have been tried and determined unschedulable.
	unschedulableQ *UnschedulablePodsMap
	// schedulingCycle represents sequence number of scheduling cycle and is incremented
	// when a pod is popped.
	schedulingCycle int64
	// moveRequestCycle caches the sequence number of scheduling cycle when we
	// received a move request. Unscheduable pods in and before this scheduling
	// cycle will be put back to activeQueue if we were trying to schedule them
	// when we received move request.
	moveRequestCycle int64

	// closed indicates that the queue is closed.
	// It is mainly used to let Pop() exit its control loop while waiting for an item.
	closed bool
}
podBackoffQ is ordered by backOffTime, which is governed by two parameters: podInitialBackoffDuration and podMaxBackoffDuration.
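For intuition, here is a minimal, self-contained sketch of how the backoff duration is derived. It mirrors the exponential-doubling logic of PriorityQueue's internal helper, but it is a paraphrase rather than the verbatim source; the 1s/10s values below are the kube-scheduler defaults.

package main

import (
	"fmt"
	"time"
)

// calculateBackoff mirrors the scheduler's per-pod backoff logic: start at
// the initial duration, double it for every additional failed attempt, and
// cap the result at the maximum duration.
func calculateBackoff(initial, max time.Duration, attempts int) time.Duration {
	duration := initial
	for i := 1; i < attempts; i++ {
		duration *= 2
		if duration > max {
			return max
		}
	}
	return duration
}

func main() {
	// With the default podInitialBackoffSeconds=1 and podMaxBackoffSeconds=10,
	// the backoff sequence is 1s, 2s, 4s, 8s, 10s, 10s, ...
	for attempts := 1; attempts <= 6; attempts++ {
		fmt.Printf("attempt %d -> backoff %v\n", attempts, calculateBackoff(time.Second, 10*time.Second, attempts))
	}
}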
Everything starts in cmd/kube-scheduler/scheduler.go, where the scheduler uses NewSchedulerCommand() to build the command and then executes it.
// cmd/kube-scheduler/scheduler.go
func main() {
	...
	command := app.NewSchedulerCommand()
	...
	if err := command.Execute(); err != nil {
		os.Exit(1)
	}
}
NewSchedulerCommand() reads the configuration file and flags to build the scheduling command; the most important function inside is runCommand().
func NewSchedulerCommand(registryOptions ...Option) *cobra.Command {
	...
	cmd := &cobra.Command{
		Use: "kube-scheduler",
		...
		Run: func(cmd *cobra.Command, args []string) {
			if err := runCommand(cmd, opts, registryOptions...); err != nil {
				fmt.Fprintf(os.Stderr, "%v\n", err)
				os.Exit(1)
			}
		},
		...
	}
	...
	return cmd
}
runCommand has two major steps:

- Setup: read the configuration file and flags and initialize the scheduler. The configuration includes the Profiles settings, among others.
- Run: start the components the scheduler depends on, such as the health-check server and the informers, then run the main scheduling loop with the scheduler returned by Setup.

func runCommand(cmd *cobra.Command, opts *options.Options, registryOptions ...Option) error {
	...
	cc, sched, err := Setup(ctx, opts, registryOptions...)
	if err != nil {
		return err
	}

	return Run(ctx, cc, sched)
}
Setup creates the scheduler from the configuration file and flags. In my view, the most important piece here is Profiles, which defines the scheduler's name and the plugin configuration for the scheduling framework. There are also several tuning knobs, such as PercentageOfNodesToScore, PodInitialBackoffSeconds, and PodMaxBackoffSeconds.
In addition, scheduler.New() calls addAllEventHandlers(sched, informerFactory, podInformer), which registers event listeners for all the relevant resource objects and invokes the matching callbacks as events arrive; these callbacks in turn influence how the scheduling queue runs.
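As a rough illustration of what such a handler looks like, here is a simplified sketch using the client-go informer API. It is not the actual addAllEventHandlers body; the enqueue callback stands in for the scheduler's internal add-to-queue method.

package sketch

import (
	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/tools/cache"
)

// addUnscheduledPodHandler registers a filtered handler on the pod informer:
// pods that have no NodeName yet (i.e. not bound) are fed to the scheduling
// queue via the supplied enqueue callback.
func addUnscheduledPodHandler(factory informers.SharedInformerFactory, enqueue func(*v1.Pod)) {
	factory.Core().V1().Pods().Informer().AddEventHandler(cache.FilteringResourceEventHandler{
		FilterFunc: func(obj interface{}) bool {
			pod, ok := obj.(*v1.Pod)
			return ok && pod.Spec.NodeName == "" // only pods not yet scheduled
		},
		Handler: cache.ResourceEventHandlerFuncs{
			AddFunc: func(obj interface{}) { enqueue(obj.(*v1.Pod)) },
		},
	})
}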
func Setup(ctx context.Context, opts *options.Options, outOfTreeRegistryOptions ...Option) (*schedulerserverconfig.CompletedConfig, *scheduler.Scheduler, error) {
	...
	// Create the scheduler.
	sched, err := scheduler.New(cc.Client,
		cc.InformerFactory,
		cc.PodInformer,
		recorderFactory,
		ctx.Done(),
		scheduler.WithProfiles(cc.ComponentConfig.Profiles...),
		scheduler.WithAlgorithmSource(cc.ComponentConfig.AlgorithmSource),
		scheduler.WithPercentageOfNodesToScore(cc.ComponentConfig.PercentageOfNodesToScore),
		scheduler.WithFrameworkOutOfTreeRegistry(outOfTreeRegistry),
		scheduler.WithPodMaxBackoffSeconds(cc.ComponentConfig.PodMaxBackoffSeconds),
		scheduler.WithPodInitialBackoffSeconds(cc.ComponentConfig.PodInitialBackoffSeconds),
		scheduler.WithExtenders(cc.ComponentConfig.Extenders...),
	)
	if err != nil {
		return nil, nil, err
	}

	return &cc, sched, nil
}
Run mainly starts a few components and then calls sched.Run(ctx) to enter the main scheduling loop.
func Run(ctx context.Context, cc *schedulerserverconfig.CompletedConfig, sched *scheduler.Scheduler) error {
	...
	// Prepare the event broadcaster.
	cc.EventBroadcaster.StartRecordingToSink(ctx.Done())

	// Setup healthz checks.
	...

	// Start up the healthz server.
	...

	// Start all informers.
	go cc.PodInformer.Informer().Run(ctx.Done())
	cc.InformerFactory.Start(ctx.Done())

	// Wait for all caches to sync before scheduling.
	cc.InformerFactory.WaitForCacheSync(ctx.Done())

	// If leader election is enabled, runCommand via LeaderElector until done and exit.
	// Leader election
	...

	// Leader election is disabled, so runCommand inline until done.
	sched.Run(ctx)
	return fmt.Errorf("finished without leader elect")
}
sched.Run starts the scheduling queue and then keeps calling sched.scheduleOne() in a loop to schedule pods.
// Run begins watching and scheduling. It waits for cache to be synced, then starts scheduling and blocked until the context is done.
func (sched *Scheduler) Run(ctx context.Context) {
	if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) {
		return
	}
	sched.SchedulingQueue.Run()
	wait.UntilWithContext(ctx, sched.scheduleOne, 0)
	sched.SchedulingQueue.Close()
}
// Run starts the goroutine to pump from podBackoffQ to activeQ
func (p *PriorityQueue) Run() {
	go wait.Until(p.flushBackoffQCompleted, 1.0*time.Second, p.stop)
	go wait.Until(p.flushUnschedulableQLeftover, 30*time.Second, p.stop)
}
The scheduling queue's run logic:

- Every 1s, check whether any pod in podBackoffQ can be moved into activeQ. The check is simply whether the pod's backOffTime has expired.
- Every 30s, check whether any pod in unschedulableQ can be moved into activeQ.

Before introducing scheduleOne, the pod-scheduling flow diagram helps clarify the whole process; its stages are also the plugin extension points of the Scheduling Framework, supported since k8s v1.15. To make those extension points concrete, a toy Filter plugin is sketched below.
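The sketch assumes the v1.19 framework API in pkg/scheduler/framework/v1alpha1; the plugin itself, NodeNamePrefix, is hypothetical and not an in-tree plugin.

package sketch

import (
	"context"
	"strings"

	v1 "k8s.io/api/core/v1"
	framework "k8s.io/kubernetes/pkg/scheduler/framework/v1alpha1"
)

// NodeNamePrefix is a toy Filter plugin: it only lets a pod land on nodes
// whose name starts with the configured prefix.
type NodeNamePrefix struct {
	prefix string
}

var _ framework.FilterPlugin = &NodeNamePrefix{}

func (pl *NodeNamePrefix) Name() string { return "NodeNamePrefix" }

// Filter is called once per candidate node during the Predicate phase.
// Returning nil means the node passes; an Unschedulable status filters it out.
func (pl *NodeNamePrefix) Filter(ctx context.Context, state *framework.CycleState,
	pod *v1.Pod, nodeInfo *framework.NodeInfo) *framework.Status {
	node := nodeInfo.Node()
	if node == nil {
		return framework.NewStatus(framework.Error, "node not found")
	}
	if strings.HasPrefix(node.Name, pl.prefix) {
		return nil
	}
	return framework.NewStatus(framework.Unschedulable, "node name does not match prefix")
}

With the extension points in mind, here is scheduleOne itself.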
// scheduleOne does the entire scheduling workflow for a single pod. It is serialized on the scheduling algorithm's host fitting.
func (sched *Scheduler) scheduleOne(ctx context.Context) {
	podInfo := sched.NextPod()
	...
	pod := podInfo.Pod
	prof, err := sched.profileForPod(pod)
	...

	// Synchronously attempt to find a fit for the pod.
	start := time.Now()
	state := framework.NewCycleState()
	...
	scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, prof, state, pod)
	...

	// Tell the cache to assume that a pod now is running on a given node, even though it hasn't been bound yet.
	// This allows us to keep scheduling without waiting on binding to occur.
	assumedPodInfo := podInfo.DeepCopy()
	assumedPod := assumedPodInfo.Pod
	// assume modifies `assumedPod` by setting NodeName=scheduleResult.SuggestedHost
	err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
	...

	// Run the Reserve method of reserve plugins.
	if sts := prof.RunReservePluginsReserve(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost); !sts.IsSuccess() {
		...
	}

	// Run "permit" plugins.
	runPermitStatus := prof.RunPermitPlugins(schedulingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
	...

	// bind the pod to its host asynchronously (we can do this b/c of the assumption step above).
	go func() {
		bindingCycleCtx, cancel := context.WithCancel(ctx)

		waitOnPermitStatus := prof.WaitOnPermit(bindingCycleCtx, assumedPod)
		if !waitOnPermitStatus.IsSuccess() {
			...
			return
		}

		// Run "prebind" plugins.
		preBindStatus := prof.RunPreBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		if !preBindStatus.IsSuccess() {
			...
			return
		}

		err := sched.bind(bindingCycleCtx, prof, assumedPod, scheduleResult.SuggestedHost, state)
		if err != nil {
			...
		} else {
			// Run "postbind" plugins.
			prof.RunPostBindPlugins(bindingCycleCtx, state, assumedPod, scheduleResult.SuggestedHost)
		}
	}()
}
scheduleOne is the scheduler's main flow and consists of the following steps:

1. sched.NextPod() returns the next pod to schedule. This step is covered in more detail later.
2. sched.profileForPod(pod) looks up the Profiles for this pod by its schedulerName; these Profiles include the scheduling plugin configuration.
3. sched.Algorithm.Schedule() runs. It has several sub-steps: PreFilter and Filter, together called Predicates, filter the nodes, taking node resources, pod affinity, node volumes, and so on into account; PreScore, Score, and Normalize Score, together called Priorities, score the remaining nodes. The result is the Node best suited to this Pod.
4. Reserve caches the scheduling result. If a later step in the flow fails, Unreserve rolls the data back.
5. Permit runs user-defined plugins that can allow a Pod (it passes the Permit phase), reject it (scheduling fails), or make it wait (with a configurable timeout). For gang scheduling, where a batch of pods must all succeed or all fail together, pods can be controlled at the Permit step; see the sketch after this list.
6. WaitOnPermit blocks until the Pod's permit status becomes allow or reject, and only then does the flow continue.
7. PreBind, Bind, and PostBind run. Bind calls the k8s apiserver interface b.handle.ClientSet().CoreV1().Pods(binding.Namespace).Bind(ctx, binding, metav1.CreateOptions{}) to bind the pod to the selected node. Binding can fail, in which case Unreserve releases the Pod's reserved resources on the node and the pod is put back into the failure queue. Once the Pod is bound to the Node, the kubelet on that Node watches the corresponding event and creates the Pod there, setting up container storage, network, and so on. When all resources are ready, the kubelet updates the Pod's status to Running.
To schedule, the scheduler first has to obtain a pod to schedule, namely via sched.NextPod(), which calls the SchedulingQueue's Pop() method.
When activeQ is empty, Pop blocks on p.cond.Wait() until podBackoffQ or unschedulableQ adds an element to activeQ and wakes it up via cond.Broadcast().
// Pop removes the head of the active queue and returns it. It blocks if the
// activeQ is empty and waits until a new item is added to the queue. It
// increments scheduling cycle when a pod is popped.
func (p *PriorityQueue) Pop() (*framework.QueuedPodInfo, error) {
	p.lock.Lock()
	defer p.lock.Unlock()
	for p.activeQ.Len() == 0 {
		// When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
		// When Close() is called, the p.closed is set and the condition is broadcast,
		// which causes this loop to continue and return from the Pop().
		if p.closed {
			return nil, fmt.Errorf(queueClosed)
		}
		p.cond.Wait()
	}
	obj, err := p.activeQ.Pop()
	if err != nil {
		return nil, err
	}
	pInfo := obj.(*framework.QueuedPodInfo)
	pInfo.Attempts++
	p.schedulingCycle++
	return pInfo, err
}
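For readers less familiar with sync.Cond, the following self-contained toy queue uses exactly the same wait/broadcast pattern as Pop and Add; it is an illustration of the pattern, not scheduler code.

package main

import (
	"fmt"
	"sync"
)

// blockingQueue is a toy queue built on sync.Cond, using the same pattern as
// PriorityQueue: Pop waits on the condition while the queue is empty, and
// Add broadcasts to wake the waiters.
type blockingQueue struct {
	mu    sync.Mutex
	cond  *sync.Cond
	items []string
}

func newBlockingQueue() *blockingQueue {
	q := &blockingQueue{}
	q.cond = sync.NewCond(&q.mu)
	return q
}

func (q *blockingQueue) Add(item string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	q.items = append(q.items, item)
	q.cond.Broadcast() // wake up goroutines blocked in Pop
}

func (q *blockingQueue) Pop() string {
	q.mu.Lock()
	defer q.mu.Unlock()
	for len(q.items) == 0 {
		q.cond.Wait() // atomically releases the lock while waiting
	}
	head := q.items[0]
	q.items = q.items[1:]
	return head
}

func main() {
	q := newBlockingQueue()
	go q.Add("pod-a") // producer: unblocks the Pop below
	fmt.Println(q.Pop())
}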
When a pod is added to activeQ, its entries are also deleted from unschedulableQ and podBackoffQ, and cond.Broadcast() wakes up the blocked Pop.
// Add adds a pod to the active queue. It should be called only when a new pod
// is added so there is no chance the pod is already in active/unschedulable/backoff queues
func (p *PriorityQueue) Add(pod *v1.Pod) error {
	p.lock.Lock()
	defer p.lock.Unlock()
	pInfo := p.newQueuedPodInfo(pod)
	if err := p.activeQ.Add(pInfo); err != nil {
		klog.Errorf("Error adding pod %v to the scheduling queue: %v", nsNameForPod(pod), err)
		return err
	}
	if p.unschedulableQ.get(pod) != nil {
		klog.Errorf("Error: pod %v is already in the unschedulable queue.", nsNameForPod(pod))
		p.unschedulableQ.delete(pod)
	}
	// Delete pod from backoffQ if it is backing off
	if err := p.podBackoffQ.Delete(pInfo); err == nil {
		klog.Errorf("Error: pod %v is already in the podBackoff queue.", nsNameForPod(pod))
	}
	metrics.SchedulerQueueIncomingPods.WithLabelValues("active", PodAdd).Inc()
	p.PodNominator.AddNominatedPod(pod, "")
	p.cond.Broadcast()

	return nil
}
When scheduling a pod fails, sched.Error() is called, which in turn calls p.AddUnschedulableIfNotPresent().
Whether a failed pod goes into podBackoffQ or unschedulableQ is decided as follows: if moveRequestCycle >= podSchedulingCycle, it goes into podBackoffQ; otherwise it goes into unschedulableQ.
// AddUnschedulableIfNotPresent inserts a pod that cannot be scheduled into
// the queue, unless it is already in the queue. Normally, PriorityQueue puts
// unschedulable pods in `unschedulableQ`. But if there has been a recent move
// request, then the pod is put in `podBackoffQ`.
func (p *PriorityQueue) AddUnschedulableIfNotPresent(pInfo *framework.QueuedPodInfo, podSchedulingCycle int64) error {
	...
	// If a move request has been received, move it to the BackoffQ, otherwise move
	// it to unschedulableQ.
	if p.moveRequestCycle >= podSchedulingCycle {
		if err := p.podBackoffQ.Add(pInfo); err != nil {
			return fmt.Errorf("error adding pod %v to the backoff queue: %v", pod.Name, err)
		}
	} else {
		p.unschedulableQ.addOrUpdate(pInfo)
	}
	...
}
When does moveRequestCycle >= podSchedulingCycle hold:

- When cluster resources change, pods in unschedulableQ that previously failed because their resource requests could not be met are moved into activeQ or podBackoffQ so they can be scheduled promptly.
- The periodic task flushUnschedulableQLeftover tries to schedule the pods in unschedulableQ.

Both paths call the movePodsToActiveOrBackoffQueue function, which sets moveRequestCycle to p.schedulingCycle.
func (p *PriorityQueue) movePodsToActiveOrBackoffQueue(podInfoList []*framework.QueuedPodInfo, event string) {
	...
	p.moveRequestCycle = p.schedulingCycle
	p.cond.Broadcast()
}
Joining podBackoffQ

Two situations put a pod into podBackoffQ:

- Scheduling fails while moveRequestCycle >= podSchedulingCycle; the pod then goes into podBackoffQ.
- movePodsToActiveOrBackoffQueue moves pods from unschedulableQ into podBackoffQ or activeQ. The condition for landing in podBackoffQ is p.isPodBackingoff(pInfo), i.e., the pod is still within its backoff window.

Leaving podBackoffQ
The scheduler periodically moves pods from podBackoffQ to activeQ.

The flushBackoffQCompleted task started in sched.SchedulingQueue.Run runs every 1s. In priority order (the heap is ordered by backoffTime), it moves each pod whose backoffTime has expired from podBackoffQ to activeQ, stopping at the first pod whose backoffTime has not yet expired. A simplified sketch follows.
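This is an abbreviated paraphrase of the v1.19 loop, not the verbatim code; it relies on the queue's internal helpers (getBackoffTime, the heap's Peek/Pop) shown earlier.

// Paraphrased flushBackoffQCompleted: the backoff heap is keyed by expiry
// time, so the loop can stop at the first pod that is still backing off.
func (p *PriorityQueue) flushBackoffQCompleted() {
	p.lock.Lock()
	defer p.lock.Unlock()
	for {
		raw := p.podBackoffQ.Peek()
		if raw == nil {
			return // backoff queue is empty
		}
		pInfo := raw.(*framework.QueuedPodInfo)
		if p.getBackoffTime(pInfo).After(p.clock.Now()) {
			return // head has not expired yet, so neither has anything behind it
		}
		if _, err := p.podBackoffQ.Pop(); err != nil {
			return
		}
		p.activeQ.Add(pInfo)
		p.cond.Broadcast() // wake up a blocked Pop()
	}
}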
Joining unschedulableQ

Only one situation puts a pod into unschedulableQ: scheduling failure. If scheduling fails and cluster resources have not changed, i.e., moveRequestCycle < podSchedulingCycle, the pod is added to unschedulableQ.
Leaving unschedulableQ

The scheduler likewise periodically moves pods from unschedulableQ to podBackoffQ or activeQ.

The flushUnschedulableQLeftover task started in sched.SchedulingQueue.Run eventually calls movePodsToActiveOrBackoffQueue, which distributes the pods into podBackoffQ or activeQ.
The Kubernetes scheduler is a crucial component of kubernetes, and most cloud platforms customize the scheduler for their own business models and requirements, for example Huawei's Volcano computing framework. Studying this material makes developing a custom scheduler much easier.
Reference

k8s source code
图解kubernetes调度器SchedulingQueue核心源码实现
深入理解k8s调度器与调度框架核心源码
Kubernetes资源调度——scheduler