专一于大数据及容器云核心技术解密,可提供全栈的大数据+云原平生台咨询方案,请持续关注本套博客。若有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。 api
items解释:
- items map[string] Deltas
- type Deltas []Delta // Delta数组
- f.items表示Map集合
- f.items[id] 表示某一个key(资源对象)对应的数组集合,即:资源操做变化数组
// 代码源自client-go/tools/cache/delta_fifo.go
type DeltaFIFO struct {
lock sync.RWMutex // 读写锁,由于涉及到同时读写,读写锁性能要高
cond sync.Cond // 给Pop()接口使用,在没有对象的时候能够阻塞,内部锁复用读写锁
items map[string]Deltas // 这个应该是Store的本质了,按照kv的方式存储对象,可是存储的是对象的Deltas数组
queue []string // 这个是为先入先出实现的,存储的就是对象的键
populated bool // 经过Replace()接口将第一批对象放入队列,或者第一次调用增、删、改接口时标记为true
initialPopulationCount int // 经过Replace()接口将第一批对象放入队列的对象数量
keyFunc KeyFunc // 对象键计算函数,在Indexer那篇文章介绍过
knownObjects KeyListerGetter // 前面介绍就是为了这是用,该对象指向的就是Indexer,
closed bool // 是否已经关闭的标记
closedLock sync.Mutex // 专为关闭设计的所,为何不复用读写锁?
}
复制代码
id, err := f.KeyOf(obj) //获得obj对应的Map的key
newDeltas := append(f.items[id], Delta{actionType, obj}) //追加,生成新数组
f.items[id] = newDeltas //更新DeltaFIFO.items集合
复制代码
// 代码源自client-go/tools/cache/delta_fifo.go
// 从函数名称来看把“动做”放入队列中,这个动做就是DeltaType,并且已经加锁了
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {
// 前面提到的计算对象键的函数
id, err := f.KeyOf(obj)
if err != nil {
return KeyError{obj, err}
}
// 若是是同步,而且对象将来会被删除,那么就直接返回,不必记录这个动做了
// 确定有人会问为何Add/Delete/Update这些动做能够,由于同步对于已经删除的对象是没有意义的
// 已经删除的对象后续跟添加、更新有可能,由于同名的对象又被添加了,删除也是有可能
// 删除有些复杂,后面会有说明
if actionType == Sync && f.willObjectBeDeletedLocked(id) {
return nil
}
// 同一个对象的屡次操做,因此要追加到Deltas数组中
newDeltas := append(f.items[id], Delta{actionType, obj})
// 合并操做,去掉冗余的delta
newDeltas = dedupDeltas(newDeltas)
// 判断对象是否已经存在
_, exists := f.items[id]
// 合并后操做有可能变成没有Delta么?后面的代码分析来看应该不会,因此暂时不知道这个判断目的
if len(newDeltas) > 0 {
// 若是对象没有存在过,那就放入队列中,若是存在说明已经在queue中了,也就不必再添加了
if !exists {
f.queue = append(f.queue, id)
}
// 更新Deltas数组,通知全部调用Pop()的人
f.items[id] = newDeltas
f.cond.Broadcast()
} else if exists {
// 直接把对象删除,这段代码我不知道什么条件会进来,由于dedupDeltas()确定有返回结果的
// 后面会有dedupDeltas()详细说明
delete(f.items, id)
}
return nil
}
复制代码
// 代码源自client-go/tools/cache/delta_fifo.go
func dedupDeltas(deltas Deltas) Deltas {
n := len(deltas)
if n < 2 {
return deltas
}
// 取出最后两个
a := &deltas[n-1]
b := &deltas[n-2]
// 判断若是是重复的,那就删除这两个delta把合并后的追加到Deltas数组尾部
if out := isDup(a, b); out != nil {
d := append(Deltas{}, deltas[:n-2]...)
return append(d, *out)
}
return deltas
}
// 判断两个Delta是不是重复的
func isDup(a, b *Delta) *Delta {
// 只有一个判断,只能判断是否为删除类操做,和咱们上面的判断相同
// 这个函数的本意应该还能够判断多种类型的重复,当前来看只能有删除这一种可以合并
if out := isDeletionDup(a, b); out != nil {
return out
}
return nil
}
// 判断是否为删除类的重复
func isDeletionDup(a, b *Delta) *Delta {
// 两者都是删除那确定有一个是重复的
if b.Type != Deleted || a.Type != Deleted {
return nil
}
// 理论上返回最后一个比较好,可是对象已经再也不系统监控范围,前一个删除状态是好的
if _, ok := b.Object.(DeletedFinalStateUnknown); ok {
return a
}
return b
}
复制代码
// 代码源自client-go/tools/cache/delta_fifo.go
func (f *DeltaFIFO) Replace(list []interface{}, resourceVersion string) error {
f.lock.Lock()
defer f.lock.Unlock()
keys := make(sets.String, len(list))
// 遍历全部的输入目标
for _, item := range list {
// 计算目标键
key, err := f.KeyOf(item)
if err != nil {
return KeyError{item, err}
}
// 记录处理过的目标键,采用set存储,是为了后续快速查找
keys.Insert(key)
// 由于输入是目标全量,因此每一个目标至关于从新同步了一次
if err := f.queueActionLocked(Sync, item); err != nil {
return fmt.Errorf("couldn't enqueue object: %v", err)
}
}
// 若是没有存储的话,本身存储的就是全部的老对象,目的要看看那些老对象不在全量集合中,那么就是删除的对象了
if f.knownObjects == nil {
// 遍历全部的元素
for k, oldItem := range f.items {
// 这个目标在输入的对象中存在就能够忽略
if keys.Has(k) {
continue
}
// 输入对象中没有,说明对象已经被删除了。
var deletedObj interface{}
if n := oldItem.Newest(); n != nil {
deletedObj = n.Object
}
// 终于看到哪里用到DeletedFinalStateUnknown了,队列中存储对象的Deltas数组中
// 可能已经存在Delete了,避免重复,采用DeletedFinalStateUnknown这种类型
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 若是populated尚未设置,说明是第一次而且尚未任何修改操做执行过
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) // 记录第一次经过来的对象数量
}
return nil
}
// 下面处理的就是检测某些目标删除可是Delta没有在队列中
// 从存储中获取全部对象键
knownKeys := f.knownObjects.ListKeys()
queuedDeletions := 0
for _, k := range knownKeys {
// 对象还存在那就忽略
if keys.Has(k) {
continue
}
// 获取对象
deletedObj, exists, err := f.knownObjects.GetByKey(k)
if err != nil {
deletedObj = nil
glog.Errorf("Unexpected error %v during lookup of key %v, placing DeleteFinalStateUnknown marker without object", err, k)
} else if !exists {
deletedObj = nil
glog.Infof("Key %v does not exist in known objects store, placing DeleteFinalStateUnknown marker without object", k)
}
// 累积删除的对象数量
queuedDeletions++
// 把对象删除的Delta放入队列
if err := f.queueActionLocked(Deleted, DeletedFinalStateUnknown{k, deletedObj}); err != nil {
return err
}
}
// 和上面的代码差很少,只是计算initialPopulationCount值的时候增长了删除对象的数量
if !f.populated {
f.populated = true
f.initialPopulationCount = len(list) + queuedDeletions
}
return nil
}
复制代码
period time.Duration // 反射器在List和Watch的时候理论上是死循环,只有出现错误才会退出
这个变量用在出错后多长时间再执行List和Watch,默认值是1秒钟
resyncPeriod time.Duration // 从新同步的周期,不少人确定认为这个同步周期指的是从apiserver的同步周期
其实这里面同步指的是shared_informer使用者须要按期同步全量对象
复制代码
func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
defer utilruntime.HandleCrash()
fifo := NewDeltaFIFO(MetaNamespaceKeyFunc, s.indexer)
cfg := &Config{
Queue: fifo,
ListerWatcher: s.listerWatcher,
ObjectType: s.objectType,
FullResyncPeriod: s.resyncCheckPeriod,
RetryOnError: false,
ShouldResync: s.processor.shouldResync,
Process: s.HandleDeltas,
}
func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.controller = New(cfg)
s.controller.(*controller).clock = s.clock
s.started = true
}()
// Separate stop channel because Processor should be stopped strictly after controller
processorStopCh := make(chan struct{})
var wg wait.Group
defer wg.Wait() // Wait for Processor to stop
defer close(processorStopCh) // Tell Processor to stop
wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
wg.StartWithChannel(processorStopCh, s.processor.run)
defer func() {
s.startedLock.Lock()
defer s.startedLock.Unlock()
s.stopped = true // Don't want any new listeners }() //初始化Reflector,更新Index, 并发送通知给Controller来回调处理 s.controller.Run(stopCh) } 复制代码
本文综合分析了Kubernetes 大量源码,试图从较高的视野来看问题,猛看表,一天时间就过去了。辛苦成文,各自珍惜,谢谢!数组
专一于大数据及容器云核心技术解密,可提供全栈的大数据+云原平生台咨询方案,请持续关注本套博客。若有任何学术交流,可随时联系。更多内容请关注《数据云技术社区》公众号。 bash