做者:freewindreact
比原项目仓库:git
Github地址:https://github.com/Bytom/bytomgithub
Gitee地址:https://gitee.com/BytomBlockc...数组
在上一篇,咱们知道了比原是如何把“请求区块数据”的信息BlockRequestMessage
发送给peer节点的,那么本文研究的重点就是,当peer节点收到了这个信息,它将如何应答?app
那么这个问题若是细分的话,也能够分为三个小问题:函数
BlockRequestMessage
后,将会给对方发送什么样的信息?咱们先从第一个小问题开始。性能
若是咱们在代码中搜索BlockRequestMessage
,会发现只有在ProtocolReactor.Receive
方法中针对该信息进行了应答。那么问题的关键就是,比原是如何接收对方发过来的信息,而且把它转交给ProtocolReactor.Receive
的。区块链
若是咱们对前一篇《比原是如何把请求区块数据的信息发出去的》有印象的话,会记得比原在发送信息时,最后会把信息写入到MConnection.bufWriter
中;与之相应的,MConnection
还有一个bufReader
,用于读取数据,它也是与net.Conn
绑定在一块儿的:atom
p2p/connection.go#L114-L118code
func NewMConnectionWithConfig(conn net.Conn, chDescs []*ChannelDescriptor, onReceive receiveCbFunc, onError errorCbFunc, config *MConnConfig) *MConnection { mconn := &MConnection{ conn: conn, bufReader: bufio.NewReaderSize(conn, minReadBufferSize), bufWriter: bufio.NewWriterSize(conn, minWriteBufferSize),
(其中minReadBufferSize
的值为常量1024
)
因此,要读取对方发来的信息,必定会读取bufReader
。通过简单的搜索,咱们发现,它也是在MConnection.Start
中启动的:
func (c *MConnection) OnStart() error { // ... go c.sendRoutine() go c.recvRoutine() // ... }
其中的c.recvRoutine()
就是咱们本次所关注的。它上面的c.sendRoutine
是用来发送的,是前一篇文章中咱们关注的重点。
继续c.recvRoutine()
:
func (c *MConnection) recvRoutine() { // ... for { c.recvMonitor.Limit(maxMsgPacketTotalSize, atomic.LoadInt64(&c.config.RecvRate), true) // ... pktType := wire.ReadByte(c.bufReader, &n, &err) c.recvMonitor.Update(int(n)) // ... switch pktType { // ... case packetTypeMsg: pkt, n, err := msgPacket{}, int(0), error(nil) wire.ReadBinaryPtr(&pkt, c.bufReader, maxMsgPacketTotalSize, &n, &err) c.recvMonitor.Update(int(n)) // ... channel, ok := c.channelsIdx[pkt.ChannelID] // ... msgBytes, err := channel.recvMsgPacket(pkt) // ... if msgBytes != nil { // ... c.onReceive(pkt.ChannelID, msgBytes) } // ... } } // ... }
通过简化之后,这个方法分红了三块内容:
500K/s
c.bufReader
中读取出下一个数据包的类型。它的值目前有三个,两个跟心跳有关:packetTypePing
和packetTypePong
,另外一个表示是正常的信息数据类型packetTypeMsg
,也是咱们须要关注的c.bufReader
中读取出完整的数据包,而后根据它的ChannelID
找到相应的channel去处理它。ChannelID
有两个值,分别是BlockchainChannel
和PexChannel
,咱们目前只须要关注前者便可,它对应的reactor是ProtocolReactor
。当最后调用c.onReceive(pkt.ChannelID, msgBytes)
时,读取的二进制数据msgBytes
就会被ProtocolReactor.Receive
处理咱们的重点是看第三块内容。首先是channel.recvMsgPacket(pkt)
,即通道是怎么从packet包里读取到相应的二进制数据的呢?
func (ch *Channel) recvMsgPacket(packet msgPacket) ([]byte, error) { // ... ch.recving = append(ch.recving, packet.Bytes...) if packet.EOF == byte(0x01) { msgBytes := ch.recving // ... ch.recving = ch.recving[:0] return msgBytes, nil } return nil, nil }
这个方法我去掉了一些错误检查和关于性能方面的注释,有兴趣的同窗能够点接上方的源代码查看,这里就忽略了。
这段代码主要是利用了一个叫recving
的通道,把packet
中持有的字节数组加到它后面,而后再判断该packet是否表明整个信息结束了,若是是的话,则把ch.recving
的内容完整返回,供调用者处理;不然的话,返回一个nil
,表示还没拿完,暂时处理不了。在前一篇文章中关于发送数据的地方能够与这里对应,只不过发送方要麻烦的多,须要三个通道sendQueue
、sending
和send
才能实现,这边接收方就简单了。
而后回到前面的方法MConnection.recvRoutine
,咱们继续看最后的c.onReceive
调用。这个onReceive
其实是一个由别人赋值给该channel的一个函数,它位于MConnection
建立的地方:
func createMConnection(conn net.Conn, p *Peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, onPeerError func(*Peer, interface{}), config *MConnConfig) *MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] if reactor == nil { if chID == PexChannel { return } else { cmn.PanicSanity(cmn.Fmt("Unknown channel %X", chID)) } } reactor.Receive(chID, p, msgBytes) } onError := func(r interface{}) { onPeerError(p, r) } return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) }
逻辑也比较简单,就是当前面的c.onReceive(pkt.ChannelID, msgBytes)
调用时,它会根据传入的chID
找到相应的Reactor
,而后执行其Receive
方法。对于本文来讲,就会进入到ProtocolReactor.Receive
。
那咱们继续看ProtocolReactor.Receive
:
netsync/protocol_reactor.go#L179-L247
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) // ... switch msg := msg.(type) { case *BlockRequestMessage: // ... }
其中的DecodeMessage(...)
就是把传入的二进制数据反序列化成一个BlockchainMessage
对象,该对象是一个没有任何内容的interface
,它有多种实现类型。咱们在后面继续对该对象进行判断,若是它是BlockRequestMessage
类型的信息,咱们就会继续作相应的处理。处理的代码我在这里暂时省略了,由于它是属于下一个小问题的,咱们先不考虑。
好像不知不觉咱们就把第一个小问题的后半部分差很少搞清楚了。那么前半部分是什么?咱们在前面说,读取bufReader
的代码的起点是在MConnection.Start
中,那么前半部分就是:比原从启动开始中,是在什么状况下怎样一步步走到MConnection.Start
的呢?
好在前半部分的问题咱们在前一篇文章《比原是如何把请求区块数据的信息发出去的》中进行了专门的讨论,这里就不讲了,有须要的话能够再过去看一下(能够先看最后“总结”那一小节)。
下面咱们进入第二个小问题:
BlockRequestMessage
后,将会给对方发送什么样的信息?这里就是接着前面的ProtocolReactor.Receive
继续向下讲了。首先咱们再贴一下它的较完整的代码:
netsync/protocol_reactor.go#L179-L247
func (pr *ProtocolReactor) Receive(chID byte, src *p2p.Peer, msgBytes []byte) { _, msg, err := DecodeMessage(msgBytes) // ... switch msg := msg.(type) { case *BlockRequestMessage: var block *types.Block var err error if msg.Height != 0 { block, err = pr.chain.GetBlockByHeight(msg.Height) } else { block, err = pr.chain.GetBlockByHash(msg.GetHash()) } // ... response, err := NewBlockResponseMessage(block) // ... src.TrySend(BlockchainChannel, struct{ BlockchainMessage }{response}) // ... }
能够看到,逻辑仍是比较简单的,即根据对方发过来的BlockRequestMessage
中指定的height
或者hash
信息,在本地的区块链数据中找到相应的block,组成BlockResponseMessage
发过去就好了。
其中chain.GetBlockByHeight(...)
和chain.GetBlockByHash(...)
若是详细说明的话,须要深入理解区块链数据在比原节点中是如何保存的,咱们在本文先不讲,等到后面专门研究。
在这里,我以为咱们只须要知道咱们会查询区块数据而且构造出一个BlockResponseMessage
,再经过BlockchainChannel
这个通道发送出去就能够了。
最后一句代码中调用了src.TrySend
方法,它是把信息向对方peer发送过去。(其中的src
就是指的对方peer)
那么,它究竟是怎么发送出去的呢?下面咱们进入最后一个小问题:
BlockResponseMessage
信息是如何发送出去的?咱们先看看peer.TrySend
代码:
func (p *Peer) TrySend(chID byte, msg interface{}) bool { if !p.IsRunning() { return false } return p.mconn.TrySend(chID, msg) }
它在内部将会调用MConnection.TrySend
方法,其中chID
是BlockchainChannel
,也就是它对应的Reactor是ProtocolReactor
。
再接着就是咱们熟悉的MConnection.TrySend
,因为它在前一篇文章中进行了全面的讲解,在本文就不提了,若是须要能够过去翻看一下。
那么今天的问题就算是解决啦。
到这里,咱们总算可以完整的理解清楚,当咱们向一个比原节点请求“区块数据”,咱们这边须要怎么作,对方节点又须要怎么作了。