MPT(Merkle Patricia Tries)是以太坊中存储区块数据的核心数据结构,它Merkle Tree和Patricia Tree融合一个树形结构,理解MPT结构对以后学习以太坊区块header以及智能合约状态存储结构的模块源码颇有帮助。node
它的叶子是数据块的hash,从图中能够看出非叶子节点是其子节点串联字符串的hash,底层数据的任何变更都会影响父节点,这棵树的Merkle Root表明对底层全部数据的“摘要”。
这样的树有一个很大的好处,好比咱们把交易信息写入这样的树形结构,当须要证实一个交易是否存在这颗树中的时候,就不须要从新计算全部交易的hash值。好比证实图中Hash 1-1,咱们能够借助Hash 1-0从新计算出Hash 1,而后再借助Hash 0从新计算出Top Hash,这样就能够根据算出来的Top Hash和原来的Top Hash是否同样,若是同样的话那么Hash 1-1就属于这棵树。
因此想象一下,咱们将这个Top Hash储存在区块头中,那么有了区块头就能够对区块信息进行验证了。同时 Hash 计算的过程能够十分快速,预处理能够在短期内完成。利用Merkle树结构能带来巨大的比较性能提高。redis
从它的名字压缩前缀树再结合上图就能够猜出来Patricia树的特色了,这种树形结构比将每个字符做为一个节点的普通trie树形结构,它的键值可使用多个字符,下降了树的高度,也节省了空间,再看个例子:
图中能够很容易看出数中所存储的键值对:算法
在以太坊中MPT的节点的规格主要有一下几个:数据库
这里还有一些知识点须要了解的,为了将MPT树存储到数据库中,同时还能够把MPT树从数据库中恢复出来,对于Extension和Leaf的节点类型作了特殊的定义:若是是一个扩展节点,那么前缀为0,这个0加在key前面。若是是一个叶子节点,那么前缀就是1。同时对key的长度就奇偶类型也作了设定,若是是奇数长度则标示1,若是是偶数长度则标示0。数组
以太坊中主要有一下几个地方用了MPT树形结构:缓存
State Trie 区块头中的状态树数据结构
Transactions Trie 区块头中的交易树app
Receipts Trie 区块头中的收据树函数
Storage Trie 存储树源码分析
这两个区块头中,state root,tx root receipt root分别存储了这三棵树的树根,第二个区块显示了当帐号175的数据变动(27 -> 45)的时候,只须要存储跟这个帐号相关的部分数据,并且老的区块中的数据仍是能够正常访问。
MPT树种还有一个重要的概念一个特殊的十六进制前缀(hex-prefix, HP)编码来对key编码,咱们先来了解一下编码定义规则,源码实现后面再分析:
HEX 十六进制编码
好比key=>"bob",b的ASCII十六进制编码为0x62,o的ASCII十六进制编码为0x6f,分解成高四位和第四位,16表示终结 0x10,最终编码结果为[6 2 6 15 6 2 16],
HEX-Prefix 十六进制前缀编码
十六进制前缀编码至关于一个逆向的过程,好比输入的是[6 2 6 15 6 2 16],根据第一个规则去掉终止符16。根据第二个规则key前补一个四元组,从右往左第一位为1表示叶子节点,从右往左第0位若是后面key的长度为偶数设置为0,奇数长度设置为1,那么四元组0010就是2。根据第三个规则,添加一个全0的补在后面,那么就是20.根据第三个规则内容压缩合并,那么结果就是[0x20 0x62 0x6f 0x62]
官方有一个详细的结构的示例:
下面再用一个图像化的示例来加深一下对上面的MPT规则的理解
key的16进制 | key | value |
---|---|---|
<64 6f> | do | verb |
<64 6f 67> | dog | puppy |
<64 6f 67 65> | doge | coin |
<68 6f 72 73 65> | horse | stallion |
Compact
就是上面说的HEX-Prefix
,keybytes为按完整字节(8bit)存储的正常信息,hex为按照半字节nibble(4bit)储存信息的格式。
go-ethereum/trie/encoding:
package trie func hexToCompact(hex []byte) []byte { terminator := byte(0) if hasTerm(hex) { //检查是否有结尾为0x10 => 16 terminator = 1 //有结束标记16说明是叶子节点 hex = hex[:len(hex)-1] //去除尾部标记 } buf := make([]byte, len(hex)/2+1) // 字节数组 buf[0] = terminator << 5 // 标志byte为00000000或者00100000 //若是长度为奇数,添加奇数位标志1,并把第一个nibble字节放入buf[0]的低四位 if len(hex)&1 == 1 { buf[0] |= 1 << 4 // 奇数标志 00110000 buf[0] |= hex[0] // 第一个nibble包含在第一个字节中 0011xxxx hex = hex[1:] } //将两个nibble字节合并成一个字节 decodeNibbles(hex, buf[1:]) return buf } //compact编码转化为Hex编码 func compactToHex(compact []byte) []byte { base := keybytesToHex(compact) base = base[:len(base)-1] // apply terminator flag // base[0]包括四种状况 // 00000000 扩展节点偶数位 // 00000001 扩展节点奇数位 // 00000010 叶子节点偶数位 // 00000011 叶子节点奇数位 // apply terminator flag if base[0] >= 2 { //若是是叶子节点,末尾添加Hex标志位16 base = append(base, 16) } // apply odd flag //若是是偶数位,chop等于2,不然等于1 chop := 2 - base[0]&1 return base[chop:] } // 将keybytes 转成十六进制 func keybytesToHex(str []byte) []byte { l := len(str)*2 + 1 //将一个keybyte转化成两个字节 var nibbles = make([]byte, l) for i, b := range str { nibbles[i*2] = b / 16 nibbles[i*2+1] = b % 16 } //末尾加入Hex标志位16 nibbles[l-1] = 16 return nibbles } // 将十六进制的bibbles转成key bytes,这只能用于偶数长度的key func hexToKeybytes(hex []byte) []byte { if hasTerm(hex) { hex = hex[:len(hex)-1] } if len(hex)&1 != 0 { panic("can't convert hex key of odd length") } key := make([]byte, (len(hex)+1)/2) decodeNibbles(hex, key) return key } func decodeNibbles(nibbles []byte, bytes []byte) { for bi, ni := 0, 0; ni < len(nibbles); bi, ni = bi+1, ni+2 { bytes[bi] = nibbles[ni]<<4 | nibbles[ni+1] } } // 返回a和b的公共前缀的长度 func prefixLen(a, b []byte) int { var i, length = 0, len(a) if len(b) < length { length = len(b) } for ; i < length; i++ { if a[i] != b[i] { break } } return i } // 十六进制key是否有结束标志符 func hasTerm(s []byte) bool { return len(s) > 0 && s[len(s)-1] == 16 }
上面已经分析了以太坊的key的编码方式,接下来咱们来看以太坊中MPT树的数据结构,在分析trie的数据结构前,咱们先来了解一下node的定义:
trie/node.go
type node interface { fstring(string) string cache() (hashNode, bool) canUnload(cachegen, cachelimit uint16) bool } type ( fullNode struct { //分支节点 Children [17]node // Actual trie node data to encode/decode (needs custom encoder) flags nodeFlag } shortNode struct { Key []byte Val node flags nodeFlag } hashNode []byte valueNode []byte )
上面代码中定义了四个struct,就是node的四种类型:
shortNode,key是一个任意长度的字符串(字节数组[]byte),体现了PatriciaTrie的特色,经过合并只有一个子节点的父节点和其子节点来缩短trie的深度,结果就是有些节点会有长度更长的key。
Val
指向分支节点或者叶子节点Val
为rlp编码数据,key为该数据的hash来看下trie的结构以及对trie的操做可能对上面各个node的类型使用可能会更清晰一点,咱们来接着看下trie的结构定义trie/trie.go:
type Trie struct { db *Database // 用levelDB作KV存储 root node //当前根节点 originalRoot common.Hash //启动加载时候的hash,能够从db中恢复出整个trie cachegen, cachelimit uint16 // cachegen 缓存生成值,每次Commit会+1 }
这里的cachegen缓存生成值会被附加在node节点上面,若是当前的cachegen-cachelimit参数大于node的缓存生成,那么node会从cache里面卸载,以便节约内存。一个缓存多久没被时候用就会被从缓存中移除,看起来和redis等一些LRU算法的cache db很像。
Trie的初始化:
func New(root common.Hash, db *Database) (*Trie, error) { if db == nil { panic("trie.New called without a database") } trie := &Trie{ db: db, originalRoot: root, } if (root != common.Hash{}) && root != emptyRoot { // 若是hash不是空值,从数据库中加载一个已经存在的树 rootnode, err := trie.resolveHash(root[:], nil) if err != nil { return nil, err } trie.root = rootnode //根节点为找到的trie } //不然返回新建一个树 return trie, nil }
这里的trie.resolveHash
就是加载整课树的方法,还有传入的root common.Hash
hash是一个将hex编码转为原始hash的32位byte[] (common.HexToHash()),来看下如何经过这个hash来找到整个trie的:
func (t *Trie) resolveHash(n hashNode, prefix []byte) (node, error) { cacheMissCounter.Inc(1) //没执行一次计数器+1 //上面说过了,n是一个32位byte[] hash := common.BytesToHash(n) //经过hash从db中取出node的RLP编码内容 enc, err := t.db.Node(hash) if err != nil || enc == nil { return nil, &MissingNodeError{NodeHash: hash, Path: prefix} } return mustDecodeNode(n, enc, t.cachegen), nil }
mustDecodeNode
中调用了decodeNode
,这个方法经过RLP的list长度来判断该编码内容属于上面节点,若是是两个字段则为shortNode,若是是17个字段则为fullNode,而后再调用各自的decode解析函数
func decodeNode(hash, buf []byte, cachegen uint16) (node, error) { if len(buf) == 0 { return nil, io.ErrUnexpectedEOF } elems, _, err := rlp.SplitList(buf) //将buf拆分为列表的内容以及列表后的任何剩余字节。 if err != nil { return nil, fmt.Errorf("decode error: %v", err) } switch c, _ := rlp.CountValues(elems); c { case 2: n, err := decodeShort(hash, elems, cachegen) //decode shortNode return n, wrapError(err, "short") case 17: n, err := decodeFull(hash, elems, cachegen) //decode fullNode return n, wrapError(err, "full") default: return nil, fmt.Errorf("invalid number of list elements: %v", c) } }
decodeShort
函数中经过key是否含有结束标识符来判断是叶子节点仍是扩展节点,这个咱们在上面的编码部分已经讲过,有结束标示符则是叶子节点,再经过rlp.SplitString
解析出val生成一个叶子节点shortNode返回。没有结束标志符则为扩展节点,经过decodeRef
解析并生成一个shortNode返回。
func decodeShort(hash, elems []byte, cachegen uint16) (node, error) { kbuf, rest, err := rlp.SplitString(elems) //将elems填入RLP字符串的内容以及字符串后的任何剩余字节。 if err != nil { return nil, err } flag := nodeFlag{hash: hash, gen: cachegen} key := compactToHex(kbuf) if hasTerm(key) { // value node val, _, err := rlp.SplitString(rest) if err != nil { return nil, fmt.Errorf("invalid value node: %v", err) } return &shortNode{key, append(valueNode{}, val...), flag}, nil } r, _, err := decodeRef(rest, cachegen) if err != nil { return nil, wrapError(err, "val") } return &shortNode{key, r, flag}, nil }
继续看下decodeRef
主要作了啥操做:
func decodeRef(buf []byte, cachegen uint16) (node, []byte, error) { kind, val, rest, err := rlp.Split(buf) if err != nil { return nil, buf, err } switch { case kind == rlp.List: // 'embedded' node reference. The encoding must be smaller // than a hash in order to be valid. if size := len(buf) - len(rest); size > hashLen { err := fmt.Errorf("oversized embedded node (size is %d bytes, want size < %d)", size, hashLen) return nil, buf, err } n, err := decodeNode(nil, buf, cachegen) return n, rest, err case kind == rlp.String && len(val) == 0: // empty node return nil, rest, nil case kind == rlp.String && len(val) == 32: return append(hashNode{}, val...), rest, nil default: return nil, nil, fmt.Errorf("invalid RLP string size %d (want 0 or 32)", len(val)) } }
这段代码比较清晰,经过rlp.Split
后返回的类型作不一样的处理,若是是list,调用decodeNode
解析,若是是空节点返回空,若是是一个32位hash值返回hashNode,decodeFull
:
func decodeFull(hash, elems []byte, cachegen uint16) (*fullNode, error) { n := &fullNode{flags: nodeFlag{hash: hash, gen: cachegen}} for i := 0; i < 16; i++ { cld, rest, err := decodeRef(elems, cachegen) if err != nil { return n, wrapError(err, fmt.Sprintf("[%d]", i)) } n.Children[i], elems = cld, rest } val, _, err := rlp.SplitString(elems) if err != nil { return n, err } if len(val) > 0 { n.Children[16] = append(valueNode{}, val...) } return n, nil }
再回到Trie结构体中的cachegen, cachelimit,Trie树每次Commit时cachegen都会+1,这两个参数是cache的控制参数,为了弄清楚Trie的缓存机制,咱们来看下Commit具体是干吗的:
func (t *Trie) Commit(onleaf LeafCallback) (root common.Hash, err error) { if t.db == nil { panic("commit called on trie with nil database") } hash, cached, err := t.hashRoot(t.db, onleaf) if err != nil { return common.Hash{}, err } t.root = cached t.cachegen++ return common.BytesToHash(hash.(hashNode)), nil //返回所指向的node的未编码的hash } //返回trie.root所指向的node的hash以及每一个节点都带有各自hash的trie树的root。 func (t *Trie) hashRoot(db *Database, onleaf LeafCallback) (node, node, error) { if t.root == nil { return hashNode(emptyRoot.Bytes()), nil, nil } h := newHasher(t.cachegen, t.cachelimit, onleaf) defer returnHasherToPool(h) return h.hash(t.root, db, true)//为每一个节点生成一个未编码的hash }
Commit目的,是将trie树中的key转为Compact编码,为每一个节点生成一个hash,它就是为了确保后续能正常将变更的数据提交到db.
那么这个cachegen是怎么放到该节点中的,当trie树在节点插入的时候,会把当前trie的cachegen放入到该节点中,看下trie的insert方法:
//n -> trie当前插入节点 //prefix -> 当前匹配到的key的公共前缀 //key -> 待插入数据当前key中剩余未匹配的部分,完整的key=prefix+key //value -> 待插入数据自己 //返回 -> 是否改变树,插入完成后子树根节点,error func (t *Trie) insert(n node, prefix, key []byte, value node) (bool, node, error) { if len(key) == 0 { if v, ok := n.(valueNode); ok { return !bytes.Equal(v, value.(valueNode)), value, nil } //若是key长度为0,那么说明当前节点中新增长的节点和当前节点数据同样,认为已经新增过了就直接返回 return true, value, nil } switch n := n.(type) { case *shortNode: matchlen := prefixLen(key, n.Key) // 返回公共前缀长度 if matchlen == len(n.Key) { //若是整个key匹配,请按原样保留此节点,并仅更新该值。 dirty, nn, err := t.insert(n.Val, append(prefix, key[:matchlen]...), key[matchlen:], value) if !dirty || err != nil { return false, n, err } return true, &shortNode{n.Key, nn, t.newFlag()}, nil } //不然在它们不一样的索引处分支出来 branch := &fullNode{flags: t.newFlag()} var err error _, branch.Children[n.Key[matchlen]], err = t.insert(nil, append(prefix, n.Key[:matchlen+1]...), n.Key[matchlen+1:], n.Val) if err != nil { return false, nil, err } _, branch.Children[key[matchlen]], err = t.insert(nil, append(prefix, key[:matchlen+1]...), key[matchlen+1:], value) if err != nil { return false, nil, err } //若是它在索引0处出现则用该branch替换shortNode if matchlen == 0 { return true, branch, nil } // Otherwise, replace it with a short node leading up to the branch. return true, &shortNode{key[:matchlen], branch, t.newFlag()}, nil case *fullNode: dirty, nn, err := t.insert(n.Children[key[0]], append(prefix, key[0]), key[1:], value) if !dirty || err != nil { return false, n, err } n = n.copy() n.flags = t.newFlag() n.Children[key[0]] = nn return true, n, nil case nil: //在空trie中添加一个节点,就是叶子节点,返回shortNode。 return true, &shortNode{key, value, t.newFlag()}, nil case hashNode: rn, err := t.resolveHash(n, prefix)//恢复一个存储在db中的node if err != nil { return false, nil, err } dirty, nn, err := t.insert(rn, prefix, key, value) //递归调用 if !dirty || err != nil { return false, rn, err } return true, nn, nil default: panic(fmt.Sprintf("%T: invalid node: %v", n, n)) }
Trie树的插入,这是一个递归调用的方法,从根节点开始,一直往下找,直到找到能够插入的点,进行插入操做。
若是当前的根节点叶子节点shortNode,首先计算公共前缀
接下来看如何遍历Trie树从Trie中获取数据,根据key获取的value过程:
func (t *Trie) TryGet(key []byte) ([]byte, error) { key = keybytesToHex(key) value, newroot, didResolve, err := t.tryGet(t.root, key, 0) if err == nil && didResolve { t.root = newroot } return value, err } func (t *Trie) tryGet(origNode node, key []byte, pos int) (value []byte, newnode node, didResolve bool, err error) { switch n := (origNode).(type) { case nil: // 空树 return nil, nil, false, nil case valueNode: // 就是要查找的叶子节点数据 return n, n, false, nil case *shortNode: if len(key)-pos < len(n.Key) || !bytes.Equal(n.Key, key[pos:pos+len(n.Key)]) { // key在trie中不存在 return nil, n, false, nil } value, newnode, didResolve, err = t.tryGet(n.Val, key, pos+len(n.Key)) if err == nil && didResolve { n = n.copy() n.Val = newnode n.flags.gen = t.cachegen } return value, n, didResolve, err case *fullNode: value, newnode, didResolve, err = t.tryGet(n.Children[key[pos]], key, pos+1) if err == nil && didResolve { n = n.copy() n.flags.gen = t.cachegen n.Children[key[pos]] = newnode } return value, n, didResolve, err case hashNode: // hashNodes时候须要去db中获取 child, err := t.resolveHash(n, key[:pos]) if err != nil { return nil, n, true, err } value, newnode, _, err := t.tryGet(child, key, pos) return value, newnode, true, err default: panic(fmt.Sprintf("%T: invalid node: %v", origNode, origNode)) } }
tryGet(origNode node, key []byte, pos int)
方法提供三个参数,起始的node,hash key,还有当前hash匹配的位置,didResolve
用来判断trie树是否发生变化,根据hashNode去db中获取该node值,获取到后,须要更新现有的trie,didResolve就会发生变化。
关于Trie的Update
和Delete
就不分析了,在trie包中还有其余的功能,咱们来大略看下主要是干吗的不作详细解读了:
转载请注明: 转载自Ryan是菜鸟 | LNMP技术栈笔记
若是以为本篇文章对您十分有益,何不 打赏一下
本文连接地址: 以太坊源码分析--MPT 树