A PersistentVolumeClaim (PVC) is a user's request for storage. It is analogous to a Pod: Pods consume node resources, PVCs consume storage resources.
A StorageClass provides a way to describe the "classes" of storage on offer. Different classes may map to quality-of-service levels, backup policies, or arbitrary policies decided by the cluster administrators.
pv.kubernetes.io/bind-completed: yes -- the PVC binding has completed.
pv.kubernetes.io/bound-by-controller: yes -- the binding was established by the controller rather than pre-set by the user.
A PV is a resource; a PVC is a request for that resource. Lifecycle:
Provisioning --> Binding --> Using --> Releasing --> Recycling
Provisioning: static or dynamic.
Static: the administrator creates a number of PVs in advance.
Dynamic: when none of the static PVs match a user's PersistentVolumeClaim, the cluster may try to dynamically provision a volume for the PVC. This relies on StorageClasses: the PVC must request a class.
Binding takes two steps.
The PV controller watches PVC resources and performs the update handling; this article analyzes that code.
Define nfs-pv.yaml:
apiVersion: v1
kind: PersistentVolume
metadata:
  name: nfs-pv
  namespace: default
spec:
  capacity:
    storage: 500Mi
  accessModes:
    - ReadWriteMany
  nfs:
    server: 192.168.73.184
    path: /nfs/data
Define pvc.yaml:
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  name: mysql-pv-claim
  labels:
    app: wordpress
spec:
  storageClassName: rook-ceph-block
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 20Gi
Register persistentvolume-binder: controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
// NewControllerInitializers is a public map of named controller groups (you can start more than one in an init func)
// paired to their InitFunc. This allows for structured downstream composition and subdivision.
func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc {
    controllers["persistentvolume-binder"] = startPersistentVolumeBinderController
    controllers["attachdetach"] = startAttachDetachController
    controllers["persistentvolume-expander"] = startVolumeExpandController
    controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController
    controllers["pvc-protection"] = startPVCProtectionController
    controllers["pv-protection"] = startPVProtectionController
    controllers["ttl-after-finished"] = startTTLAfterFinishedController
    controllers["root-ca-cert-publisher"] = startRootCACertPublisher
    return controllers
}
Create the PV controller. The resources it watches include volumes, PVCs, pods, nodes, storage classes, and so on.
The struct holds the local caches, the claim and volume work queues, etc.; the remaining fields are omitted here.
controller := &PersistentVolumeController{
    volumes:                       newPersistentVolumeOrderedIndex(),
    claims:                        cache.NewStore(cache.DeletionHandlingMetaNamespaceKeyFunc),
    kubeClient:                    p.KubeClient,
    eventRecorder:                 eventRecorder,
    runningOperations:             goroutinemap.NewGoRoutineMap(true /* exponentialBackOffOnError */),
    cloud:                         p.Cloud,
    enableDynamicProvisioning:     p.EnableDynamicProvisioning,
    clusterName:                   p.ClusterName,
    createProvisionedPVRetryCount: createProvisionedPVRetryCount,
    createProvisionedPVInterval:   createProvisionedPVInterval,
    claimQueue:                    workqueue.NewNamed("claims"),
    volumeQueue:                   workqueue.NewNamed("volumes"),
    resyncPeriod:                  p.SyncPeriod,
}
// Prober is nil because PV is not aware of Flexvolume.
if err := controller.volumePluginMgr.InitPlugins(p.VolumePlugins, nil /* prober */, controller); err != nil {
    return nil, fmt.Errorf("Could not initialize volume plugins for PersistentVolume Controller: %v", err)
}
p.VolumeInformer.Informer().AddEventHandler(
    cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.volumeQueue, newObj) },
        DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.volumeQueue, obj) },
    },
)
controller.volumeLister = p.VolumeInformer.Lister()
controller.volumeListerSynced = p.VolumeInformer.Informer().HasSynced
p.ClaimInformer.Informer().AddEventHandler(
    cache.ResourceEventHandlerFuncs{
        AddFunc:    func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
        UpdateFunc: func(oldObj, newObj interface{}) { controller.enqueueWork(controller.claimQueue, newObj) },
        DeleteFunc: func(obj interface{}) { controller.enqueueWork(controller.claimQueue, obj) },
    },
)
controller.claimLister = p.ClaimInformer.Lister()
controller.claimListerSynced = p.ClaimInformer.Informer().HasSynced
controller.classLister = p.ClassInformer.Lister()
controller.classListerSynced = p.ClassInformer.Informer().HasSynced
controller.podLister = p.PodInformer.Lister()
controller.podListerSynced = p.PodInformer.Informer().HasSynced
controller.NodeLister = p.NodeInformer.Lister()
controller.NodeListerSynced = p.NodeInformer.Informer().HasSynced
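All of the event handlers above funnel into enqueueWork. Roughly (a simplified rendering of the helper in pv_controller_base.go; treat it as a sketch rather than a verbatim quote), it turns the object into a namespace/name key and puts that key on the queue; workers later re-fetch the object from the lister:

// enqueueWork adds a volume or claim key to the given work queue (simplified).
func (ctrl *PersistentVolumeController) enqueueWork(queue workqueue.Interface, obj interface{}) {
    // Unwrap tombstones so that deleted objects still yield a usable key.
    if unknown, ok := obj.(cache.DeletedFinalStateUnknown); ok && unknown.Obj != nil {
        obj = unknown.Obj
    }
    objName, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        klog.Errorf("failed to get key from object: %v", err)
        return
    }
    klog.V(5).Infof("enqueued %q for sync", objName)
    // Only the key goes on the queue; workers re-read the object from the lister.
    queue.Add(objName)
}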
Run
  --> ctrl.resync
  --> ctrl.volumeWorker
        --> ctrl.updateVolume
              --> ctrl.syncVolume
  --> ctrl.claimWorker
        --> ctrl.updateClaim
              --> ctrl.storeClaimUpdate
              --> ctrl.syncClaim
                    --> ctrl.syncUnboundClaim
                    --> ctrl.syncBoundClaim
Run periodically drives three loops: resync, volumeWorker and claimWorker. resync periodically lists all PVs and PVCs and re-enqueues them into the work queues.
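resync itself is tiny; approximately (reproduced from the controller source from memory, so treat it as a sketch) it re-lists every PVC and PV from the informer caches and re-enqueues them:

func (ctrl *PersistentVolumeController) resync() {
    klog.V(4).Infof("resyncing PV controller")

    pvcs, err := ctrl.claimLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list claims: %s", err)
        return
    }
    for _, pvc := range pvcs {
        ctrl.enqueueWork(ctrl.claimQueue, pvc)
    }

    pvs, err := ctrl.volumeLister.List(labels.NewSelector())
    if err != nil {
        klog.Warningf("cannot list persistent volumes: %s", err)
        return
    }
    for _, pv := range pvs {
        ctrl.enqueueWork(ctrl.volumeQueue, pv)
    }
}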
All controllers follow the same pattern; below, volumeWorker and claimWorker are analyzed in turn.
// Run starts all of this controller's control loops
func (ctrl *PersistentVolumeController) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
    defer ctrl.claimQueue.ShutDown()
    defer ctrl.volumeQueue.ShutDown()

    klog.Infof("Starting persistent volume controller")
    defer klog.Infof("Shutting down persistent volume controller")

    if !controller.WaitForCacheSync("persistent volume", stopCh, ctrl.volumeListerSynced, ctrl.claimListerSynced, ctrl.classListerSynced, ctrl.podListerSynced, ctrl.NodeListerSynced) {
        return
    }

    ctrl.initializeCaches(ctrl.volumeLister, ctrl.claimLister)

    go wait.Until(ctrl.resync, ctrl.resyncPeriod, stopCh)
    go wait.Until(ctrl.volumeWorker, time.Second, stopCh)
    go wait.Until(ctrl.claimWorker, time.Second, stopCh)

    metrics.Register(ctrl.volumes.store, ctrl.claims)

    <-stopCh
}
volumeWorker pulls items from the volume queue for processing; when the queue shuts down it exits, otherwise it simply blocks until the next item arrives.
For each item it calls updateVolume (the event may be an add, update or periodic sync); the item is processed and then removed from the queue.
// volumeWorker processes items from volumeQueue. It must run only once,
// syncVolume is not assured to be reentrant.
func (ctrl *PersistentVolumeController) volumeWorker() {
    workFunc := func() bool {
        keyObj, quit := ctrl.volumeQueue.Get()
        if quit {
            return true
        }
        defer ctrl.volumeQueue.Done(keyObj)
        key := keyObj.(string)

        _, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return false
        }
        volume, err := ctrl.volumeLister.Get(name)
        if err == nil {
            // The volume still exists in informer cache, the event must have
            // been add/update/sync
            ctrl.updateVolume(volume)
            return false
        }
        // The volume is gone from the informer cache, so the event was a delete.
        // (The lookup of the cached copy passed to deleteVolume is omitted in this excerpt.)
        ctrl.deleteVolume(volume)
        return false
    }
    for {
        if quit := workFunc(); quit {
            klog.Infof("volume worker queue shutting down")
            return
        }
    }
}
storeVolumeUpdate first writes the new version into the local cache; if the incoming object is not newer than the cached copy, the event is stale and is skipped.
Otherwise syncVolume is called to do the real work, analyzed next.
// updateVolume runs in worker thread and handles "volume added",
// "volume updated" and "periodic sync" events.
func (ctrl *PersistentVolumeController) updateVolume(volume *v1.PersistentVolume) {
    // Store the new volume version in the cache and do not process it if this
    // is an old version.
    new, err := ctrl.storeVolumeUpdate(volume)
    if err != nil {
        klog.Errorf("%v", err)
    }
    if !new {
        return
    }

    if err := ctrl.syncVolume(volume); err != nil {
        // Errors are only logged; the volume will be retried on the next sync.
        klog.Errorf("could not sync volume %q: %+v", volume.Name, err)
    }
}
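storeVolumeUpdate delegates to a generic helper (storeObjectUpdate in pv_controller_base.go). The idea can be sketched as a small self-contained function; storeObjectUpdateSketch and its exact error handling are illustrative, not the upstream code:

package pvsketch

import (
    "fmt"
    "strconv"

    "k8s.io/apimachinery/pkg/api/meta"
    "k8s.io/client-go/tools/cache"
)

// storeObjectUpdateSketch keeps obj in the local store only if it is not older
// than the copy already stored (compared by resourceVersion). It returns true
// when the stored copy was refreshed, i.e. the caller should process the event.
func storeObjectUpdateSketch(store cache.Store, obj interface{}) (bool, error) {
    objName, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        return false, fmt.Errorf("couldn't get key for object %+v: %v", obj, err)
    }
    oldObj, found, err := store.GetByKey(objName)
    if err != nil {
        return false, err
    }
    if !found {
        // Nothing cached yet: first time we see this object.
        return true, store.Add(obj)
    }

    newMeta, err := meta.Accessor(obj)
    if err != nil {
        return false, err
    }
    oldMeta, err := meta.Accessor(oldObj)
    if err != nil {
        return false, err
    }
    newVersion, err := strconv.ParseInt(newMeta.GetResourceVersion(), 10, 64)
    if err != nil {
        return false, err
    }
    oldVersion, err := strconv.ParseInt(oldMeta.GetResourceVersion(), 10, 64)
    if err != nil {
        return false, err
    }
    if newVersion < oldVersion {
        // A stale informer event; ignore it and keep the newer cached copy.
        return false, nil
    }
    return true, store.Update(obj)
}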
3.2.1 If spec.claimRef is not set, the PV has never been used; updateVolumePhase is called to set the phase to Available and the cache is updated.
// [Unit test set 4]
if volume.Spec.ClaimRef == nil {
    // Volume is unused
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is unused", volume.Name)
    if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
    }
    return nil
}
The remaining cases cover a PV whose spec.claimRef has already been set.
3.2.2 The PV is reserved for a PVC but the binding has not completed (ClaimRef.UID is empty): update the phase to Available and update the cache; the PVC sync will finish the binding.
} else /* pv.Spec.ClaimRef != nil */ {
    // Volume is bound to a claim.
    if volume.Spec.ClaimRef.UID == "" {
        // The PV is reserved for a PVC; that PVC has not yet been
        // bound to this PV; the PVC sync will handle it.
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is pre-bound to claim %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
        if _, err := ctrl.updateVolumePhase(volume, v1.VolumeAvailable, ""); err != nil {
            // Nothing was saved; we will fall back into the same
            // condition in the next call to this method
            return err
        }
        return nil
    }
3.2.3 This case has the most code, but the handling is simple.
Look up the PVC referenced by the PV's claimRef. If it is not in the controller's local claim cache, the PVC may simply not have been synced yet (or it really is gone), so for volumes bound by the controller the informer cache and then the API server are double-checked before the claim is treated as missing.
// Get the PVC by _name_
var claim *v1.PersistentVolumeClaim
claimName := claimrefToClaimKey(volume.Spec.ClaimRef)
obj, found, err := ctrl.claims.GetByKey(claimName)
if err != nil {
    return err
}
if !found && metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
    // If PV is bound by external PV binder (e.g. kube-scheduler), it's
    // possible on heavy load that corresponding PVC is not synced to
    // controller local cache yet. So we need to double-check PVC in
    //   1) informer cache
    //   2) apiserver if not found in informer cache
    // to make sure we will not reclaim a PV wrongly.
    // Note that only non-released and non-failed volumes will be
    // updated to Released state when PVC does not exist.
    if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
        obj, err = ctrl.claimLister.PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name)
        if err != nil && !apierrs.IsNotFound(err) {
            return err
        }
        found = !apierrs.IsNotFound(err)
        if !found {
            obj, err = ctrl.kubeClient.CoreV1().PersistentVolumeClaims(volume.Spec.ClaimRef.Namespace).Get(volume.Spec.ClaimRef.Name, metav1.GetOptions{})
            if err != nil && !apierrs.IsNotFound(err) {
                return err
            }
            found = !apierrs.IsNotFound(err)
        }
    }
}
if !found {
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s not found", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
    // Fall through with claim = nil
} else {
    var ok bool
    claim, ok = obj.(*v1.PersistentVolumeClaim)
    if !ok {
        return fmt.Errorf("Cannot convert object from volume cache to volume %q!?: %#v", claim.Spec.VolumeName, obj)
    }
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s found: %s", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef), getClaimStatusForLogging(claim))
}
if claim != nil && claim.UID != volume.Spec.ClaimRef.UID {
    // The claim that the PV was pointing to was deleted, and another
    // with the same name created.
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: claim %s has different UID, the old one must have been deleted", volume.Name, claimrefToClaimKey(volume.Spec.ClaimRef))
    // Treat the volume as bound to a missing claim.
    claim = nil
}
3.2.4 The claim could not be found, which means it has most likely been deleted.
In this case reclaimVolume (covered in section 4) is called to reclaim the PV according to its reclaim policy (Retain / Delete / Recycle): the volume can be retained, deleted or recycled.
if claim == nil {
    // If we get into this block, the claim must have been deleted;
    // NOTE: reclaimVolume may either release the PV back into the pool or
    // recycle it or do nothing (retain)

    // Do not overwrite previous Failed state - let the user see that
    // something went wrong, while we still re-try to reclaim the
    // volume.
    if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
        // Also, log this only once:
        klog.V(2).Infof("volume %q is released and reclaim policy %q will be executed", volume.Name, volume.Spec.PersistentVolumeReclaimPolicy)
        if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
            // Nothing was saved; we will fall back into the same condition
            // in the next call to this method
            return err
        }
    }

    if err = ctrl.reclaimVolume(volume); err != nil {
        // Release failed, we will fall back into the same condition
        // in the next call to this method
        return err
    }
    return nil
}
3.2.5 The binding is still in progress; the claim is enqueued so the next syncClaim can complete it.
} else if claim.Spec.VolumeName == "" { if isMismatch, err := checkVolumeModeMismatches(&claim.Spec, &volume.Spec); err != nil || isMismatch { // Binding for the volume won't be called in syncUnboundClaim, // because findBestMatchForClaim won't return the volume due to volumeMode mismatch. volumeMsg := fmt.Sprintf("Cannot bind PersistentVolume to requested PersistentVolumeClaim %q due to incompatible volumeMode.", claim.Name) ctrl.eventRecorder.Event(volume, v1.EventTypeWarning, events.VolumeMismatch, volumeMsg) claimMsg := fmt.Sprintf("Cannot bind PersistentVolume %q to requested PersistentVolumeClaim due to incompatible volumeMode.", volume.Name) ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.VolumeMismatch, claimMsg) // Skipping syncClaim return nil } if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) { // The binding is not completed; let PVC sync handle it klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume not bound yet, waiting for syncClaim to fix it", volume.Name) } else { // Dangling PV; try to re-establish the link in the PVC sync klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume was bound and got unbound (by user?), waiting for syncClaim to fix it", volume.Name) } // In both cases, the volume is Bound and the claim is Pending. // Next syncClaim will fix it. To speed it up, we enqueue the claim // into the controller, which results in syncClaim to be called // shortly (and in the right worker goroutine). // This speeds up binding of provisioned volumes - provisioner saves // only the new PV and it expects that next syncClaim will bind the // claim to it. ctrl.claimQueue.Add(claimToClaimKey(claim)) return nil
3.2.6 The volume is properly bound to the claim; update status.phase to Bound if necessary.
} else if claim.Spec.VolumeName == volume.Name {
    // Volume is bound to a claim properly, update status if necessary
    klog.V(4).Infof("synchronizing PersistentVolume[%s]: all is bound", volume.Name)
    if _, err = ctrl.updateVolumePhase(volume, v1.VolumeBound, ""); err != nil {
        // Nothing was saved; we will fall back into the same
        // condition in the next call to this method
        return err
    }
    return nil
}
3.2.7 The volume is bound to a claim, but the claim is bound to a different PV. If the volume was dynamically provisioned and its reclaim policy is Delete, it is no longer needed, so it is released and reclaimed.
} else {
    // Volume is bound to a claim, but the claim is bound elsewhere
    if metav1.HasAnnotation(volume.ObjectMeta, annDynamicallyProvisioned) && volume.Spec.PersistentVolumeReclaimPolicy == v1.PersistentVolumeReclaimDelete {
        // This volume was dynamically provisioned for this claim. The
        // claim got bound elsewhere, and thus this volume is not
        // needed. Delete it.
        // Mark the volume as Released for external deleters and to let
        // the user know. Don't overwrite existing Failed status!
        if volume.Status.Phase != v1.VolumeReleased && volume.Status.Phase != v1.VolumeFailed {
            // Also, log this only once:
            klog.V(2).Infof("dynamically volume %q is released and it will be deleted", volume.Name)
            if volume, err = ctrl.updateVolumePhase(volume, v1.VolumeReleased, ""); err != nil {
                // Nothing was saved; we will fall back into the same condition
                // in the next call to this method
                return err
            }
        }
        if err = ctrl.reclaimVolume(volume); err != nil {
            // Deletion failed, we will fall back into the same condition
            // in the next call to this method
            return err
        }
        return nil
    }
3.2.8 The volume is bound to a claim, the claim is bound to another volume, and the volume was not dynamically provisioned: unbind the volume.
else {
    // Volume is bound to a claim, but the claim is bound elsewhere
    // and it's not dynamically provisioned.
    if metav1.HasAnnotation(volume.ObjectMeta, annBoundByController) {
        // This is part of the normal operation of the controller; the
        // controller tried to use this volume for a claim but the claim
        // was fulfilled by another volume. We did this; fix it.
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by controller to a claim that is bound to another volume, unbinding", volume.Name)
        if err = ctrl.unbindVolume(volume); err != nil {
            return err
        }
        return nil
    } else {
        // The PV must have been created with this ptr; leave it alone.
        klog.V(4).Infof("synchronizing PersistentVolume[%s]: volume is bound by user to a claim that is bound to another volume, waiting for the claim to get unbound", volume.Name)
        // This just updates the volume phase and clears
        // volume.Spec.ClaimRef.UID. It leaves the volume pre-bound
        // to the claim.
        if err = ctrl.unbindVolume(volume); err != nil {
            return err
        }
        return nil
    }
}
reclaimVolume lives in pkg/controller/volume/persistentvolume/pv_controller.go.
If the reclaim policy is Retain, nothing needs to be done.
recycleVolumeOperation looks up a recycler plugin to scrub the volume's data so the PV can be reused.
deleteVolumeOperation looks up the plugin to delete the underlying storage and then calls the API server to delete the PV object.
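The dispatch happens in reclaimVolume, which looks roughly like this (a simplified sketch: metrics, logging and the unknown-policy branch are trimmed):

func (ctrl *PersistentVolumeController) reclaimVolume(volume *v1.PersistentVolume) error {
    switch volume.Spec.PersistentVolumeReclaimPolicy {
    case v1.PersistentVolumeReclaimRetain:
        // Nothing to do; the PV stays Released until an administrator handles it.

    case v1.PersistentVolumeReclaimRecycle:
        opName := fmt.Sprintf("recycle-%s[%s]", volume.Name, string(volume.UID))
        ctrl.scheduleOperation(opName, func() error {
            ctrl.recycleVolumeOperation(volume)
            return nil
        })

    case v1.PersistentVolumeReclaimDelete:
        opName := fmt.Sprintf("delete-%s[%s]", volume.Name, string(volume.UID))
        ctrl.scheduleOperation(opName, func() error {
            _, err := ctrl.deleteVolumeOperation(volume)
            return err
        })
    }
    return nil
}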
claimWorker pulls an item from the claim queue; if there is nothing to do it waits for the next item, otherwise it calls updateClaim to handle the update.
// claimWorker processes items from claimQueue. It must run only once,
// syncClaim is not reentrant.
func (ctrl *PersistentVolumeController) claimWorker() {
    workFunc := func() bool {
        keyObj, quit := ctrl.claimQueue.Get()
        if quit {
            return true
        }
        defer ctrl.claimQueue.Done(keyObj)
        key := keyObj.(string)

        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            klog.V(4).Infof("error getting namespace & name of claim %q to get claim from informer: %v", key, err)
            return false
        }
        claim, err := ctrl.claimLister.PersistentVolumeClaims(namespace).Get(name)
        if err == nil {
            // The claim still exists in informer cache, the event must have
            // been add/update/sync
            ctrl.updateClaim(claim)
            return false
        }
        // The claim is gone from the informer cache, so the event was a delete.
        // (The lookup of the cached copy passed to deleteClaim is omitted in this excerpt.)
        ctrl.deleteClaim(claim)
        return false
    }
    for {
        if quit := workFunc(); quit {
            klog.Infof("claim worker queue shutting down")
            return
        }
    }
}
syncClaim branches on the pv.kubernetes.io/bind-completed annotation (annBindCompleted): claims without it go to syncUnboundClaim, the rest go to syncBoundClaim.
// syncClaim is the main controller method to decide what to do with a claim.
// It's invoked by appropriate cache.Controller callbacks when a claim is
// created, updated or periodically synced. We do not differentiate between
// these events.
// For easier readability, it was split into syncUnboundClaim and syncBoundClaim
// methods.
func (ctrl *PersistentVolumeController) syncClaim(claim *v1.PersistentVolumeClaim) error {
    klog.V(4).Infof("synchronizing PersistentVolumeClaim[%s]: %s", claimToClaimKey(claim), getClaimStatusForLogging(claim))

    if !metav1.HasAnnotation(claim.ObjectMeta, annBindCompleted) {
        return ctrl.syncUnboundClaim(claim)
    } else {
        return ctrl.syncBoundClaim(claim)
    }
}
If claim.Spec.VolumeName == "", the PVC is still pending; the other case is that the user has already specified a PV.
findBestMatchForClaim selects the best-matching volume: candidates are filtered on access modes and other constraints, and among them the volume with the smallest capacity that still satisfies the request wins.
// [Unit test set 1]
volume, err := ctrl.volumes.findBestMatchForClaim(claim, delayBinding)
if err != nil {
    klog.V(2).Infof("synchronizing unbound PersistentVolumeClaim[%s]: Error finding PV for claim: %v", claimToClaimKey(claim), err)
    return fmt.Errorf("Error finding PV for claim %q: %v", claimToClaimKey(claim), err)
}
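The real matcher lives in the ordered volume index and also filters on storage class, volumeMode, label selectors and delayed binding. As a minimal self-contained sketch of just the "smallest capacity that still fits" idea (smallestMatchingPV is an illustrative name, not the controller's function):

package pvsketch

import (
    v1 "k8s.io/api/core/v1"
)

// smallestMatchingPV picks, among Available PVs that are large enough, the one
// with the smallest capacity, so large volumes are not wasted on small claims.
// The real findBestMatchForClaim applies many more filters.
func smallestMatchingPV(pvs []*v1.PersistentVolume, claim *v1.PersistentVolumeClaim) *v1.PersistentVolume {
    requested := claim.Spec.Resources.Requests[v1.ResourceStorage]
    var best *v1.PersistentVolume
    for _, pv := range pvs {
        if pv.Status.Phase != v1.VolumeAvailable {
            continue // only unbound volumes are candidates
        }
        capacity := pv.Spec.Capacity[v1.ResourceStorage]
        if capacity.Cmp(requested) < 0 {
            continue // too small for the claim
        }
        if best == nil || capacity.Cmp(best.Spec.Capacity[v1.ResourceStorage]) < 0 {
            best = pv // smallest volume that still fits
        }
    }
    return best
}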
If no volume is found, check whether the claim references a StorageClass; if so, call provisionClaim (covered in section 8), otherwise mark the claim Pending and wait for the next round.
if volume == nil {
    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: no volume found", claimToClaimKey(claim))
    // No PV could be found
    // OBSERVATION: pvc is "Pending", will retry
    switch {
    case delayBinding:
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.WaitForFirstConsumer, "waiting for first consumer to be created before binding")
    case v1helper.GetPersistentVolumeClaimClass(claim) != "":
        if err = ctrl.provisionClaim(claim); err != nil {
            return err
        }
        return nil
    default:
        ctrl.eventRecorder.Event(claim, v1.EventTypeNormal, events.FailedBinding, "no persistent volumes available for this claim and no storage class is set")
    }

    // Mark the claim as Pending and try to find a match in the next
    // periodic syncClaim
    if _, err = ctrl.updateClaimStatus(claim, v1.ClaimPending, nil); err != nil {
        return err
    }
    return nil
} else /* pv != nil */ {
    // Found a PV for this claim
    // OBSERVATION: pvc is "Pending", pv is "Available"
    klog.V(4).Infof("synchronizing unbound PersistentVolumeClaim[%s]: volume %q found: %s", claimToClaimKey(claim), volume.Name, getVolumeStatusForLogging(volume))
    if err = ctrl.bind(volume, claim); err != nil {
        // On any error saving the volume or the claim, subsequent
        // syncClaim will finish the binding.
        return err
    }
    // OBSERVATION: claim is "Bound", pv is "Bound"
    return nil
}
The case where the user has pre-specified a PV is not covered here; the principle is much the same: set spec.claimRef on the PV and spec.volumeName on the PVC, add the annotations, and update status.phase.
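For reference, what "binding" amounts to on the two objects can be sketched as follows. The field names and annotation keys are the real ones, but markBound itself is illustrative; the controller's bind() persists these changes through several API calls with rollback on failure:

package pvsketch

import (
    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// markBound shows which fields a completed binding touches on both objects.
func markBound(pv *v1.PersistentVolume, pvc *v1.PersistentVolumeClaim) {
    // PV side: point spec.claimRef at the claim and set the phase.
    pv.Spec.ClaimRef = &v1.ObjectReference{
        Kind:      "PersistentVolumeClaim",
        Namespace: pvc.Namespace,
        Name:      pvc.Name,
        UID:       pvc.UID,
    }
    pv.Status.Phase = v1.VolumeBound

    // PVC side: record the volume name, the bind annotations and the phase.
    pvc.Spec.VolumeName = pv.Name
    metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, "pv.kubernetes.io/bind-completed", "yes")
    metav1.SetMetaDataAnnotation(&pvc.ObjectMeta, "pv.kubernetes.io/bound-by-controller", "yes")
    pvc.Status.Phase = v1.ClaimBound
}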
主要是处理不一样意的状况
obj, found, err := ctrl.volumes.store.GetByKey(claim.Spec.VolumeName)
if err != nil {
    return err
}
if !found {
    // Claim is bound to a non-existing volume.
    if _, err = ctrl.updateClaimStatusWithEvent(claim, v1.ClaimLost, nil, v1.EventTypeWarning, "ClaimLost", "Bound claim has lost its PersistentVolume. Data on the volume is lost!"); err != nil {
        return err
    }
    return nil
}
When volume.Spec.ClaimRef == nil, or volume.Spec.ClaimRef.UID == claim.UID, the binding relationship is simply (re)established; this is straightforward and not elaborated further.
The dynamic provisioning case:
provisionClaim
  --> provisionClaimOperation
        --> ctrl.findProvisionablePlugin
        --> ctrl.setClaimProvisioner
        --> plugin.NewProvisioner
        --> provisioner.Provision
The volume is provisioned asynchronously; see provisionClaimOperation for the details.
// provisionClaim starts new asynchronous operation to provision a claim if
// provisioning is enabled.
func (ctrl *PersistentVolumeController) provisionClaim(claim *v1.PersistentVolumeClaim) error {
    if !ctrl.enableDynamicProvisioning {
        return nil
    }
    klog.V(4).Infof("provisionClaim[%s]: started", claimToClaimKey(claim))
    opName := fmt.Sprintf("provision-%s[%s]", claimToClaimKey(claim), string(claim.UID))
    startTime := time.Now()
    ctrl.scheduleOperation(opName, func() error {
        pluginName, err := ctrl.provisionClaimOperation(claim)
        timeTaken := time.Since(startTime).Seconds()
        metrics.RecordVolumeOperationMetric(pluginName, "provision", timeTaken, err)
        return err
    })
    return nil
}
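scheduleOperation is a thin wrapper around the goroutinemap; roughly (simplified from the controller source) it guarantees that at most one operation with a given name runs at a time and that failing operations are retried with exponential backoff:

func (ctrl *PersistentVolumeController) scheduleOperation(operationName string, operation func() error) {
    klog.V(4).Infof("scheduleOperation[%s]", operationName)
    err := ctrl.runningOperations.Run(operationName, operation)
    if err != nil {
        // Typical reasons: the same operation is already running, or it is being
        // held back by exponential backoff after previous failures.
        klog.V(4).Infof("could not schedule operation %q: %v", operationName, err)
    }
}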
Obtain the provisioning plugin from the storage class.
claimClass := v1helper.GetPersistentVolumeClaimClass(claim)
klog.V(4).Infof("provisionClaimOperation [%s] started, class: %q", claimToClaimKey(claim), claimClass)

plugin, storageClass, err := ctrl.findProvisionablePlugin(claim)
if err != nil {
    ctrl.eventRecorder.Event(claim, v1.EventTypeWarning, events.ProvisioningFailed, err.Error())
    klog.V(2).Infof("error finding provisioning plugin for claim %s: %v", claimToClaimKey(claim), err)
    // The controller will retry provisioning the volume in every
    // syncVolume() call.
    return "", err
}
8.1.1 Update the PVC to set the provisioner annotation:
volume.beta.kubernetes.io/storage-provisioner: ceph.rook.io/block
// Add provisioner annotation so external provisioners know when to start
newClaim, err := ctrl.setClaimProvisioner(claim, provisionerName)
if err != nil {
    // Save failed, the controller will retry in the next sync
    klog.V(2).Infof("error saving claim %s: %v", claimToClaimKey(claim), err)
    return pluginName, err
}
claim = newClaim
At this point the PV controller's job for an external provisioner is done: it has set the volume.beta.kubernetes.io/storage-provisioner annotation.
For CSI and other out-of-tree plugins the PV controller stops here; it only provisions volumes itself for in-tree plugins.
The external provisioner watches PVCs, handles those whose volume.beta.kubernetes.io/storage-provisioner annotation names its own plugin, and then calls the plugin's Provision method, for example issuing a gRPC CreateVolumeRequest to the ceph rbd driver.
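A hedged sketch of the decision an external provisioner makes for each PVC it watches (real provisioners use the sig-storage external-provisioner library; shouldProvision and provisionerAnn are illustrative names):

package pvsketch

import (
    v1 "k8s.io/api/core/v1"
)

// provisionerAnn is the annotation the PV controller writes for external provisioners.
const provisionerAnn = "volume.beta.kubernetes.io/storage-provisioner"

// shouldProvision: act only on claims that were handed to this driver and are still unbound.
func shouldProvision(claim *v1.PersistentVolumeClaim, driverName string) bool {
    return claim.Annotations[provisionerAnn] == driverName && claim.Spec.VolumeName == ""
}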
pv.spec.claimRef == nil: an unused PV; set the phase to Available.
pv.spec.claimRef.UID == "": the PV is reserved for a PVC but not yet bound; the phase is kept Available.
If no suitable PV is found and the claim has a StorageClass, a volume is provisioned through the plugin.
pvc.spec.volumeName == "": the PVC is still pending.
Binding sets spec.claimRef on the PV and spec.volumeName on the PVC, adds the annotations, and updates status.phase.
References:
https://kubernetes.io/docs/concepts/storage/persistent-volumes/#binding