图解kubernetes调度器预选设计实现学习

Scheduler中在进行node选举的时候会首先进行一轮预选流程,即从当前集群中选择一批node节点,本文主要分析k8s在预选流程上一些优秀的筛选设计思想,欢迎大佬们指正node

1. 基础设计

1.1 预选场景

预选顾名思义就是从当前集群中的全部的node中,选择出知足当前pod资源和亲和性等需求的node节点,如何在集群中快速选择这样的节点,是个复杂的问题算法

1.2 平均分布

平均分布主要是经过让一个分配索引来进行即只有当全部的node都在本轮分配周期内分配一次后,才开始从头进行分配,从而保证集群的平均分布api

1.3 预选中断

预选终端即在预选的过程当中若是发现node已经不能知足当前pod资源需求的时候,就进行中断预选流程,尝试下一个节点微信

1.4 并行筛选

在当前k8s版本中,默认会启动16个goroutine来进行并行的预选,从而提升性能,从而提升预选的性能app

1.5 局部最优解

预选流程须要从当前集群中选择一台符合要求的node随着集群规模的增加,若是每次遍历全部集群node则会必然致使性能的降低,因而经过局部最优解的方式,缩小筛选节点的数量ide

2. 源码分析

预选的核心流程是经过findNodesThatFit来完成,其返回预选结果供优选流程使用函数

2.1 取样逻辑

取样是经过当前集群中的node数量和默认的最小值来决定本次预选阶段须要获取的node节点数量源码分析

// 获取全部的节点数量,并经过计算百分比,获取本次选举选择的节点数量
		allNodes := int32(g.cache.NodeTree().NumNodes())
		// 肯定要查找node数量
		numNodesToFind := g.numFeasibleNodesToFind(allNodes)

2.2 取样算法

取样算法很简单从集群中获取指定百分比的节点默认是50%,若是50%的节点数量小于minFeasibleNodesToFind则按照minFeasibleNodesToFind(最小取样节点数量)来取样,性能

func (g *genericScheduler) numFeasibleNodesToFind(numAllNodes int32) (numNodes int32) {
	// 若是当前节点数量小于minFeasibleNodesToFind即小于100台node
    // 同理百分好比果大于100就是全量取样
    // 这两种状况都直接遍历整个集群中全部节点
    if numAllNodes < minFeasibleNodesToFind || g.percentageOfNodesToScore >= 100 {
		return numAllNodes
	}

	adaptivePercentage := g.percentageOfNodesToScore
	if adaptivePercentage <= 0 {
		adaptivePercentage = schedulerapi.DefaultPercentageOfNodesToScore - numAllNodes/125
		if adaptivePercentage < minFeasibleNodesPercentageToFind {
			adaptivePercentage = minFeasibleNodesPercentageToFind
		}
	}

    // 正常取样计算:好比numAllNodes为5000,而adaptivePercentage为50%
    // 则numNodes=50000*0.5/100=250
	numNodes = numAllNodes * adaptivePercentage / 100
	if numNodes < minFeasibleNodesToFind { // 若是小于最少取样则按照最少取样进行取样
		return minFeasibleNodesToFind
	}

	return numNodes
}

2.3 取样元数据准备

经过filtered来进行预选结果的存储,经过filteredLen来进行原子保护协做多个取样goroutine, 并经过predicateMetaProducer和当前的snapshot来进行元数据构建ui

filtered = make([]*v1.Node, numNodesToFind)
		errs := errors.MessageCountMap{}
		var (
			predicateResultLock sync.Mutex
			filteredLen         int32
		)

		ctx, cancel := context.WithCancel(context.Background())

		// We can use the same metadata producer for all nodes.
		meta := g.predicateMetaProducer(pod, g.nodeInfoSnapshot.NodeInfoMap)

2.4 经过channel协做并行取样

并行取样主要经过调用下面的函数来启动16个goroutine来进行并行取样,并经过ctx来协调退出

workqueue.ParallelizeUntil(ctx, 16, int(allNodes), checkNode)

image.png 经过channel来构建取样索引的管道,每一个worker会负责从channel获取的指定索引取样node的填充

func ParallelizeUntil(ctx context.Context, workers, pieces int, doWorkPiece DoWorkPieceFunc) {
	var stop <-chan struct{}
	if ctx != nil {
		stop = ctx.Done()
	}

    // 生成指定数量索引,worker经过索引来进行预选成功节点的存储
	toProcess := make(chan int, pieces)
	for i := 0; i < pieces; i++ {
		toProcess <- i
	}
	close(toProcess)

	if pieces < workers {
		workers = pieces
	}

	wg := sync.WaitGroup{}
	wg.Add(workers)
	for i := 0; i < workers; i++ {
        // 启动多个goroutine
		go func() {
			defer utilruntime.HandleCrash()
			defer wg.Done()
			for piece := range toProcess {
				select {
				case <-stop:
					return
				default:
                    //获取索引,后续会经过该索引来进行结果的存储
					doWorkPiece(piece)
				}
			}
		}()
	}
    // 等待退出
	wg.Wait()
}

2.5 取样并行函数

checkNode := func(i int) {
			// 获取一个节点
			nodeName := g.cache.NodeTree().Next()

            // 取样核心流程是经过podFitsOnNode来肯定
			fits, failedPredicates, status, err := g.podFitsOnNode(
				pluginContext,
				pod,
				meta,
				g.nodeInfoSnapshot.NodeInfoMap[nodeName],
				g.predicates, // 传递预选算法
				g.schedulingQueue,
				g.alwaysCheckAllPredicates,
			)
			if err != nil {
				predicateResultLock.Lock()
				errs[err.Error()]++
				predicateResultLock.Unlock()
				return
			}
			if fits {
				// 若是当前以及查找到的数量大于预选的数量,就退出
				length := atomic.AddInt32(&filteredLen, 1)
				if length > numNodesToFind {
					cancel()
					atomic.AddInt32(&filteredLen, -1)
				} else {
					filtered[length-1] = g.nodeInfoSnapshot.NodeInfoMap[nodeName].Node()
				}
			} else {
                // 进行错误状态的保存 
				predicateResultLock.Lock()
				if !status.IsSuccess() {
					filteredNodesStatuses[nodeName] = status
				}
				if len(failedPredicates) != 0 {
					failedPredicateMap[nodeName] = failedPredicates
				}
				predicateResultLock.Unlock()
			}
		}

2.6 面向将来的筛选

image.png 在kubernetes中通过调度器调度后的pod结果会放入到SchedulingQueue中进行暂存,这些pod将来可能会通过后续调度流程运行在提议的node上,也可能由于某些缘由致使最终没有运行,而预选流程为了减小后续由于调度冲突(好比pod之间的亲和性等问题,而且当前pod不能抢占这些pod),则会在进行预选的时候,将这部分pod考虑进去

若是在这些pod存在的状况下,node能够知足当前pod的筛选条件,则能够去除被提议的pod再进行筛选(若是这些提议的pod最终没有调度到node,则当前node也须要知足各类亲和性的需求)

2.6 取样核心设计

image.png 结合上面说的面向将来的筛选,经过两轮筛选在不管那些优先级高的pod是否被调度到当前node上,均可以知足pod的调度需求,在调度的流程中只须要获取以前注册的调度算法,完成预选检测,若是发现有条件不经过则不会进行第二轮筛选,继续选择下一个节点

func (g *genericScheduler) podFitsOnNode(
	pluginContext *framework.PluginContext,
	pod *v1.Pod,
	meta predicates.PredicateMetadata,
	info *schedulernodeinfo.NodeInfo,
	predicateFuncs map[string]predicates.FitPredicate,
	queue internalqueue.SchedulingQueue,
	alwaysCheckAllPredicates bool,
) (bool, []predicates.PredicateFailureReason, *framework.Status, error) {
	var failedPredicates []predicates.PredicateFailureReason
	var status *framework.Status

    // podsAdded主要用于标识当前是否有提议的pod若是没有提议的pod则就不须要再进行一轮筛选了
	podsAdded := false
	
	for i := 0; i < 2; i++ {
		metaToUse := meta
		nodeInfoToUse := info
		if i == 0 {
			// 首先获取那些提议的pod进行第一轮筛选, 若是第一轮筛选出错,则不会进行第二轮筛选
			podsAdded, metaToUse, nodeInfoToUse = addNominatedPods(pod, meta, info, queue)
		} else if !podsAdded || len(failedPredicates) != 0 {
            // 若是
			break
		}
		for _, predicateKey := range predicates.Ordering() {
			var (
				fit     bool
				reasons []predicates.PredicateFailureReason
				err     error
			)
			//TODO (yastij) : compute average predicate restrictiveness to export it as Prometheus metric
			if predicate, exist := predicateFuncs[predicateKey]; exist {
				// 预选算法计算
				fit, reasons, err = predicate(pod, metaToUse, nodeInfoToUse)
				if err != nil {
					return false, []predicates.PredicateFailureReason{}, nil, err
				}

				if !fit {
					// eCache is available and valid, and predicates result is unfit, record the fail reasons
					failedPredicates = append(failedPredicates, reasons...)
					// if alwaysCheckAllPredicates is false, short circuit all predicates when one predicate fails.
					if !alwaysCheckAllPredicates {
						klog.V(5).Infoln("since alwaysCheckAllPredicates has not been set, the predicate " +
							"evaluation is short circuited and there are chances " +
							"of other predicates failing as well.")
						break
					}
				}
			}
		}

		status = g.framework.RunFilterPlugins(pluginContext, pod, info.Node().Name)
		if !status.IsSuccess() && !status.IsUnschedulable() {
			return false, failedPredicates, status, status.AsError()
		}
	}

	return len(failedPredicates) == 0 && status.IsSuccess(), failedPredicates, status, nil
}

> 微信号:baxiaoshi2020 > 关注公告号阅读更多源码分析文章 21天大棚 > 更多文章关注 www.sreguide.com > 本文由博客一文多发平台 OpenWrite 发布

相关文章
相关标签/搜索