MVCC模块ETCD的存储模块,是ETCD核心模块。git
做为一个开源项目,其代码的封装是值得咱们学习的。MVCC做为底层模块,对上层提供统一的方法,而这些方法都定义在kv.go这个文件中,很像一个头文件(.h)。咱们能够只看kv.go以及配合kv_test.go就能够知道mvcc包是怎么用的。github
type KV interface { ReadView WriteView // Read creates a read transaction. Read(trace *traceutil.Trace) TxnRead // Write creates a write transaction. Write(trace *traceutil.Trace) TxnWrite // Hash computes the hash of the KV's backend. Hash() (hash uint32, revision int64, err error) // HashByRev computes the hash of all MVCC revisions up to a given revision. HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) // Compact frees all superseded keys with revisions less than rev. Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) // Commit commits outstanding txns into the underlying backend. Commit() // Restore restores the KV store from a backend. Restore(b backend.Backend) error Close() error }
(我只复制了最重要的Interface,请结合kv.go文件来看)
咱们是这样来使用KV的golang
func test() { kv := mvcc.New(...) kv.Put(key, value, ... kv.Range(key, end, ...) // Range就是Get方法 ... }
mvcc有一个New
方法,返回ConsistentWatchableKV
,它继承自KV,用来实现ETCD的Watch机制。咱们如今讨论KV
。json
做为一个Key-Value存储,存储模块至少要支持增删改查
。KV
首先定义了数组
Put(key, value []byte, lease lease.LeaseID) (rev int64) // “增”、“改”,WriteView中定义,KV继承ReadView Range(key, end []byte, ro RangeOptions) (r *RangeResult, err error) // “查”,ReadView中定义 DeleteRange(key, end []byte) (n, rev int64) // “删”,WriteView中定义,n是删除的个数
这三个方法实现了数据的基本操做。
参数很好理解缓存
同时KV
提供了事务操做网络
Read(trace *traceutil.Trace) TxnRead Write(trace *traceutil.Trace) TxnWrite Commit() // 提交未提交的事务
顾名思义,一个读事务、一个写事务
参数数据结构
除了基本数据操做,KV
提供了一些不容易理解的方法mvc
Hash() (hash uint32, revision int64, err error) HashByRev(rev int64) (hash uint32, revision int64, compactRev int64, err error) Compact(trace *traceutil.Trace, rev int64) (<-chan struct{}, error) Restore(b backend.Backend) error
这些方法从名字上咱们或许可以知道是在干什么,但殊不知道为何要这么干。app
(建议读者阅读kv_test.go中全部的测试用例,即可以更深入了解 KV
)
如此,咱们便有了这样一个轮廓:
MVCC为其余模块提供了这些功能。接下来,咱们就详细看下这些功能是怎么实现的。
在讨论实现过程的时候,我不会仔细介绍每一行代码,这样难以理解而且也没有必要。但我但愿读者可以本身阅读并跑一遍测试用例。
在看代码以前,请先试想,若是让你来作底层的KV实现,你会怎么作?
你可能会这么作:
既然是KV存储,用一个Map来存储。若是须要持久化存储,将Map中的数据拷贝一份到DB,Map中保留最新数据。
若是你是这么想的,那你已经实现了底层功能。你作到了:
或许经过封装很容易作到:
但你很难作到:
而且你这样作有些不足:
看完这些你有其余的想法实现上述难以作到的功能吗?
你可能会放弃使用Map作内存存储,而使用一种数据结构(B树或B+树)来代替。给每一kvpair附带一个版本号。
咱们试想了这么多,其实已经猜出了KV
底层实现的80%
具体看下
mvcc包中,store是KV
的具体实现,store支持Put、Range等操做。首先须要关注store中的一个变量:currentRev
。它是一个“严格”递增的版本号。“严格”是指currentRev的每次递增都会有锁。
store的每次写操做(Put、Delete等)都会使currentRev递增。currentRev就是Revision
理解这句话很重要。以下图
紧接着,咱们须要知道value存在哪?value是以键值对的形式保存,键值对是以KeyValue
结构体的方式存储在store中,咱们须要知道KeyValue
是怎么定义的。
// 直观的想,KeyValue多是这样 type KeyValue struct{ key string value string } // 但它其实包含更多的东西 type KeyValue struct { Key []byte `protobuf:"bytes,1,opt,name=key,proto3" json:"key,omitempty"` // 建立时的store版本号 CreateRevision int64 `protobuf:"varint,2,opt,name=create_revision,json=createRevision,proto3" json:"create_revision,omitempty"` // 最后一次修改的store版本号 ModRevision int64 `protobuf:"varint,3,opt,name=mod_revision,json=modRevision,proto3" json:"mod_revision,omitempty"` // KeyValue自身维护的版本号 Version int64 `protobuf:"varint,4,opt,name=version,proto3" json:"version,omitempty"` Value []byte `protobuf:"bytes,5,opt,name=value,proto3" json:"value,omitempty"` // 租约ID Lease int64 `protobuf:"varint,6,opt,name=lease,proto3" json:"lease,omitempty"` }
这个时候,咱们能够理解,每个KeyValue都对应一个Revision。
若是咱们想查询一个KeyValue,能够用Revision作索引
如上图:
foo=bar建立时Revision=1。那我就能够用1
来找到foo=bar
同理:
这么作的意义在哪?在于它保存了KeyValue的全部变化,即便最后foo=bar5。我依然能够找到foo以前的值。这就是mvcc多版本控制的根本所在。
这个时候咱们得出了用Revision找到KeyValue。但实际状况咱们是用key去找到KeyValue。
那么接下来的问题是怎么用key找到Revision。
store中使用BTree
实现key快速找到Revision。
(BTree具体实现能够阅读github.com/google/btree)
接下来咱们就能大概绘出这样一幅轮廓图:
其中Backend是KeyValue的存储,也是咱们以后要讨论的。
对应关系以下
当咱们想要查key为foo对应的值时,咱们能够指定Revision开始查询(若是不指定,默认从最新开始查询)
咱们找几处代码来论证以上所述。
首先找到store的Put具体实现。
// store -> storeTxnWrite -> Put -> put func put(...) { ... tw.tx.UnsafeSeqPut(keyBucketName, ibytes, d) // Backend 添加 Revision->KeyValue tw.s.kvindex.Put(key, idxRev) // BTree Index 添加key->Revision索引 ... }
再看store的Range的具体实现
// store -> storeTxnRead -> Range -> rangeKeys func rangeKeys(...) { ... revpairs := tr.s.kvindex.Revisions(key, end, rev) // 在Index中,用key查找Revision ... _, vs := tr.tx.UnsafeRange(keyBucketName, revBytes, nil, 0) // 用Revision在Backend中查找KeyValue ... }
从这两处就能简单证实上面所述。
接下来讨论Backend是怎么实现的。
咱们都了解,ETCD的持久化存储是基于BlotDB的,Backend就是对BlotDB封装了一层。Backend在BlotDB上作了一层缓存,缓存最近的数据。
backend结构体是Backend接口的具体实现,咱们首先关注Range、Put是怎么实现的
// backend -> ReadTx -> UnsafeRange func UnsafeRange(...){ ... keys, vals := rt.buf.Range(bucketName, key, endKey, limit) // 从缓存中查找 if int64(len(keys)) == limit { return keys, vals // 若是缓存中有所有数据,直接返回,不走DB } ... k2, v2 := unsafeRange(c, key, endKey, limit-int64(len(keys))) // 从DB中查找 } // backend -> batchTxBuffered -> UnsafePut func UnsafePut(...) { t.batchTx.UnsafePut(bucketName, key, value) // 数据保存到DB t.buf.put(bucketName, key, value) // 数据保存到缓存中 }
那么查询一个Key的流程就是这样的:
这里有一个细节,Backend的缓存中是怎么查找的呢?缓存的结构以下
type bucketBuffer struct { buf []kv used int }
数据是保存在一个数组中,每次查找都须要遍历数组吗?不是的
每次写事务提交后都会将本次写操做的缓存merge到读缓存上
func merge() { ... sort.Stable(bb) // remove duplicates, using only newest update widx := 0 for ridx := 1; ridx < bb.used; ridx++ { if !bytes.Equal(bb.buf[ridx].key, bb.buf[widx].key) { widx++ } bb.buf[widx] = bb.buf[ridx] } bb.used = widx + 1 }
merge会将全部key排序,而且去重。也就是说缓存中的key始终是有序的。
因此查找的时候就能够用二分法了。
func (bb *bucketBuffer) Range(key, endKey []byte, limit int64) (keys [][]byte, vals [][]byte) { f := func(i int) bool { return bytes.Compare(bb.buf[i].key, key) >= 0 } idx := sort.Search(bb.used, f) if idx < 0 { return nil, nil } if len(endKey) == 0 { if bytes.Equal(key, bb.buf[idx].key) { keys = append(keys, bb.buf[idx].key) vals = append(vals, bb.buf[idx].val) } return keys, vals } if bytes.Compare(endKey, bb.buf[idx].key) <= 0 { return nil, nil } for i := idx; i < bb.used && int64(len(keys)) < limit; i++ { if bytes.Compare(endKey, bb.buf[i].key) <= 0 { break } keys = append(keys, bb.buf[i].key) vals = append(vals, bb.buf[i].val) } return keys, vals }
查找先用二分法找到key所在的id,而后从id开始遍历,找到key~endKey之间的数据。
如今已经介绍了store中的存储用的数据结构。大体结构以下
Range时
Put时