ETCD探索-Watch

ETCD探索-Watch

梗概

watch是mvcc包中的一个功能,之因此拿出来讲,是由于它确实有很重的逻辑。watch是监听一个或一组key,key的任何变化都会发出消息。某种意义上讲,这就是发布订阅模式。golang

对比

既然Watch机制就是发布订阅模式,咱们经过对比Kafka,来更深刻了解Watch。
首先说明结论:缓存

ETCD没有消费者组的概念,因此不能代替Kafka

对比其余方面呢:mvc

ETCD Kafka
消费方式 监听一个Key 订阅一个Topic
生产方式 Put(Key, Value) Produce(Topic, Message)
历史消息是否保留 保留 保留
可否从指定位置消费 能够从指定Revision消费 能够从指定offset消费
可否保证消息不重放 不能 消费者会主动上报offset,kafka会保存每一个消费者的offset,消费者重启会从当前进度消费

对比Kafka不是试图用ETCD代替Kafka,是想经过对比了解Watch的特性和局限性oop

猜测

在讨论别人是怎么实现的时候,本身总要先猜测下。想的过程当中就会发现难点在哪。
个人想法:spa

type watcher struct {
    key string // 要监听的key
    
    ch  chan struct{} // 经过ch将消息发出来
}

func loop() {
    for _, w := range []watchers {
        ch <- message
    }
}

解释下,个人想法中,每个监听者都是一个watcher,监听者会本身消费本身的ch,实现消费功能。在服务端须要维护一个loop,将消息不断的发送到每个监听者的ch中。设计

我感受大多数人的最直观想法应该就是这样。code

这样作我实现了协程

  • 订阅发布功能

但我没有作到对象

  • 同时监听一个范围的key(好比:我能够监听key=foo,但不能监听key=foo~fox。这是ETCD一个重要的功能)
  • 消费者消费速率不一样(好比:按个人设想,有一个消费者出现阻塞,会致使loop阻塞)

有了这些想法以后,咱们来看看ETCD中Watch是怎么实现的。blog

实现

在MVCC文章中提到,KV接口的具体实现是store结构体。Watch的实现是在store上封装了一层,叫作:watchableStore,重写了store的Write方法。
经过MVCC中介绍,store的任何写操做,都须要Write方法返回的TxnWrite。因此这里重写Write方法意味这任何写操做都会通过watchableStore。

func (tw *watchableStoreTxnWrite) End() {  
   changes := tw.Changes()  
  
   evs := make([]mvccpb.Event, len(changes))  
   for i, change := range changes {  
      evs[i].Kv = &changes[i]
   }  

   tw.s.notify(rev, evs)  
   tw.TxnWrite.End()        
}

type watchableStoreTxnWrite struct {  
   TxnWrite  
   s *watchableStore  
}  
  
func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite {  
   return &watchableStoreTxnWrite{s.store.Write(trace), s}  
}

以上代码只列出了核心的逻辑,不难看出,watchableStoreTxnWrite在事务提交时,先将本次变动changes打包成Event,而后调用notify来将变动通知出去。最后真正提交事务TxnWrite.End()

如今待推送的消息(Event)已经经过notify方法进入到了Watch机制中,咱们看看这个消息是如何流转的。

首先须要介绍几个对象:

  • Event

事件。变动的消息是以Event的形式发送出去的,Event包括KeyValue,同时包括操做类型(Put、Delete等)

  • watcher

watcher监听一个或一组key,若是有变动,watcher将变动内容经过chan发送出去。

  • watcherGroup

顾名思义,一组watcher。watcherGroup管理多个watcher,可以根据key快速找到监听该key的一个或多个watcher。

  • watchableStore

继承自store,在store基础上实现了watch功能。watchableStore管理着两个watcherGroup:synced、unsynced,和一个用于缓存的victims。victims是缓存当前未发出去的Event。

  • watchStream

watchStream是对watchableStore的封装。由于watchableStore继承自store,因此他实现了不少方法,但这些方法并不都是用于Watch功能。因此watchStream对watchableStore再次封装,暴露出与Watch有关的方法。

在知道这5个对象以后,咱们是如何使用Watch呢?

func testWatch() {
    s := newWatchableStore()
    
    w := s.NewWatchStream()
    
    w.Watch(start_key: foo, end_key: nil)
    
    w.Watch(start_key: bar, end_key: nil)
    
    for {
        consume := <- w.Chan()
    }
}

解释下,咱们先建立了watchableStore,这是ETCD启动后就建立了的。当咱们要使用Watch功能时,咱们建立了一个watchStream(s.NewWatchStream)。建立出来的w能够监听多个key:foo、bar。以后咱们就能够消费w.Chan()返回的chan。foo或bar的任何变化,都会经过这个chan发送给消费端consume。

因而咱们便获得下面这幅图:
image.png

能够看到watchStream实现了在一大堆kv的变化中,过滤出监听的key,将key的变化输出。

紧接着,咱们将这幅图补充完整:
image.png

这幅图是什么意思呢?
watchableStore收到了全部key的变动后,将这些key交给synced(watchGroup),synced可以快速地从全部key中找到监听的key。将这些key发送给对应的watcher,这些watcher再经过chan将变动信息发送出去。

synced是怎么快速找到符合条件的key呢?
ETCD中使用了map和adt(红黑树)来实现。
不单独使用map是由于watch能够监听一个范围的key。若是只监听一个key

watch(start_key: foo, end_key: nil)

咱们能够这样存储

map[key]*watcher

这样能够根据key快速找到对应的watcher,ETCD也是这样作的。

但对于一组key呢?

watch(start_key: foo, end_key: fop)

这里我监听了从foo->fop之间的全部key,理论上这些key的数目是无限的,因此没法再使用map。
好比:key=fooac也属于监听范围。
ETCD用adt来存储这种key。
image.png

adt的实现这里不作介绍,只用知道adt可以根据key=fooac快速地找到所属范围foo->fop。

adt的原理推荐这篇文章: https://www.jianshu.com/p/e13...
adt的go实现:go.etcd.io/etcd/pkg/ad

在找到watcher后,调用watcher的send()方法,将变动的Event发送出去。

这就是上述图的意思,也就是正常的Watch流程。

各类场景

上图所述是正常流程,可是会有不少不正常的状况发生。
上图能够看到,消息都是经过一个Chan发送出去,但若是消费者消费速度慢,Chan就容易堆积。Chan的空间不可能无限大,那就必然会有满的时候,满了后该怎么办呢?

接下来就要讨论上图unsynced、victims的做用了。

Chan何时会满呢?
image.png

代码中Chan的长度是1024。不过这也是一个随机值,只是没有如今更好的选择。

一旦满了,会发生如下操做:

func (s *watchableStore) notify() {
    var victim watcherBatch
    ...
    w.minRev = rev + 1           // w是当前watcher
    if victim == nil {  
       victim = make(watcherBatch)  
    }  
    w.victim = true              // w被标记为受损的
    victim[w] = eb               // eb是当前的变动消息EventBatch
    s.synced.delete(w)
    ...
    s.addVictim(victim)          // 将victim添加到s的victims中
}

(victim:受害者、牺牲品、受损的)
watcher会记录当前的Revision,并将自身标记为受损的。这次的变动操做会被保存到watchableStore的victims中。同时该watcher会被从synced踢出。

假设此时有一个写操做:foo=f1。而正好Chan此时刚满,则监听foo的watcher将从synced中踢出,同时foo=f1被保存到victims中
image.png

接下来对foo的任何变动,该watcher都不会记录。那这些消息就都丢掉了吗?固然不是,watcher变成受损状态时记录下了当时的Revision,这个很重要。

这时要说到两个工做协程了:

// 咱们在建立watchableStore时,会同时启动两个工做协程
go s.syncWatchersLoop()  
go s.syncVictimsLoop()

顾名思义,第一个协程用于将unsynced的watcher同步为synced。
第二个协程用于循环清除watchableStore中的victims。

在上面的场景中,咱们知道,队列满时,当时变动的Event被放入了victims中。这个协程就会试图清除这个Event。怎么清除呢?协程会不断尝试让watcher发送这个Event,一旦队列不满,watcher将这个Event发出后。该watcher就被划入了unsycned中,同时再也不是受损状态。

image.png

此时syncWatchersLoop协程就开始起做用。因为在受损状态下,这个watcher已经错过了不少消息。为了追回进度,协程会根据watcher保存的Revision,找出受损以后全部的消息,将关于foo的消息所有给watcher,当watcher将这些消息都发送出去后。watcher就脱离了unsynced,成为了synced。

至此就解决了Chan满致使的问题。同时也阐明了Watch的设计实现。

相关文章
相关标签/搜索