做者:freewindnode
比原项目仓库:react
Github地址:https://github.com/Bytom/bytomgit
Gitee地址:https://gitee.com/BytomBlockchain/bytomgithub
在前一篇中,咱们说到,当比原向其它节点请求区块数据时,BlockKeeper
会发送一个BlockRequestMessage
把须要的区块height
告诉对方,并把该信息对应的二进制数据放入ProtocolReactor
对应的sendQueue
通道中,等待发送。而具体的发送细节,因为逻辑比较复杂,因此在前一篇中并未详解,放到本篇中。json
因为sendQueue
是一个通道,数据放进去后,究竟是由谁在什么状况下取走并发送,BlockKeeper
这边是不知道的。通过咱们在代码中搜索,发现只有一个类型会直接监视sendQueue
中的数据,它就是前文出现的MConnection
。MConnection
的对象在它的OnStart
方法中,会监视sendQueue
中的数据,而后,等发现数据时,会将之取走并放入一个叫sending
的通道里。缓存
事情变得有点复杂了:并发
MConnection
对应了一个与peer的链接,而比原节点之间创建链接的状况又有多种:好比主动链接别的节点,或者别的节点主动连上我sending
以后,咱们还须要知道又是谁在什么状况下会监视sending
,取走它里面的数据sending
中的数据被取走后,又是如何被发送到其它节点的呢?仍是像之前同样,遇到复杂的问题,咱们先经过“相互独立,彻底穷尽”的原则,把它分解成一个个小问题,而后依次解决。atom
那么首先咱们须要弄清楚的是:spa
MConnection
的对象并调用其OnStart
方法?(从而咱们知道sendQueue
中的数据是如何被监视的).net
通过分析,咱们发现MConnection
的启动,只出如今一个地方,即Peer
的OnStart
方法中。那么就这个问题就变成了:比原在什么状况下,会建立Peer
的对象并调用其OnStart
方法?
再通过一番折腾,终于肯定,在比原中,在下列4种状况Peer.OnStart
方法最终会被调用:
addrbook.json
中保存的节点的时候PEXReactor
,并使用它本身的协议与当前链接上的节点进行通讯的时候Switch.Connect2Switches
方法中(可忽略)第4种状况咱们彻底忽略。第3种状况中,因为PEXReactor
会使用相似于BitTorrent的文件分享协议与其它节点分享数据,逻辑比较独立,算是一种辅助做用,咱们也暂不考虑。这样咱们就只须要分析前两种状况了。
MConnection.OnStart
方法的?首先咱们快速走到SyncManager.Start
方法:
func main() { cmd := cli.PrepareBaseCmd(commands.RootCmd, "TM", os.ExpandEnv(config.DefaultDataDir())) cmd.Execute() }
cmd/bytomd/commands/run_node.go#L41
func runNode(cmd *cobra.Command, args []string) error { n := node.NewNode(config) if _, err := n.Start(); err != nil { // ... }
func (n *Node) OnStart() error { // ... n.syncManager.Start() // ... }
func (sm *SyncManager) Start() { go sm.netStart() // ... }
而后咱们将进入netStart()
方法。在这个方法中,比原将主动链接其它节点:
func (sm *SyncManager) netStart() error { // ... if sm.config.P2P.Seeds != "" { // dial out seeds := strings.Split(sm.config.P2P.Seeds, ",") if err := sm.DialSeeds(seeds); err != nil { return err } } return nil }
这里出现的sm.config.P2P.Seeds
,对应的就是本地数据目录中config.toml
中的p2p.seeds
中的种子结点。
接着经过sm.DialSeeds
去主动链接每一个种子:
func (sm *SyncManager) DialSeeds(seeds []string) error { return sm.sw.DialSeeds(sm.addrBook, seeds) }
func (sw *Switch) DialSeeds(addrBook *AddrBook, seeds []string) error { // ... for i := 0; i < len(perm)/2; i++ { j := perm[i] sw.dialSeed(netAddrs[j]) } // ... }
func (sw *Switch) dialSeed(addr *NetAddress) { peer, err := sw.DialPeerWithAddress(addr, false) // ... }
func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (*Peer, error) { // ... peer, err := newOutboundPeerWithConfig(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, sw.peerConfig) // ... err = sw.AddPeer(peer) // ... }
先是经过newOutboundPeerWithConfig
建立了peer
,而后把它加入到sw
(即Switch
对象)中。
func (sw *Switch) AddPeer(peer *Peer) error { // ... // Start peer if sw.IsRunning() { if err := sw.startInitPeer(peer); err != nil { return err } } // ... }
在sw.startInitPeer
中,将会调用peer.Start
:
func (sw *Switch) startInitPeer(peer *Peer) error { peer.Start() // ... }
而peer.Start
对应了Peer.OnStart
,最后就是:
func (p *Peer) OnStart() error { p.BaseService.OnStart() _, err := p.mconn.Start() return err }
能够看到,在这里调用了mconn.Start
,终于找到了。总结一下就是:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
那么,第一种主动链接别的节点的状况就到这里分析完了。下面是第二种状况:
MConnection.OnStart
方法这一步的?比原节点启动后,会监听本地的p2p端口,等待别的节点链接上来。那么这个流程又是什么样的呢?
因为比原节点的启动流程在目前的文章中已经屡次出现,这里就不贴了,咱们直接从Switch.OnStart
开始(它是在SyncManager
启动的时候启动的):
func (sw *Switch) OnStart() error { // ... for _, peer := range sw.peers.List() { sw.startInitPeer(peer) } // Start listeners for _, listener := range sw.listeners { go sw.listenerRoutine(listener) } // ... }
这个方法通过省略之后,还剩两块代码,一块是startInitPeer(...)
,一块是sw.listenerRoutine(listener)
。
若是你刚才在读前一节时留意了,就会发现,startInitPeer(...)
方法立刻就会调用Peer.Start
。然而在这里须要说明的是,通过个人分析,发现这块代码实际上没有起到任何做用,由于在当前这个时刻,sw.peers
老是空的,它里面尚未来得及被其它的代码添加进peer。因此我以为它能够删掉,以避免误导读者。(提了一个issue,参见#902)
第二块代码,listenerRoutine
,若是你还有印象的话,它就是用来监听本地p2p端口的,在前面“比原是如何监听p2p端口的”一文中有详细的讲解。
咱们今天仍是须要再挖掘一下它,看看它究竟是怎么走到MConnection.OnStart
的:
func (sw *Switch) listenerRoutine(l Listener) { for { inConn, ok := <-l.Connections() // ... err := sw.addPeerWithConnectionAndConfig(inConn, sw.peerConfig) // ... } }
这里的l
就是监听本地p2p端口的Listener。经过一个for
循环,拿到链接到该端口的节点的链接,生成新peer。
func (sw *Switch) addPeerWithConnectionAndConfig(conn net.Conn, config *PeerConfig) error { // ... peer, err := newInboundPeerWithConfig(conn, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodePrivKey, config) // ... if err = sw.AddPeer(peer); err != nil { // ... } // ... }
生成新的peer以后,调用了Switch
的AddPeer
方法。到了这里,就跟前一节同样了,在AddPeer
中将调用sw.startInitPeer(peer)
,而后调用peer.Start()
,最后调用了MConnection.OnStart()
。因为代码如出一辙,就不贴出来了。
总结一下,就是:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
那么,第二种状况咱们也分析完了。
不过到目前为止,咱们只解决了此次问题中的第一个小问题,即:咱们终于知道了比原代码会在什么状况来启动一个MConnection
,从而监视sendQueue
通道,把要发送的信息数据,转到了sending
通道中。
那么,咱们进入下一个小问题:
sending
以后,谁又会来取走它们呢?通过分析以后,发现通道sendQueue
和sending
都属于类型Channel
,只不过二者做用不一样。sendQueue
是用来存放待发送的完整的信息数据,而sending
更底层一些,它持有的数据可能会被分红多个块发送。若是只有sendQueue
一个通道,那么很难实现分块的操做的。
而Channel
的发送是由MConnection
来调用的,幸运的是,当咱们一直往回追溯下去,发现竟走到了MConnection.OnStart
这里。也就是说,咱们在这个小问题中,研究的正好是前面两个链条后面的部分:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ???
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ???
也就是上面的???
部分。
那么咱们就直接从MConnection.OnStart
开始:
func (c *MConnection) OnStart() error { // ... go c.sendRoutine() // ... }
c.sendRoutine()
方法就是咱们须要的。当MConnection
启动之后,就会开始进行发送操做(等待数据到来)。它的代码以下:
func (c *MConnection) sendRoutine() { // ... case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() if !eof { // Keep sendRoutine awake. select { case c.send <- struct{}{}: default: } } } // ... }
这个方法原本很长,只是咱们省略掉了不少无关的代码。里面的c.sendSomeMsgPackets()
就是咱们要找的,可是,咱们忽然发现,怎么又出来了一个c.send
通道?它又有什么用?并且看起来好像只有当这个通道里有东西的时候,咱们才会去调用c.sendSomeMsgPackets()
,彷佛像是一个铃铛同样用来提醒咱们。
那么c.send
何时会有东西呢?检查了代码以后,发如今如下3个地方:
func (c *MConnection) Send(chID byte, msg interface{}) bool { // ... success := channel.sendBytes(wire.BinaryBytes(msg)) if success { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: // .. }
func (c *MConnection) TrySend(chID byte, msg interface{}) bool { // ... ok = channel.trySendBytes(wire.BinaryBytes(msg)) if ok { // Wake up sendRoutine if necessary select { case c.send <- struct{}{}: // ... }
func (c *MConnection) sendRoutine() { // .... case <-c.send: // Send some msgPackets eof := c.sendSomeMsgPackets() if !eof { // Keep sendRoutine awake. select { case c.send <- struct{}{}: // ... }
若是咱们对前一篇文章还有印象,就会记得channel.trySendBytes
是在咱们想给对方节点发信息时调用的,调用完之后,它会把信息对应的二进制数据放入到channel.sendQueue
通道(因此才有了本文)。channel.sendBytes
咱们目前虽然还没用到,可是它也应该是相似的。在它们两个调用完以后,它们都会向c.send
通道里放入一个数据,用来通知Channel
有数据能够发送了。
而第三个sendRoutine()
就是咱们刚刚走到的地方。当咱们调用c.sendSomeMsgPackets()
发送了sending
中的一部分以后,若是还有剩余的,则继续向c.send
放个数据,提醒能够继续发送。
那到目前为止,发送数据涉及到的Channel就有三个了,分别是sendQueue
、sending
和send
。之因此这么复杂,根本缘由就是想把数据分块发送。
为何要分块发送呢?这是由于比原但愿能控制发送速率,让节点之间的网速能保持在一个合理的水平。若是不限制的话,一会儿发出大量的数据,一是可能会让接收者来不及处理,二是有可能会被恶意节点利用,请求大量区块数据把带宽占满。
担忧sendQueue
、sending
和send
这三个通道不太好理解,我想到了一个“烧鸭店”的比喻,来理解它们:
sendQueue
就像是用来挂烤好的烧鸭的勾子,能够有多个(但对于比原来讲,默认只有一个,由于sendQueue
的容量默认为1
),当有烧鸭烤好之后,就挂在勾子上;sending
是砧板,能够把烧鸭从sendQueue
勾子上取下来一只,放在上面切成块,等待装盘,一只烧鸭可能能够装成好几盘;send
是铃铛,当有人点单后,服务员就会按一下铃铛,厨师就从sending
砧板上拿几块烧鸭放在小盘中放在出餐口。因为厨师很是忙,每次切出一盘后均可能会去作别的事情,而忘了sending
砧板上还有烧鸭没装盘,因此为了防止本身忘记,他每切出一盘以后,都会看一眼sending
砧板,若是还有肉,就会按一下铃铛提醒本身继续装盘。好了,理解了send
后,咱们就能够回到主线,继续看c.sendSomeMsgPackets()
的代码了:
func (c *MConnection) sendSomeMsgPackets() bool { // Block until .sendMonitor says we can write. // Once we're ready we send more than we asked for, // but amortized it should even out. c.sendMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.SendRate), true) // Now send some msgPackets. for i := 0; i < numBatchMsgPackets; i++ { if c.sendMsgPacket() { return true } } return false }
c.sendMonitor.Limit
的做用是限制发送速率,其中maxMsgPacketTotalSize
即每一个packet的最大长度为常量10240
,第二个参数是预先指定的发送速率,默认值为500KB/s
,第三个参数是说,当实际速度过大时,是否暂停发送,直到变得正常。
通过限速的调整后,后面一段就能够正常发送数据了,其中的c.sendMsgPacket
是咱们继续要看的方法:
func (c *MConnection) sendMsgPacket() bool { // ... n, err := leastChannel.writeMsgPacketTo(c.bufWriter) // .. c.sendMonitor.Update(int(n)) // ... return false }
这个方法最前面我省略了一大段代码,其做用是检查多个channel,结合它们的优先级和已经发的数据量,找到当前最须要发送数据的那个channel,记为leastChannel
。
而后就是调用leastChannel.writeMsgPacketTo(c.bufWriter)
,把当前要发送的一块数据,写到bufWriter
中。这个bufWriter
就是真正与链接对象绑定的一个缓存区,写入到它里面的数据,会被Go发送出去。它的定义是在建立MConnection
的地方:
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
其中minReadBufferSize
为1024
,minWriteBufferSize
为65536
。
数据写到bufWriter
之后,咱们就不须要关心了,交给Go来操做了。
在leastChannel.writeMsgPacketTo(c.bufWriter)
调用完之后,后面会更新c.sendMonitor
,这样它才能继续正确的限速。
这时咱们已经知道数据是怎么发出去的了,可是咱们尚未找到是谁在监视sending
里的数据,那让咱们继续看leastChannel.writeMsgPacketTo
:
func (ch *Channel) writeMsgPacketTo(w io.Writer) (n int, err error) { packet := ch.nextMsgPacket() wire.WriteByte(packetTypeMsg, w, &n, &err) wire.WriteBinary(packet, w, &n, &err) if err == nil { ch.recentlySent += int64(n) } return }
其中的ch.nextMsgPacket()
是取出下一个要发送的数据块,那么是从哪里取出呢?是从sending
吗?
其后的代码是把数据块对象变成二进制,放入到前面的bufWriter
中发送。
继续ch.nextMsgPacket()
:
func (ch *Channel) nextMsgPacket() msgPacket { packet := msgPacket{} packet.ChannelID = byte(ch.id) packet.Bytes = ch.sending[:cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending))] if len(ch.sending) <= maxMsgPacketPayloadSize { packet.EOF = byte(0x01) ch.sending = nil atomic.AddInt32(&ch.sendQueueSize, -1) // decrement sendQueueSize } else { packet.EOF = byte(0x00) ch.sending = ch.sending[cmn.MinInt(maxMsgPacketPayloadSize, len(ch.sending)):] } return packet }
终于看到sending
了。从这里能够看出,sending
的确是放着不少块鸭肉的砧板,而packet
就是一个小盘,因此须要从先sending
中拿出不超过指定长度的数据放到packet
中,而后判断sending
里还有没有剩下的。若是有,则packet
的EOF
值为0x00
,不然为0x01
,这样调用者就知道数据有没有发完,还需不须要去按那个叫send
的铃。
那么到这里为止,咱们就知道原来仍是Channel本身在关注sending
,而且为了限制发送速度,须要把它切成一个个小块。
最后就咱们的第三个小问题了,其实咱们刚才在第二问里已经弄清楚了。
sending
中的数据被取走后,又是如何被发送到其它节点的呢?答案就是,sending
中的数据被分红一块块取出来后,会放入到bufWriter
中,就直接被Go的net.Conn
对象发送出去了。到这一层面,就不须要咱们再继续深刻了。
因为本篇中涉及的方法调用比较多,可能看完都乱了,因此在最后,咱们前面调用链补充完整,放在最后:
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.DialSeeds
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ...
Node.Start
-> SyncManager.Start
-> SyncManager.netStart
-> Switch.OnStart
-> Switch.listenerRoutine
-> Switch.addPeerWithConnectionAndConfig
-> Switch.AddPeer
-> Switch.startInitPeer
-> Peer.OnStart
-> MConnection.OnStart
-> ...
而后是:
MConnection.sendRoutine
-> MConnection.send
-> MConnection.sendSomeMsgPackets
-> MConnection.sendMsgPacket
-> MConnection.writeMsgPacketTo
-> MConnection.nextMsgPacket
-> MConnection.sending
到了最后,个人感受就是,一个复杂问题最开始看起来很可怕,可是一旦把它分解成小问题以后,每次只关注一个,各个击破,好像就没那么复杂了。