Framework是kubernetes扩展的第二种实现,相比SchedulerExtender基于远程独立Service的扩展,Framework核心则实现了一种基于扩展点的本地化的规范流程管理机制node
Framework的设计在官方文档中已经有明确的描述,当前尚未Stable, 本文目前基于1.18版本聊一聊除了官方描述外的实现的上的一些细节git
目前官方主要是围绕着以前的预选和优选阶段进行扩展,提供了更多的扩展点,其中每一个扩展点都是一类插件,咱们能够根据咱们的须要在对应的阶段来进行扩展插件的编写,实现调度加强github
在当前版本中优先级插件已经抽取到了framework中,后续应该会继续将预选插件来进行抽取,这块应该还得一段时间才能稳定编程
在Framework的实现中,每一个插件扩展阶段调用都会传递context和CycleState两个对象,其中context与咱们在大多数go编程中的用法相似,这里主要是用于多阶段并行处理的时候的统一退出操做,而CycleState则存储当前这一个调度周期内的全部数据,这是一个并发安全的结构,内部包含一个读写锁数组
Permit是在进行Bind绑定操做以前进行的一项操做,其主要设计目标是在进行bind以前,进行最后一道决策,即当前pod是否准许进行最终的Bind操做,具备一票否决权,若是里面的插件拒绝,则对应的pod会从新进行调度安全
Framework的核心数据结构简单的来讲分为三部分:插件集合(针对每一个扩展阶段都会有本身的集合)、元数据获取接口(集群和快照数据的获取)、等待Pod集合微信
插件集合中会根据不一样的插件类型,来进行分类保存, 其中还有一个插件的优先级存储map,目前只有在优选阶段使用,后续可能会加入预选的优先级数据结构
pluginNameToWeightMap map[string]int queueSortPlugins []QueueSortPlugin preFilterPlugins []PreFilterPlugin filterPlugins []FilterPlugin postFilterPlugins []PostFilterPlugin scorePlugins []ScorePlugin reservePlugins []ReservePlugin preBindPlugins []PreBindPlugin bindPlugins []BindPlugin postBindPlugins []PostBindPlugin unreservePlugins []UnreservePlugin permitPlugins []PermitPlugin
主要是集群中的一些数据获取接口的实现,主要是为了实现FrameworkHandle, 该接口主要是提供一些数据的获取的接口和集群操做的接口给插件使用闭包
clientSet clientset.Interface informerFactory informers.SharedInformerFactory volumeBinder *volumebinder.VolumeBinder snapshotSharedLister schedulerlisters.SharedLister
等待pod集合主要是存储在Permit阶段进行等待的pod,若是在等待周期中pod被删除,则会直接拒绝并发
waitingPods *waitingPodsMap
经过插件工厂来存储全部注册的插件工厂,而后经过插件工厂构建具体的插件
registry Registry
工厂函数即传入对应的参数,构建一个Plugin,其中FrameworkHandle主要是用于获取快照和集群的其余数据
type PluginFactory = func(configuration *runtime.Unknown, f FrameworkHandle) (Plugin, error)
在go里面大多数插件工厂的实现都是经过map来实现这里也是同样,对外暴露Register和UnRegister接口
type Registry map[string]PluginFactory // Register adds a new plugin to the registry. If a plugin with the same name // exists, it returns an error. func (r Registry) Register(name string, factory PluginFactory) error { if _, ok := r[name]; ok { return fmt.Errorf("a plugin named %v already exists", name) } r[name] = factory return nil } // Unregister removes an existing plugin from the registry. If no plugin with // the provided name exists, it returns an error. func (r Registry) Unregister(name string) error { if _, ok := r[name]; !ok { return fmt.Errorf("no plugin named %v exists", name) } delete(r, name) return nil } // Merge merges the provided registry to the current one. func (r Registry) Merge(in Registry) error { for name, factory := range in { if err := r.Register(name, factory); err != nil { return err } } return nil }
这里以preFilterPlugins为例展现整个流程的注册
Plugins在配置阶段进行构造,其会保存当前framework中注册的全部的插件,其经过PluginSet保存对应的容许和禁用的插件
type Plugins struct { // QueueSort is a list of plugins that should be invoked when sorting pods in the scheduling queue. QueueSort *PluginSet // PreFilter is a list of plugins that should be invoked at "PreFilter" extension point of the scheduling framework. PreFilter *PluginSet // Filter is a list of plugins that should be invoked when filtering out nodes that cannot run the Pod. Filter *PluginSet // PostFilter is a list of plugins that are invoked after filtering out infeasible nodes. PostFilter *PluginSet // Score is a list of plugins that should be invoked when ranking nodes that have passed the filtering phase. Score *PluginSet // Reserve is a list of plugins invoked when reserving a node to run the pod. Reserve *PluginSet // Permit is a list of plugins that control binding of a Pod. These plugins can prevent or delay binding of a Pod. Permit *PluginSet // PreBind is a list of plugins that should be invoked before a pod is bound. PreBind *PluginSet // Bind is a list of plugins that should be invoked at "Bind" extension point of the scheduling framework. // The scheduler call these plugins in order. Scheduler skips the rest of these plugins as soon as one returns success. Bind *PluginSet // PostBind is a list of plugins that should be invoked after a pod is successfully bound. PostBind *PluginSet // Unreserve is a list of plugins invoked when a pod that was previously reserved is rejected in a later phase. Unreserve *PluginSet }
该方法主要是为了实现对应插件类型和framework中保存对应插件类型数组的映射, 好比Prefilter与其关联的preFilterPlugins切片,string(插件类型)->[]PreFilterPlugin(&reflect.SliceHeader切片头)
func (f *framework) getExtensionPoints(plugins *config.Plugins) []extensionPoint { return []extensionPoint{ {plugins.PreFilter, &f.preFilterPlugins}, {plugins.Filter, &f.filterPlugins}, {plugins.Reserve, &f.reservePlugins}, {plugins.PostFilter, &f.postFilterPlugins}, {plugins.Score, &f.scorePlugins}, {plugins.PreBind, &f.preBindPlugins}, {plugins.Bind, &f.bindPlugins}, {plugins.PostBind, &f.postBindPlugins}, {plugins.Unreserve, &f.unreservePlugins}, {plugins.Permit, &f.permitPlugins}, {plugins.QueueSort, &f.queueSortPlugins}, } }
其会遍历全部的上面的映射,可是此处不会根据类型注册到对应的切片中,而是全部的注册到gpMAp中
func (f *framework) pluginsNeeded(plugins *config.Plugins) map[string]config.Plugin { pgMap := make(map[string]config.Plugin) if plugins == nil { return pgMap } // 构建匿名函数,利用闭包来修改pgMap保存全部容许的插件集合 find := func(pgs *config.PluginSet) { if pgs == nil { return } for _, pg := range pgs.Enabled { // 遍历全部容许的插件集合 pgMap[pg.Name] = pg // 保存到map中 } } // 遍历上面的全部映射表 for _, e := range f.getExtensionPoints(plugins) { find(e.plugins) } return pgMap }
会调用生成的插件工厂注册表,来经过每一个插件的Factory构建Plugin插件实例, 保存到pluginsMap中
pluginsMap := make(map[string]Plugin) for name, factory := range r { // pg即上面生成的pgMap,这里只会生成须要使用的插件 if _, ok := pg[name]; !ok { continue } p, err := factory(pluginConfig[name], f) if err != nil { return nil, fmt.Errorf("error initializing plugin %q: %v", name, err) } pluginsMap[name] = p // 进行权重保存 f.pluginNameToWeightMap[name] = int(pg[name].Weight) if f.pluginNameToWeightMap[name] == 0 { f.pluginNameToWeightMap[name] = 1 } // Checks totalPriority against MaxTotalScore to avoid overflow if int64(f.pluginNameToWeightMap[name])*MaxNodeScore > MaxTotalScore-totalPriority { return nil, fmt.Errorf("total score of Score plugins could overflow") } totalPriority += int64(f.pluginNameToWeightMap[name]) * MaxNodeScore }
这里主要是经过e.slicePtr利用反射,结合以前的构造的pluginsMap和反射来进行具体类型插件的注册
for _, e := range f.getExtensionPoints(plugins) { if err := updatePluginList(e.slicePtr, e.plugins, pluginsMap); err != nil { return nil, err } }
updatePluginList主要是经过反射来进行的,经过上面的getExtensionPoints获取的framework中对应的slice的地址,而后利用反射来进行插件的注册和合法性效验
func updatePluginList(pluginList interface{}, pluginSet *config.PluginSet, pluginsMap map[string]Plugin) error { if pluginSet == nil { return nil } // 首先经过Elem获取当前数组的类型 plugins := reflect.ValueOf(pluginList).Elem() // 经过数组类型来获取数组内部元素的类型 pluginType := plugins.Type().Elem() set := sets.NewString() for _, ep := range pluginSet.Enabled { pg, ok := pluginsMap[ep.Name] if !ok { return fmt.Errorf("%s %q does not exist", pluginType.Name(), ep.Name) } // 合法性检查:若是发现当前插件未实现当前接口,则报错 if !reflect.TypeOf(pg).Implements(pluginType) { return fmt.Errorf("plugin %q does not extend %s plugin", ep.Name, pluginType.Name()) } if set.Has(ep.Name) { return fmt.Errorf("plugin %q already registered as %q", ep.Name, pluginType.Name()) } set.Insert(ep.Name) // 追加插件到slice中,并保存指针指向 newPlugins := reflect.Append(plugins, reflect.ValueOf(pg)) plugins.Set(newPlugins) } return nil }
CycleState主要是负责调度流程中数据的保存和克隆,其对外暴露了读写锁接口,各扩展点插件能够根据需求独立进行加锁选择
CycleState实现并复杂主要保存StateData数据,只须要实现一个clone接口便可,CycleState里面的数据,能够被当前framework全部的插件进行数据增长和修改,里面会经过读写锁来保证线程安全,但并不会针对插件进行限制,即信任全部插件,能够任意进行增删
type CycleState struct { mx sync.RWMutex storage map[StateKey]StateData // if recordPluginMetrics is true, PluginExecutionDuration will be recorded for this cycle. recordPluginMetrics bool } // StateData is a generic type for arbitrary data stored in CycleState. type StateData interface { // Clone is an interface to make a copy of StateData. For performance reasons, // clone should make shallow copies for members (e.g., slices or maps) that are not // impacted by PreFilter's optional AddPod/RemovePod methods. Clone() StateData }
对外接口的实现,须要对应的插件主动选择进行加读锁或者加写锁,而后进行相关数据的读取和修改
func (c *CycleState) Read(key StateKey) (StateData, error) { if v, ok := c.storage[key]; ok { return v, nil } return nil, errors.New(NotFound) } // Write stores the given "val" in CycleState with the given "key". // This function is not thread safe. In multi-threaded code, lock should be // acquired first. func (c *CycleState) Write(key StateKey, val StateData) { c.storage[key] = val } // Delete deletes data with the given key from CycleState. // This function is not thread safe. In multi-threaded code, lock should be // acquired first. func (c *CycleState) Delete(key StateKey) { delete(c.storage, key) } // Lock acquires CycleState lock. func (c *CycleState) Lock() { c.mx.Lock() } // Unlock releases CycleState lock. func (c *CycleState) Unlock() { c.mx.Unlock() } // RLock acquires CycleState read lock. func (c *CycleState) RLock() { c.mx.RLock() } // RUnlock releases CycleState read lock. func (c *CycleState) RUnlock() { c.mx.RUnlock() }
waitingPodMap主要是存储Permit阶段插件设置的须要Wait等待的pod,即时通过以前的优选后,这里面的pod也可能会被某些插件给拒绝掉
waitingPodsMAp其内部经过pod的uid保存一个map映射,同时经过读写锁来进行数据保护
type waitingPodsMap struct { pods map[types.UID]WaitingPod mu sync.RWMutex }
waitingPod则是一个具体的pod的等待实例,其内部经过pendingPlugins保存插件的定义的 timer等待时间,对外经过chan *status来接受当前pod的状态,并经过读写锁来进行串行化
type waitingPod struct { pod *v1.Pod pendingPlugins map[string]*time.Timer s chan *Status mu sync.RWMutex }
会根据每一个plugin的wait等待时间构建N个timer, 若是任一的timer到期,则就拒绝
func newWaitingPod(pod *v1.Pod, pluginsMaxWaitTime map[string]time.Duration) *waitingPod { wp := &waitingPod{ pod: pod, s: make(chan *Status), } wp.pendingPlugins = make(map[string]*time.Timer, len(pluginsMaxWaitTime)) // The time.AfterFunc calls wp.Reject which iterates through pendingPlugins map. Acquire the // lock here so that time.AfterFunc can only execute after newWaitingPod finishes. wp.mu.Lock() defer wp.mu.Unlock() // 根据插件的等待时间来构建timer,若是有任一timer到期,还不曾有任何plugin Allow则会进行Rejectj㐇 for k, v := range pluginsMaxWaitTime { plugin, waitTime := k, v wp.pendingPlugins[plugin] = time.AfterFunc(waitTime, func() { msg := fmt.Sprintf("rejected due to timeout after waiting %v at plugin %v", waitTime, plugin) wp.Reject(msg) }) } return wp }
任一一个plugin的定时器到期,或者plugin主动发起reject操做,则都会暂停全部的定时器,并进行消息广播
func (w *waitingPod) Reject(msg string) bool { w.mu.RLock() defer w.mu.RUnlock() // 中止全部的timer for _, timer := range w.pendingPlugins { timer.Stop() } // 经过管道发送拒绝事件 select { case w.s <- NewStatus(Unschedulable, msg): return true default: return false } }
容许操做必须等待全部的plugin都Allow后,才能发送容许事件
func (w *waitingPod) Allow(pluginName string) bool { w.mu.Lock() defer w.mu.Unlock() if timer, exist := w.pendingPlugins[pluginName]; exist { // 中止当前plugin的定时器 timer.Stop() delete(w.pendingPlugins, pluginName) } // Only signal success status after all plugins have allowed if len(w.pendingPlugins) != 0 { return true } // 只有当全部的plugin都容许,才会发生成功容许事件 select { case w.s <- NewStatus(Success, ""): // 发送事件 return true default: return false } }
首先会遍历全部的插件,而后若是发现状态设置为Wait,则会根据插件的等待时间进行wait操做
func (f *framework) RunPermitPlugins(ctx context.Context, state *CycleState, pod *v1.Pod, nodeName string) (status *Status) { startTime := time.Now() defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(permit, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) }() pluginsWaitTime := make(map[string]time.Duration) statusCode := Success for _, pl := range f.permitPlugins { status, timeout := f.runPermitPlugin(ctx, pl, state, pod, nodeName) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at permit: %v", pl.Name(), status.Message()) klog.V(4).Infof(msg) return NewStatus(status.Code(), msg) } if status.Code() == Wait { // Not allowed to be greater than maxTimeout. if timeout > maxTimeout { timeout = maxTimeout } // 记录当前plugin的等待时间 pluginsWaitTime[pl.Name()] = timeout statusCode = Wait } else { msg := fmt.Sprintf("error while running %q permit plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) return NewStatus(Error, msg) } } } // We now wait for the minimum duration if at least one plugin asked to // wait (and no plugin rejected the pod) if statusCode == Wait { startTime := time.Now() // 根据插件等待时间构建waitingPod w := newWaitingPod(pod, pluginsWaitTime) // 加入到waitingPods中 f.waitingPods.add(w) // 移除 defer f.waitingPods.remove(pod.UID) klog.V(4).Infof("waiting for pod %q at permit", pod.Name) // 等待状态消息 s := <-w.s metrics.PermitWaitDuration.WithLabelValues(s.Code().String()).Observe(metrics.SinceInSeconds(startTime)) if !s.IsSuccess() { if s.IsUnschedulable() { msg := fmt.Sprintf("pod %q rejected while waiting at permit: %v", pod.Name, s.Message()) klog.V(4).Infof(msg) return NewStatus(s.Code(), msg) } msg := fmt.Sprintf("error received while waiting at permit for pod %q: %v", pod.Name, s.Message()) klog.Error(msg) return NewStatus(Error, msg) } } return nil }
上面已经将插件进行注册,而且介绍了调度流程中数据的保存和等待机制的实现,其实剩下的就是每类插件执行调用的具体实现了,除了优选阶段,其实剩下的阶段,都是几乎没有什么逻辑处理了,而优选阶段就跟以前系列分享里面的优选阶段的设计相似,这里也不在进行赘述了
流程看起来都蛮简单的,注意这个地方有任一一个插件拒绝,则就会直接调度失败
func (f *framework) RunPreFilterPlugins(ctx context.Context, state *CycleState, pod *v1.Pod) (status *Status) { startTime := time.Now() defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(preFilter, status.Code().String()).Observe(metrics.SinceInSeconds(startTime)) }() for _, pl := range f.preFilterPlugins { status = f.runPreFilterPlugin(ctx, pl, state, pod) if !status.IsSuccess() { if status.IsUnschedulable() { msg := fmt.Sprintf("rejected by %q at prefilter: %v", pl.Name(), status.Message()) klog.V(4).Infof(msg) return NewStatus(status.Code(), msg) } msg := fmt.Sprintf("error while running %q prefilter plugin for pod %q: %v", pl.Name(), pod.Name, status.Message()) klog.Error(msg) return NewStatus(Error, msg) } } return nil }
跟以前的相似,只不过会根据runAllFilters参数肯定是否要运行全部的插件,默认是不运行,由于已经失败了了嘛
unc (f *framework) RunFilterPlugins( ctx context.Context, state *CycleState, pod *v1.Pod, nodeInfo *schedulernodeinfo.NodeInfo, ) PluginToStatus { var firstFailedStatus *Status startTime := time.Now() defer func() { metrics.FrameworkExtensionPointDuration.WithLabelValues(filter, firstFailedStatus.Code().String()).Observe(metrics.SinceInSeconds(startTime)) }() statuses := make(PluginToStatus) for _, pl := range f.filterPlugins { pluginStatus := f.runFilterPlugin(ctx, pl, state, pod, nodeInfo) if len(statuses) == 0 { firstFailedStatus = pluginStatus } if !pluginStatus.IsSuccess() { if !pluginStatus.IsUnschedulable() { // Filter plugins are not supposed to return any status other than // Success or Unschedulable. firstFailedStatus = NewStatus(Error, fmt.Sprintf("running %q filter plugin for pod %q: %v", pl.Name(), pod.Name, pluginStatus.Message())) return map[string]*Status{pl.Name(): firstFailedStatus} } statuses[pl.Name()] = pluginStatus if !f.runAllFilters { // 不须要运行全部插件进行退出 return statuses } } } return statuses }
今天就到这里吧,调度器修改仍是蛮大的,可是能够预见的是,为了更多的调度插件可能都会集中到framework中,对kubernetes scheduler系列的学习,也算是告一段落了,做为一个kubernetes新手学习起来仍是有点费劲,还好调度器设计的跟其余模块的耦合性相对小一点
> 微信号:baxiaoshi2020 欢迎一块儿交流学习分享,有个小群欢迎大佬光临 > 我的博客: www.sreguide.com
> 微信号:baxiaoshi2020 > 关注公告号阅读更多源码分析文章
> 更多文章关注 www.sreguide.com > 本文由博客一文多发平台 OpenWrite 发布