watch是mvcc包中的一个功能,之因此拿出来讲,是由于它确实有很重的逻辑。watch是监听一个或一组key,key的任何变化都会发出消息。某种意义上讲,这就是发布订阅模式。golang
既然Watch机制就是发布订阅模式,咱们经过对比Kafka,来更深刻了解Watch。
首先说明结论:缓存
ETCD没有消费者组的概念,因此不能代替Kafka
对比其余方面呢:mvc
ETCD | Kafka | |
---|---|---|
消费方式 | 监听一个Key | 订阅一个Topic |
生产方式 | Put(Key, Value) | Produce(Topic, Message) |
历史消息是否保留 | 保留 | 保留 |
可否从指定位置消费 | 能够从指定Revision消费 | 能够从指定offset消费 |
可否保证消息不重放 | 不能 | 消费者会主动上报offset,kafka会保存每一个消费者的offset,消费者重启会从当前进度消费 |
对比Kafka不是试图用ETCD代替Kafka,是想经过对比了解Watch的特性和局限性oop
在讨论别人是怎么实现的时候,本身总要先猜测下。想的过程当中就会发现难点在哪。
个人想法:spa
type watcher struct { key string // 要监听的key ch chan struct{} // 经过ch将消息发出来 } func loop() { for _, w := range []watchers { ch <- message } }
解释下,个人想法中,每个监听者都是一个watcher,监听者会本身消费本身的ch,实现消费功能。在服务端须要维护一个loop,将消息不断的发送到每个监听者的ch中。设计
我感受大多数人的最直观想法应该就是这样。code
这样作我实现了协程
但我没有作到对象
有了这些想法以后,咱们来看看ETCD中Watch是怎么实现的。blog
在MVCC文章中提到,KV接口的具体实现是store结构体。Watch的实现是在store上封装了一层,叫作:watchableStore
,重写了store的Write方法。
经过MVCC中介绍,store的任何写操做,都须要Write方法返回的TxnWrite。因此这里重写Write方法意味这任何写操做都会通过watchableStore。
func (tw *watchableStoreTxnWrite) End() { changes := tw.Changes() evs := make([]mvccpb.Event, len(changes)) for i, change := range changes { evs[i].Kv = &changes[i] } tw.s.notify(rev, evs) tw.TxnWrite.End() } type watchableStoreTxnWrite struct { TxnWrite s *watchableStore } func (s *watchableStore) Write(trace *traceutil.Trace) TxnWrite { return &watchableStoreTxnWrite{s.store.Write(trace), s} }
以上代码只列出了核心的逻辑,不难看出,watchableStoreTxnWrite在事务提交时,先将本次变动changes
打包成Event,而后调用notify来将变动通知出去。最后真正提交事务TxnWrite.End()
如今待推送的消息(Event)已经经过notify方法进入到了Watch机制中,咱们看看这个消息是如何流转的。
首先须要介绍几个对象:
事件。变动的消息是以Event的形式发送出去的,Event包括KeyValue,同时包括操做类型(Put、Delete等)
watcher监听一个或一组key,若是有变动,watcher将变动内容经过chan发送出去。
顾名思义,一组watcher。watcherGroup管理多个watcher,可以根据key快速找到监听该key的一个或多个watcher。
继承自store,在store基础上实现了watch功能。watchableStore管理着两个watcherGroup:synced、unsynced,和一个用于缓存的victims。victims是缓存当前未发出去的Event。
watchStream是对watchableStore的封装。由于watchableStore继承自store,因此他实现了不少方法,但这些方法并不都是用于Watch功能。因此watchStream对watchableStore再次封装,暴露出与Watch有关的方法。
在知道这5个对象以后,咱们是如何使用Watch呢?
func testWatch() { s := newWatchableStore() w := s.NewWatchStream() w.Watch(start_key: foo, end_key: nil) w.Watch(start_key: bar, end_key: nil) for { consume := <- w.Chan() } }
解释下,咱们先建立了watchableStore,这是ETCD启动后就建立了的。当咱们要使用Watch功能时,咱们建立了一个watchStream
(s.NewWatchStream)。建立出来的w能够监听多个key:foo、bar。以后咱们就能够消费w.Chan()返回的chan。foo或bar的任何变化,都会经过这个chan发送给消费端consume。
因而咱们便获得下面这幅图:
能够看到watchStream实现了在一大堆kv的变化中,过滤出监听的key,将key的变化输出。
紧接着,咱们将这幅图补充完整:
这幅图是什么意思呢?
watchableStore收到了全部key的变动后,将这些key交给synced(watchGroup),synced可以快速
地从全部key中找到监听的key。将这些key发送给对应的watcher,这些watcher再经过chan将变动信息发送出去。
synced是怎么快速找到符合条件的key呢?
ETCD中使用了map和adt(红黑树)来实现。
不单独使用map是由于watch能够监听一个范围的key。若是只监听一个key
watch(start_key: foo, end_key: nil)
咱们能够这样存储
map[key]*watcher
这样能够根据key快速找到对应的watcher,ETCD也是这样作的。
但对于一组key呢?
watch(start_key: foo, end_key: fop)
这里我监听了从foo->fop之间的全部key,理论上这些key的数目是无限的,因此没法再使用map。
好比:key=fooac也属于监听范围。
ETCD用adt来存储这种key。
adt的实现这里不作介绍,只用知道adt可以根据key=fooac快速地找到所属范围foo->fop。
adt的原理推荐这篇文章: https://www.jianshu.com/p/e13...
adt的go实现:go.etcd.io/etcd/pkg/ad
在找到watcher后,调用watcher的send()方法,将变动的Event发送出去。
这就是上述图的意思,也就是正常的Watch流程。
上图所述是正常流程,可是会有不少不正常的状况发生。
上图能够看到,消息都是经过一个Chan发送出去,但若是消费者消费速度慢,Chan就容易堆积。Chan的空间不可能无限大,那就必然会有满的时候,满了后该怎么办呢?
接下来就要讨论上图unsynced、victims的做用了。
Chan何时会满呢?
代码中Chan的长度是1024。不过这也是一个随机值,只是没有如今更好的选择。
一旦满了,会发生如下操做:
func (s *watchableStore) notify() { var victim watcherBatch ... w.minRev = rev + 1 // w是当前watcher if victim == nil { victim = make(watcherBatch) } w.victim = true // w被标记为受损的 victim[w] = eb // eb是当前的变动消息EventBatch s.synced.delete(w) ... s.addVictim(victim) // 将victim添加到s的victims中 }
(victim:受害者、牺牲品、受损的)
watcher会记录当前的Revision,并将自身标记为受损的
。这次的变动操做会被保存到watchableStore的victims中。同时该watcher会被从synced踢出。
假设此时有一个写操做:foo=f1。而正好Chan此时刚满,则监听foo的watcher将从synced中踢出,同时foo=f1被保存到victims中
接下来对foo的任何变动,该watcher都不会记录。那这些消息就都丢掉了吗?固然不是,watcher变成受损状态时记录下了当时的Revision,这个很重要。
这时要说到两个工做协程了:
// 咱们在建立watchableStore时,会同时启动两个工做协程 go s.syncWatchersLoop() go s.syncVictimsLoop()
顾名思义,第一个协程用于将unsynced的watcher同步为synced。
第二个协程用于循环清除watchableStore中的victims。
在上面的场景中,咱们知道,队列满时,当时变动的Event被放入了victims中。这个协程就会试图清除这个Event。怎么清除呢?协程会不断尝试让watcher发送这个Event,一旦队列不满,watcher将这个Event发出后。该watcher就被划入了unsycned中,同时再也不是受损状态。
此时syncWatchersLoop协程就开始起做用。因为在受损状态下,这个watcher已经错过了不少消息。为了追回进度,协程会根据watcher保存的Revision,找出受损以后全部的消息,将关于foo的消息所有给watcher,当watcher将这些消息都发送出去后。watcher就脱离了unsynced,成为了synced。
至此就解决了Chan满致使的问题。同时也阐明了Watch的设计实现。