kubernetes调度器以前已经分析过SchedulerCache、ScheduleAlgorithm、SchedulerExtender、Framework等核心数据结构,也分析了优选、调度、抢占流程的核心实现,本文是本系列目前打算的最后一章, 也是当前阶段对调度的学习的一个总结node
整个系列文档我已经已经更新到语雀上了地址是,谢谢你们分享加微信一块儿交流 https://www.yuque.com/baxiaoshi/tyado3/git
Binder负责将调度器的调度结果,传递给apiserver,即将一个pod绑定到选择出来的node节点github
在scheduler/factory中会构建一个默认的binder算法
func getBinderFunc(client clientset.Interface, extenders []algorithm.SchedulerExtender) func(pod *v1.Pod) Binder { defaultBinder := &binder{client} return func(pod *v1.Pod) Binder { for _, extender := range extenders { if extender.IsBinder() && extender.IsInterested(pod) { return extender } } return defaultBinder } }
binder接口和简单只须要调用apiserver的pod的bind接口便可完成绑定操做api
// Implement Binder interface var _ Binder = &binder{} // Bind just does a POST binding RPC. func (b *binder) Bind(binding *v1.Binding) error { klog.V(3).Infof("Attempting to bind %v to %v", binding.Name, binding.Target.Name) return b.Client.CoreV1().Pods(binding.Namespace).Bind(binding) }
执行绑定的操做位于Scheudler.bind接口,在调用Framework.RunBindPlugins后,只有当返回的状态不是成功,而是SKIP的时候,才执行bind操做,真的不知道是怎么想的,后续若是加入对应的bind插件,也须要返回SKIP,理解不了大神的思惟缓存
bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode) var err error if !bindStatus.IsSuccess() { if bindStatus.Code() == framework.Skip { // 若是全部的插件都skip了菜容许将pod绑定到apiserver err = sched.GetBinder(assumed).Bind(&v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID}, Target: v1.ObjectReference{ Kind: "Node", Name: targetNode, }, }) } else { err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message()) } }
调度器的参数的初始化已经都放到defaultSchedulerOptions中了,后续应该更多的都会采用改种方式,避免散落在构建参数的各个阶段微信
var defaultSchedulerOptions = schedulerOptions{ schedulerName: v1.DefaultSchedulerName, schedulerAlgorithmSource: schedulerapi.SchedulerAlgorithmSource{ Provider: defaultAlgorithmSourceProviderName(), }, hardPodAffinitySymmetricWeight: v1.DefaultHardPodAffinitySymmetricWeight, disablePreemption: false, percentageOfNodesToScore: schedulerapi.DefaultPercentageOfNodesToScore, bindTimeoutSeconds: BindTimeoutSeconds, podInitialBackoffSeconds: int64(internalqueue.DefaultPodInitialBackoffDuration.Seconds()), podMaxBackoffSeconds: int64(internalqueue.DefaultPodMaxBackoffDuration.Seconds()), }
插件工厂注册表的初始化分为两个部分in tree和out of tree即当前版本自带的和用户自定义的两部分数据结构
// 首先进行当前版本的插件注册表的注册 registry := frameworkplugins.NewInTreeRegistry(&frameworkplugins.RegistryArgs{ VolumeBinder: volumeBinder, }) // 加载用户自定义的插件注册表 if err := registry.Merge(options.frameworkOutOfTreeRegistry); err != nil { return nil, err }
绑定事件回调主要是经过AddAllEventHandlers主要是将各类资源数据经过SchedulerCache放入本地缓存中,同时针对未调度的pod(!assignedPod即没有绑定Node的pod)加入到调度队列中架构
func AddAllEventHandlers( sched *Scheduler, schedulerName string, informerFactory informers.SharedInformerFactory, podInformer coreinformers.PodInformer, ) {
当资源发生变化的时候,好比service、volume等就会对unschedulableQ中的以前调度失败的pod进行重试,选择将其转移到activeQ或者backoffQ中并发
func (p *PriorityQueue) MoveAllToActiveOrBackoffQueue(event string) { p.lock.Lock() defer p.lock.Unlock() unschedulablePods := make([]*framework.PodInfo, 0, len(p.unschedulableQ.podInfoMap)) // 获取全部unschedulable的pod for _, pInfo := range p.unschedulableQ.podInfoMap { unschedulablePods = append(unschedulablePods, pInfo) } // 将unschedulable的pod转移到backoffQ队列或者activeQ队列中 p.movePodsToActiveOrBackoffQueue(unschedulablePods, event) // 修改迁移调度器请求周期, 在失败的时候会进行比较pod的moveRequestCycle是否>=schedulingCycle p.moveRequestCycle = p.schedulingCycle p.cond.Broadcast() }
最后则会启动调度器,其核心流程是在scheduleOne中
func (sched *Scheduler) Run(ctx context.Context) { // 首先会进行同步缓存 if !cache.WaitForCacheSync(ctx.Done(), sched.scheduledPodsHasSynced) { return } // 启动调度队列的后台定时任务 sched.SchedulingQueue.Run() // 启动调度流程 wait.UntilWithContext(ctx, sched.scheduleOne, 0) sched.SchedulingQueue.Close() }
获取等待调度的pod则直接经过NextPod拉进行,其实内部就是对schedulingQUeue.pop的封装
// 从队列中获取等待调度的pod podInfo := sched.NextPod() // pod could be nil when schedulerQueue is closed if podInfo == nil || podInfo.Pod == nil { return }
func MakeNextPodFunc(queue SchedulingQueue) func() *framework.PodInfo { return func() *framework.PodInfo { podInfo, err := queue.Pop() if err == nil { klog.V(4).Infof("About to try and schedule pod %v/%v", podInfo.Pod.Namespace, podInfo.Pod.Name) return podInfo } klog.Errorf("Error while retrieving next pod from scheduling queue: %v", err) return nil } }
skipPodSchedule即检查当前 pod是否能够进行跳过,其中一个是pod已经被删除,另一个就是pod已经被提议调度到某个节点,此时若是只是版本的更新,即除了ResourceVersion、Annotations、NodeName三个字段其他的都不曾变化,就不须要进行重复的调度
if sched.skipPodSchedule(pod) { return }
检测提议pod重复调度算法, 若是相等则不进行任何操做
f := func(pod *v1.Pod) *v1.Pod { p := pod.DeepCopy() p.ResourceVersion = "" p.Spec.NodeName = "" // Annotations must be excluded for the reasons described in // https://github.com/kubernetes/kubernetes/issues/52914. p.Annotations = nil return p } assumedPodCopy, podCopy := f(assumedPod), f(pod) // 若是pod的信息没有发生变动则不须要进行更新 if !reflect.DeepEqual(assumedPodCopy, podCopy) { return false } return true
生成CycleState和context, 其中CycleState用于进行调度器周期上线文数据传递共享,而context则负责统一的退出协调管理
// 构建CycleState和context state := framework.NewCycleState() state.SetRecordPluginMetrics(rand.Intn(100) < pluginMetricsSamplePercent) schedulingCycleCtx, cancel := context.WithCancel(ctx) defer cancel()
调度流程中底层依赖的数据结构ScheduleAlgorithm内部实现以前的分析中已经详细说过,这里会省略一些诸如volume bind、framework阶段钩子的调用
正常调度只须要调度ScheduleAlgorithm来进行调度,具体实现细节能够看以前的文章
scheduleResult, err := sched.Algorithm.Schedule(schedulingCycleCtx, state, pod)
err = sched.assume(assumedPod, scheduleResult.SuggestedHost)
若是一个Pod被提议存储到某个节点,则会先将其加入到SchedulerCache中,同时从SchedulingQueue中移除,避免重复调度
func (sched *Scheduler) assume(assumed *v1.Pod, host string) error { assumed.Spec.NodeName = host // 存储到SchedulerCache中这样下个调度周期中,pod会占用对应node的资源 if err := sched.SchedulerCache.AssumePod(assumed); err != nil { klog.Errorf("scheduler cache AssumePod failed: %v", err) return err } // if "assumed" is a nominated pod, we should remove it from internal cache // 从调度队列中移除pod if sched.SchedulingQueue != nil { sched.SchedulingQueue.DeleteNominatedPodIfExists(assumed) } return nil }
bind阶段与调度阶段是并行的关系,当执行bind的时候,会启动一个goroutine来单独执行bind操做, 省略关于framework、extender相关的hook调用
在绑定流程中若是发现以前的Volumes未所有绑定,则会先进行volumes绑定操做
if !allBound { err := sched.bindVolumes(assumedPod)
绑定操做主要是位于scheduler.bind,会进行最终的节点绑定
err := sched.bind(bindingCycleCtx, assumedPod, scheduleResult.SuggestedHost, state)
执行以前说的bind绑定操做,这里是真正操纵apiserver发生pod与node绑定请求的地方
bindStatus := sched.Framework.RunBindPlugins(ctx, state, assumed, targetNode) var err error if !bindStatus.IsSuccess() { if bindStatus.Code() == framework.Skip { // 若是全部的插件都skip了才容许将pod绑定到apiserver err = sched.GetBinder(assumed).Bind(&v1.Binding{ ObjectMeta: metav1.ObjectMeta{Namespace: assumed.Namespace, Name: assumed.Name, UID: assumed.UID}, Target: v1.ObjectReference{ Kind: "Node", Name: targetNode, }, }) } else { err = fmt.Errorf("Bind failure, code: %d: %v", bindStatus.Code(), bindStatus.Message()) } }
会调用SchedulerCache里面提议节点的过时时间,若是超过指定的过时时间,则会进行移除操做,释放node资源
if finErr := sched.SchedulerCache.FinishBinding(assumed); finErr != nil { klog.Errorf("scheduler cache FinishBinding failed: %v", finErr) }
若是在以前正常调度失败的时候,首先会发一个在recordSchedulingFailure中调用sched.Error来将失败的pod转移到backoffQ或者unschedulableQ队列中
sched.recordSchedulingFailure(podInfo.DeepCopy(), err, v1.PodReasonUnschedulable, err.Error())
若是是预选失败的,而且当前调度器容许抢占功能,则会进行抢占调度处理即sched.preempt
if fitError, ok := err.(*core.FitError); ok { // 若是是预选失败则进行 if sched.DisablePreemption { klog.V(3).Infof("Pod priority feature is not enabled or preemption is disabled by scheduler configuration." + " No preemption is performed.") } else { preemptionStartTime := time.Now() // 抢占调度 sched.preempt(schedulingCycleCtx, state, fwk, pod, fitError) metrics.PreemptionAttempts.Inc() metrics.SchedulingAlgorithmPreemptionEvaluationDuration.Observe(metrics.SinceInSeconds(preemptionStartTime)) metrics.DeprecatedSchedulingDuration.WithLabelValues(metrics.PreemptionEvaluation).Observe(metrics.SinceInSeconds(preemptionStartTime)) }
首先经过apiserver获取当前须要执行抢占的pod的最新Pod信息
preemptor, err := sched.podPreemptor.getUpdatedPod(preemptor) if err != nil { klog.Errorf("Error getting the updated preemptor pod object: %v", err) return "", err }
经过Preempt筛选要进行抢占操做的node节点、待驱逐的pod、待驱逐的提议的pod
node, victims, nominatedPodsToClear, err := sched.Algorithm.Preempt(ctx, state, preemptor, scheduleErr) if err != nil { klog.Errorf("Error preempting victims to make room for %v/%v: %v", preemptor.Namespace, preemptor.Name, err) return "", err }
若是节点抢占一个pod成功,则会更新队列中的抢占节点的提议节点信息,这样在下个调度周期中,就可使用该信息
sched.SchedulingQueue.UpdateNominatedPodForNode(preemptor, nodeName)
这里会直接调用apiserver中节点的提议节点信息,为何要这样作呢?由于当前pod已经抢占了node上部分的节点信息,可是在被抢占的pod彻底从节点上删除以前的这段时间,该pod调度依然会失败,可是此时不能继续调用抢占流程了,由于你已经执行了抢占,此时只须要等待对应节点上的node都删除,则再词继续尝试调度
err = sched.podPreemptor.setNominatedNodeName(preemptor, nodeName)
删除被驱逐节点直接调用apiserver进行操做,若是此时发现当前pod还在等待插件的Allow操做,则直接进行Reject
for _, victim := range victims { // 调用apiserver进行删除pod if err := sched.podPreemptor.deletePod(victim); err != nil { klog.Errorf("Error preempting pod %v/%v: %v", victim.Namespace, victim.Name, err) return "", err } // If the victim is a WaitingPod, send a reject message to the PermitPlugin if waitingPod := fwk.GetWaitingPod(victim.UID); waitingPod != nil { waitingPod.Reject("preempted") } sched.Recorder.Eventf(victim, preemptor, v1.EventTypeNormal, "Preempted", "Preempting", "Preempted by %v/%v on node %v", preemptor.Namespace, preemptor.Name, nodeName) }
针对那些已经被提议调度到当前node的pod,会将其node设置为空,从新进行调度选择
for _, p := range nominatedPodsToClear { // 清理这些提议的pod rErr := sched.podPreemptor.removeNominatedNodeName(p) if rErr != nil { klog.Errorf("Cannot remove 'NominatedPod' field of pod: %v", rErr) // We do not return as this error is not critical. } }
为了不太多的线交叉,这里我只给出了大的核心的流程,同时针对SchedulerExtender和Framework我作了简化,多个阶段其实都有调用,可是我只在最下面画出了数据结构和调用, 这张图基本上包含了大多数的关键的数据结构以及数据流向,但愿能给想学习调度器的朋友一些帮助
调度器代码的阅读从开始到如今,应该已经有快一个月的时间了,读到如今也算是对调度器的核心流程和关键的数据结构有一点了解,固然不少具体的调度算法,目前也并无去细看,由于初衷其实只是想了解下调度方面的架构设计与关键数据结构
源码阅读的过程当中我想最大的问题,可能就是关于一些数据结构和算法的设计的理解,固然我目前也都是本身的臆测做者的设计初衷,好在我是作运维开发的不少场景上其实还蛮容易理解的,好比服务打散、调度队列的Pod转移、并发意图等等,后续若是有人阅读有不同的理解,欢迎交流,指正小弟的一些错误理解
调度器目前应该仍然在开发中,目前已经吧优选阶段移入到Framework,后续的预选应该也在计划中,其次针对流程上的设计应该也在变更,好比不少说的nodeTree也在修改中,调度器的构建也更加工程化,反而比以前更好理解了,因此有兴趣阅读的,不必定要选择老的版本,新的版本可能更容易一些
调度器将来的优化点我感受除了在调度流程和算法管理Framework的演进,更多的优化仍是在预选阶段,即如何选择选择出最合适 node节点,该流程的优化应该主要分为两个部分:新Pod的预选和旧Pod的预选,即针对已知和未知的预选优化
针对已知的优化,一般能够经过保存更多的数据,以空间来换时间的设计来进行更多状态的保存加速预选 针对未知的优化,若是不考虑批处理任务,则其实针对未知的优化是个伪命题,由于在实际场景中,你一把不可能获取同时上线1000个新的服务,可是你能够同时调度10000个pod,那这些pod在以前的调度流程中,其实能够保存更多的状态数据,来加速预选,可是更多的数据状态保存则对当前的调度系统的不少设计可能都须要进行变动,估计应该须要等到整个调度器的流程和插件固化以后再考虑吧
好吧就胡说到这里吧,明天要开始新的模块的学习,也但愿能交到更多的朋友,我会把这个系列的全部文章整理程pdf,毕竟微信公共号的阅读体验是真很差
> 微信号:baxiaoshi2020 > 关注公告号阅读更多源码分析文章
> 更多文章关注 www.sreguide.com > 本文由博客一文多发平台 OpenWrite 发布