Kubelet从入门到放弃:识透CPU管理(下)

3、源码分析node

在介绍代码以前,zouyee先带各位看一看CPU manager的启动图(CPU manager属于Container Manager模块的子系统)
复制代码

对于上图的内容,zouyee总结流程以下:linux

一、在命令行启动部分,Kubelet中调用NewContainerManager构建ContainerManager

二、NewContainerManager函数调用topologymanager.NewManager构建拓扑管理器

三、NewContainerManager函数调用cpumanager.NewManager构建CPU管理器

四、拓扑管理器使用AddHintPriovider方法将CPU管理器加入管理

五、回到命令行启动部分,调用NewMainKubelet(),构建Kubelet结构体

六、构建Kubelet结构体时,将CPU管理器跟拓扑管理器封装为InternalContainerLifecycle接口,其实现Pod相关的生命周期资源管理操做,涉及CPU相关的是PreStart方法

七、构建Kubelet结构体时,调用AddPodmitHandler将GetAllocateResourcesPodAdmitHandler方法加入到Pod准入插件中,在Pod建立时,资源预分配检查

八、构建Kubelet结构体后,调用ContainerManager的Start方法,ContainerManager在Start方法中调用CPU管理器的Start方法,其作一些处理工做并孵化一个goroutine,执行reconcileState()

下面依次进行讲解。

STEP 1

Kubelet中调用NewContainerManager构建ContainerManager, 涉及代码为cmd/kubelet/app/server.go

在run函数中完成ContainerManager初始化工做

func run(ctx context.Context, 参数太长,不写全了){
	....
	if kubeDeps.ContainerManager == nil {
	...
	kubeDeps.ContainerManager, err = cm.NewContainerManager(
	...
	)
	...
	}
}

STEP 2-4	

NewContainerManager函数调用topologymanager.NewManager构建拓扑管理器,涉及代码
复制代码

pkg/kubelet/cm/container_manager_linux.gomarkdown

func NewContainerManager(参数太长,不写全了) {
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager){
		// 判断特性是否开启,构建拓扑管理
		cm.topologyManager, err = topologymanager.NewManager(
			machineInfo.Topology,
			nodeConfig.ExperimentalTopologyManagerPolicy,
			nodeConfig.ExperimentalTopologyManagerScope,
		)
	}
	// 判断特性是否开启,构建CPU管理
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		cm.cpuManager, err = cpumanager.NewManager(
			nodeConfig.ExperimentalCPUManagerPolicy,
			nodeConfig.ExperimentalCPUManagerReconcilePeriod,
			machineInfo,
			nodeConfig.NodeAllocatableConfig.ReservedSystemCPUs,
			cm.GetNodeAllocatableReservation(),
			nodeConfig.KubeletRootDir,
			cm.topologyManager,
		)
		if err != nil {
			klog.Errorf("failed to initialize cpu manager: %v", err)
			return nil, err
		}
		// 拓扑管理器使用AddHintPriovider方法将CPU管理器加入管理 
		cm.topologyManager.AddHintProvider(cm.cpuManager)
	}
}

其中关于CPU管理器的初始化:

type CPUTopology struct {
    NumCPUs    int
    NumCores   int
    NumSockets int
    CPUDetails CPUDetails
}

type CPUDetails map[int]CPUInfo

type CPUInfo struct {
    NUMANodeID int
    SocketID   int
    CoreID     int
}

func NewManager(参数太多,省略了) (Manager, error) {
	// 根据cpuPolicyName,决定初始化policy,当前支持none和static
	switch policyName(cpuPolicyName) {
		...
		case PolicyStatic:
			// 1. 根据cadvisor的数据,生产topology结构体
			topo, err = topology.Discover(machineInfo)
			// 2. 检查reserved的CPU是否为0,须要kube+system reserved的CPU > 0
			// 3. 初始化policy
			policy, err = NewStaticPolicy(topo, numReservedCPUs, specificCPUs, affinity)
		...
	}
}
   

STEP 5-7

在run函数中完成ContainerManager初始化工做后,调用RunKubelet函数构建Kubelet结构体,其最终调用NewMainKubelet(),完成Kubelet结构体构建。涉及代码pkg/kubelet/kubelet.go

func NewMainKubelet(参数太长,不写全了)(*Kubelet, error) {
	...
	klet := &Kubelet{
	...
	containerManager: kubeDeps.ContainerManager,
	...
	}
	...
	runtime, err := kuberuntime.NewKubeGenericRuntimeManager(
	...
	// 构建Kubelet结构体时,将CPU管理器跟拓扑管理器封装为InternalContainerLifecycle接口
	kubeDeps.ContainerManager.InternalContainerLifecycle(),
	...
	)
	...
	// 调用AddPodmitHandler将GetAllocateResourcesPodAdmitHandler方法加入到Pod准入插件中,在Pod建立时,资源预分配检查
klet.admitHandlers.AddPodAdmitHandler(klet.containerManager.GetAllocateResourcesPodAdmitHandler())
	...
}

   其中在InternalContainerLifecycle接口,涉及CPU部分在PreStartContainer方法,涉及代码pkg/kubelet/cm/internal_container_lifecycle.go

func (i *internalContainerLifecycleImpl) PreStartContainer(参数太长,不写全了) error {
   if i.cpuManager != nil {
      i.cpuManager.AddContainer(pod, container, containerID)
   }

   if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.TopologyManager) {
      err := i.topologyManager.AddContainer(pod, containerID)
      if err != nil {
         return err
      }
   }
   return nil
}

那么什么时候调用呢?

上面咱们提到了kuberuntime.NewKubeGenericRuntimeManager,该函数实例化KubeGenericRuntimeManager结构体(后续详细介绍),而该结构体在startContainer方法中,进行调用,涉及代码pkg/kubelet/kuberuntime/kuberuntime_container.go

// 用于启动容器,该结构体实现了Runtime接口
func (m *kubeGenericRuntimeManager) startContainer(参数太多,不写了) (string, error) {
	...
	err = m.internalLifecycle.PreStartContainer(pod, container, containerID)
	...
}

另外GetAllocateResourcesPodAdmitHandler 须要实现返回的结构体须要实现Admit接口
复制代码

涉及代码pkg/kubelet/cm/container_manager_linux.goapp

func (m *resourceAllocator) Admit(attrs *lifecycle.PodAdmitAttributes) lifecycle.PodAdmitResult {
   pod := attrs.Pod

   for _, container := range append(pod.Spec.InitContainers, pod.Spec.Containers...) {
      ...

      if m.cpuManager != nil {
         err = m.cpuManager.Allocate(pod, &container)
         ...
      }
   }

   return lifecycle.PodAdmitResult{Admit: true}
}

 实际调用逻辑为m.cpuManager.Allocate->m.policy.Allocate->func (p *staticPolicy) Allocate (none策略无需操做),涉及代码pkg/kubelet/cm/cpumanager/policy_static.go

func (p *staticPolicy) Allocate(s state.State, pod *v1.Pod, container *v1.Container) error {
	 // 1. 如介绍所说,检查是否知足分配,即QOS为Guaranteed,且分配CPU为整型
   if numCPUs := p.guaranteedCPUs(pod, container); numCPUs != 0 {
   		// 2. 获取是否分配过,分配过则更新便可
      if cpuset, ok := s.GetCPUSet(string(pod.UID), container.Name); ok {
       ...
      }
      // 3. 获取亲和性拓扑
      hint := p.affinity.GetAffinity(string(pod.UID), container.Name)
      // 4. 根据numa亲和性进行分配
			cpuset, err := p.allocateCPUs(.. )
			// 5. 设置分配结果
			s.SetCPUSet(string(pod.UID), container.Name, cpuset)
			// 6. 设置reuse字段
			p.updateCPUsToReuse(pod, container, cpuset)

	}
	// container belongs in the shared pool (nothing to do; use default cpuset)
	return nil
}

STEP 8

构建完成Kubelet结构体后,在Kubelet方法initializeRuntimeDependentModules中调用ContainerManager的Start方法,涉及代码pkg/kubelet/kubelet.go

func (kl *Kubelet) initializeRuntimeDependentModules() {
	...
	// 这里根据咱们前面说明的,须要cadvisor的数据,所以须要提早启动
	if err := kl.containerManager.Start(省略); err != nil {
		...
	}
	...
}
复制代码

ContainerManager在Start方法中调用CPU管理器的Start方法,具体步骤以下:ide

a. 构建Checkpoint,其中包含文件及内存的操做

b. 根据初始化的policy,运行Start, 实际只有static起到做用,主要是校验工做

c. 孵化一个goroutine,执行reconcileState()

func (cm *containerManagerImpl) Start(参数太多,省略) error {
	...
	// 初始化CPU管理器
	if utilfeature.DefaultFeatureGate.Enabled(kubefeatures.CPUManager) {
		...
		err = cm.cpuManager.Start(参数太多,省略)
		...
	}
	...
}
// 涉及代码pkg/kubelet/cm/cpumanager/cpu_manager.go
func (m *manager) Start(参数太多,省略) error {
	...
	// 该处为Checkpoint处理,实际为文件管理工做,即分配等状况的数据保存
	
	stateImpl, err := state.NewCheckpointState(m.stateFileDirectory, cpuManagerStateFileName, m.policy.Name(), m.containerMap)
	...
	// 孵化一个goroutine,执行reconcileState()
	// 处理当前实际CPU分配的工做,相似actual与desired
	go wait.Until(func() { m.reconcileState() }, m.reconcilePeriod, wait.NeverStop)
}

	其中reconcileState  主要完成如下工做

	a. 处理当前活跃Pod,更新containerMap结构体

	b. 经过CRI接口更新容器底层的CPU配置(即m.containerRuntime.UpdateContainerResources)

后续zouyee将带各位看看ContainerManager各大组件:拓扑管理、设备管理、容器管理等。
复制代码

点击查看全文函数

后续相关内容,请查看公众号:DCOSoop

4、参考资料源码分析

一、cpusetspa

二、cpu topology插件

三、cpu manager policy