死磕以太坊源码分析之p2p节点发现html
在阅读节点发现源码以前必需要理解kadmilia算法,能够参考:KAD算法详解。node
节点发现,使本地节点得知其余节点的信息,进而加入到p2p网络中。git
以太坊的节点发现基于相似的kademlia算法,源码中有两个版本,v4和v5。v4适用于全节点,经过discover.ListenUDP
使用,v5适用于轻节点经过discv5.ListenUDP
使用,本文介绍的是v4版本。github
节点发现功能主要涉及 Server Table udp 这几个数据结构,它们有独自的事件响应循环,节点发现功能即是它们互相协做完成的。其中,每一个以太坊客户端启动后都会在本地运行一个Server,并将网络拓扑中相邻的节点视为Node,而Table是Node的容器,udp则是负责维持底层的链接。这些结构的关系以下图:算法
在P2p的server.go 的start方法中:数据库
if err := srv.setupDiscovery(); err != nil { return err }
进入到setupDiscovery
中:网络
// Discovery V4 var unhandled chan discover.ReadPacket var sconn *sharedUDPConn if !srv.NoDiscovery { ... ntab, err := discover.ListenUDP(conn, srv.localnode, cfg) .... }
discover.ListenUDP
方法即开启了节点发现的功能.数据结构
首先解析出监听地址的UDP端口,根据端口返回与之相连的UDP链接,以后返回链接的本地网络地址,接着设置最后一个UDP-on-IPv4端口。到此为止节点发现的一些准备工做作好,接下下来开始UDP的监听:并发
ntab, err := discover.ListenUDP(conn, srv.localnode, cfg)
而后进行UDP 的监听,下面是监听的过程:app
// 监听给定的socket 上的发现的包 func ListenUDP(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { return ListenV4(c, ln, cfg) }
func ListenV4(c UDPConn, ln *enode.LocalNode, cfg Config) (*UDPv4, error) { closeCtx, cancel := context.WithCancel(context.Background()) t := &UDPv4{ conn: c, priv: cfg.PrivateKey, netrestrict: cfg.NetRestrict, localNode: ln, db: ln.Database(), gotreply: make(chan reply), addReplyMatcher: make(chan *replyMatcher), closeCtx: closeCtx, cancelCloseCtx: cancel, log: cfg.Log, } if t.log == nil { t.log = log.Root() } tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log) // if err != nil { return nil, err } t.tab = tab go tab.loop() // t.wg.Add(2) go t.loop() // go t.readLoop(cfg.Unhandled) // return t, nil }
主要作了如下几件事:
tab, err := newTable(t, ln.Database(), cfg.Bootnodes, t.log)
新建路由表作了如下几件事:
bootnode
进行链接,全部的节点加入几乎都先链接了它。链接上bootnode
后,获取bootnode
部分的邻居节点,而后进行节点发现,获取更多的活跃的邻居节点首先知道UDP协议是没有链接的概念的,因此须要不断的ping 来测试对端节点是否正常,在新建路由表以后,就来到下面的循环,不断的去作上面的事。
go tab.loop()
定时运行doRefresh
、doRevalidate
、copyLiveNodes
进行刷新K桶。
以太坊的k桶设置:
const ( alpha = 3 // Kademlia并发参数, 是系统内一个优化参数,控制每次从K桶最多取出节点个数,ethereum取值3 bucketSize = 16 // K桶大小(可容纳节点数) maxReplacements = 10 // 每桶更换列表的大小 hashBits = len(common.Hash{}) * 8 //每一个节点ID长度,32*8=256, 32位16进制 nBuckets = hashBits / 15 // K桶个数 )
首先搞清楚这三个定时器运行的时间:
refreshInterval = 30 * time.Minute revalidateInterval = 10 * time.Second copyNodesInterval = 30 * time.Second
doRefresh
doRefresh对随机目标执行查找以保持K桶已满。若是表为空(初始引导程序或丢弃的有故障),则插入种子节点。
主要如下几步:
从数据库加载随机节点和引导节点。这应该会产生一些之前见过的节点
tab.loadSeedNodes()
将本地节点ID做为目标节点进行查找最近的邻居节点
tab.net.lookupSelf()
func (t *UDPv4) lookupSelf() []*enode.Node { return t.newLookup(t.closeCtx, encodePubkey(&t.priv.PublicKey)).run() }
func (t *UDPv4) newLookup(ctx context.Context, targetKey encPubkey) *lookup { ... return t.findnode(n.ID(), n.addr(), targetKey) }) return it }
向这些节点发起findnode
操做查询离target节点最近的节点列表,将查询获得的节点进行ping-pong
测试,将测试经过的节点落库保存
通过这个流程后,节点的K桶就可以比较均匀地将不一样网络节点更新到本地K桶中。
unc (t *UDPv4) findnode(toid enode.ID, toaddr *net.UDPAddr, target encPubkey) ([]*node, error) { t.ensureBond(toid, toaddr) nodes := make([]*node, 0, bucketSize) nreceived := 0 // 设置回应回调函数,等待类型为neighborsPacket的邻近节点包,若是类型对,就执行回调请求 rm := t.pending(toid, toaddr.IP, p_neighborsV4, func(r interface{}) (matched bool, requestDone bool) { reply := r.(*neighborsV4) for _, rn := range reply.Nodes { nreceived++ // 获得一个简单的node结构 n, err := t.nodeFromRPC(toaddr, rn) if err != nil { t.log.Trace("Invalid neighbor node received", "ip", rn.IP, "addr", toaddr, "err", err) continue } nodes = append(nodes, n) } return true, nreceived >= bucketSize }) //上面了一个管道事件,下面开始发送真正的findnode报文,而后进行等待了 t.send(toaddr, toid, &findnodeV4{ Target: target, Expiration: uint64(time.Now().Add(expiration).Unix()), }) return nodes, <-rm.errc }
查找3个随机的目标节点
for i := 0; i < 3; i++ { tab.net.lookupRandom() }
doRevalidate
doRevalidate检查随机存储桶中的最后一个节点是否仍然存在,若是不是,则替换或删除该节点。
主要如下几步:
返回随机的非空K桶中的最后一个节点
last, bi := tab.nodeToRevalidate()
对最后的节点执行Ping操做,而后等待Pong
remoteSeq, err := tab.net.ping(unwrapNode(last))
若是节点ping通了的话,将节点移动到最前面
tab.bumpInBucket(b, last)
没有收到回复,选择一个替换节点,或者若是没有任何替换节点,则删除该节点
tab.replace(b, last)
copyLiveNodes
copyLiveNodes将表中的节点添加到数据库,若是节点在表中的时间超过了5分钟。
这部分代码比较简单,就伸展阐述。
if n.livenessChecks > 0 && now.Sub(n.addedAt) >= seedMinTableTime { tab.db.UpdateNode(unwrapNode(n)) }
go t.loop()
loop循环主要监听如下几类消息:
go t.readLoop(cfg.Unhandled)
主要有如下两件事:
循环接收其余节点发来的udp消息
nbytes, from, err := t.conn.ReadFromUDP(buf)
处理接收到的UDP消息
t.handlePacket(from, buf[:nbytes])
接下来对这两个函数进行进一步的解析。
接收UDP消息比较的简单,就是不断的从链接中读取Packet数据,它有如下几种消息:
ping
:用于判断远程节点是否在线。
pong
:用于回复ping
消息的响应。
findnode
:查找与给定的目标节点相近的节点。
neighbors
:用于回复findnode
的响应,与给定的目标节点相近的节点列表
主要作了如下几件事:
数据包解码
packet, fromKey, hash, err := decodeV4(buf)
检查数据包是否有效,是否能够处理
packet.preverify(t, from, fromID, fromKey)
在校验这一块,涉及不一样的消息类型不一样的校验,咱们来分别对各类消息进行分析。
①:ping
②:pong
③:findNodes
④:neighbors
findnode
的响应,校验回复是否正确处理packet数据
packet.handle(t, from, fromID, hash)
相同的,也会有4种消息,可是咱们这边重点讲处理findNodes的消息:
func (req *findnodeV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) {
...
}
咱们这里就稍微介绍下如何处理`findnode`的消息: ```go func (req *findnodeV4) handle(t *UDPv4, from *net.UDPAddr, fromID enode.ID, mac []byte) { // 肯定最近的节点 target := enode.ID(crypto.Keccak256Hash(req.Target[:])) t.tab.mutex.Lock() //最接近的返回表中最接近给定id的n个节点 closest := t.tab.closest(target, bucketSize, true).entries t.tab.mutex.Unlock() // 以每一个数据包最多maxNeighbors的块的形式发送邻居,以保持在数据包大小限制如下。 p := neighborsV4{Expiration: uint64(time.Now().Add(expiration).Unix())} var sent bool for _, n := range closest { //扫描这些最近的节点列表,而后一个包一个包的发送给对方 if netutil.CheckRelayIP(from.IP, n.IP()) == nil { p.Nodes = append(p.Nodes, nodeToRPC(n)) } if len(p.Nodes) == maxNeighbors { t.send(from, fromID, &p)//给对方发送 neighborsPacket 包,里面包含节点列表 p.Nodes = p.Nodes[:0] sent = true } } if len(p.Nodes) > 0 || !sent { t.send(from, fromID, &p) } }
首先先肯定最近的节点,再一个包一个包的发给对方,并校验节点的IP,最后把有效的节点发送给请求方。
buckets:全部节点都加到这个里面,按照距离
nursery:启动节点
rand:随机来源
ips:跟踪IP,确保IP中最多N个属于同一网络范围
net: UDP 传输的接口
如下是table的结构图:
http://mindcarver.cn/ ⭐️⭐️⭐️⭐️
https://github.com/blockchainGuide/ ⭐️⭐️⭐️⭐️
http://www.javashuo.com/article/p-tsmvgwqk-nv.html
http://qjpcpu.github.io/blog/2018/01/29/shen-ru-ethereumyuan-ma-p2pmo-kuai-ji-chu-jie-gou/
https://www.jianshu.com/p/b232c870dcd2