本文主要从源码层面介绍fabric peer同步区块过程,peer同步区块主要有2个过程:
1)peer组织的leader与orderer同步区块
2)peer组织间peer同步区块。
首先,orderer对外主要提供broadcast和deliver两个服务(参见《orderer服务介绍》)。而且我们知道peer和orderer同步区块肯定是通过deliver服务实现的,可是究竟是peer从orderer拉取,还是orderer推送给peer呢?因为peer可以知道orderer的信息(配置块),而且deliver是grpc服务,所以推断是peer从orderer拉取区块。如果是拉取区块,那么peer如何获取区块,获取区块的方式是什么?
首先,查看orderer deliver服务是怎么运行的,是如何同步区块的。
当deliver服务被调用时,转到Handle()方法处理:
// Handle serves a deliver stream. This is an abridged excerpt: each "..."
// marks code the article omitted. In the visible part it loops, reading an
// envelope from the stream and passing it to deliverBlocks.
func (h *Handler) Handle(ctx context.Context, srv *Server) error {
	...
	for {
		logger.Debugf("Attempting to read seek info message from %s", addr)
		// Receive the envelope sent by the caller.
		envelope, err := srv.Recv()
		...
		// Deliver the blocks requested by the envelope.
		status, err := h.deliverBlocks(ctx, srv, envelope)
		...
	}
}
其中,srv.Recv()接收envelope,再根据envelope信息分发block。
// deliverBlocks validates a deliver request carried in envelope and streams the
// requested blocks to srv. It returns the status to report to the caller and,
// for context cancellation or a failed send, a non-nil error.
//
// NOTE(review): this excerpt appears abridged. It uses `labels` and
// `erroredChan` without defining them, ignores several returned errors, and
// never populates seekInfo from the payload; verify against the full source
// before relying on it.
func (h *Handler) deliverBlocks(ctx context.Context, srv *Server, envelope *cb.Envelope) (status cb.Status, err error) {
	addr := util.ExtractRemoteAddress(ctx)
	// NOTE(review): err from UnmarshalPayload is not checked before payload is
	// dereferenced below.
	payload, err := utils.UnmarshalPayload(envelope.Payload)
	if payload.Header == nil {
		logger.Warningf("Malformed envelope received from %s with bad header", addr)
		return cb.Status_BAD_REQUEST, nil
	}
	// NOTE(review): the errors from UnmarshalChannelHeader and
	// validateChannelHeader are overwritten without being inspected.
	chdr, err := utils.UnmarshalChannelHeader(payload.Header.ChannelHeader)
	err = h.validateChannelHeader(ctx, chdr)
	chain := h.ChainManager.GetChain(chdr.ChannelId)
	// Record the completed-request metric on return, labelled with whether the
	// named result status is SUCCESS.
	// NOTE(review): the outer `labels` referenced here is not defined in this excerpt.
	defer func() {
		labels := append(labels, "success", strconv.FormatBool(status == cb.Status_SUCCESS))
		h.Metrics.RequestsCompleted.With(labels...).Add(1)
	}()
	accessControl, err := NewSessionAC(chain, envelope, srv.PolicyChecker, chdr.ChannelId, crypto.ExpiresAt)
	if err != nil {
		logger.Warningf("[channel: %s] failed to create access control object due to %s", chdr.ChannelId, err)
		return cb.Status_BAD_REQUEST, nil
	}
	if err := accessControl.Evaluate(); err != nil {
		logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
		return cb.Status_FORBIDDEN, nil
	}
	// NOTE(review): seekInfo is a fresh empty value and nothing visible here
	// decodes the payload into it, so seekInfo.Start and seekInfo.Stop are nil
	// when used below — the decoding step seems to have been omitted.
	seekInfo := &ab.SeekInfo{}
	// Obtain an iterator and the number of the first block to deliver.
	cursor, number := chain.Reader().Iterator(seekInfo.Start)
	defer cursor.Close()
	// Resolve the stop position into a concrete block number.
	var stopNum uint64
	switch stop := seekInfo.Stop.Type.(type) {
	case *ab.SeekPosition_Oldest:
		stopNum = number
	case *ab.SeekPosition_Newest:
		stopNum = chain.Reader().Height() - 1
	case *ab.SeekPosition_Specified:
		stopNum = stop.Specified.Number
		if stopNum < number {
			logger.Warningf("[channel: %s] Received invalid seekInfo message from %s: start number %d greater than stop number %d", chdr.ChannelId, addr, number, stopNum)
			return cb.Status_BAD_REQUEST, nil
		}
	}
	for {
		// With FAIL_IF_NOT_READY, report NOT_FOUND instead of waiting when the
		// next block number is beyond the current height.
		if seekInfo.Behavior == ab.SeekInfo_FAIL_IF_NOT_READY {
			if number > chain.Reader().Height()-1 {
				return cb.Status_NOT_FOUND, nil
			}
		}
		var block *cb.Block
		// This local status shadows the named result inside the loop body.
		var status cb.Status
		iterCh := make(chan struct{})
		go func() {
			// Fetch the next block; closing iterCh signals completion.
			block, status = cursor.Next()
			close(iterCh)
		}()

		select {
		case <-ctx.Done():
			logger.Debugf("Context canceled, aborting wait for next block")
			return cb.Status_INTERNAL_SERVER_ERROR, errors.Wrapf(ctx.Err(), "context finished before block retrieved")
		// NOTE(review): erroredChan is not defined in this excerpt; presumably it
		// is the channel returned by chain.Errored() — confirm.
		case <-erroredChan:
			// TODO, today, the only user of the errorChan is the orderer consensus implementations.  If the peer ever reports
			// this error, we will need to update this error message, possibly finding a way to signal what error text to return.
			logger.Warningf("Aborting deliver for request because the backing consensus implementation indicates an error")
			return cb.Status_SERVICE_UNAVAILABLE, nil
		case <-iterCh:
			// Iterator has set the block and status vars
		}
		if status != cb.Status_SUCCESS {
			logger.Errorf("[channel: %s] Error reading from channel, cause was: %v", chdr.ChannelId, status)
			return status, nil
		}
		// increment block number to support FAIL_IF_NOT_READY deliver behavior
		number++
		// Re-evaluate access control before each block is sent.
		if err := accessControl.Evaluate(); err != nil {
			logger.Warningf("[channel: %s] Client authorization revoked for deliver request from %s: %s", chdr.ChannelId, addr, err)
			return cb.Status_FORBIDDEN, nil
		}
		logger.Debugf("[channel: %s] Delivering block for (%p) for %s", chdr.ChannelId, seekInfo, addr)
		// Send the block to the caller.
		if err := srv.SendBlockResponse(block); err != nil {
			logger.Warningf("[channel: %s] Error sending to %s: %s", chdr.ChannelId, addr, err)
			return cb.Status_INTERNAL_SERVER_ERROR, err
		}
		h.Metrics.BlocksSent.With(labels...).Add(1)
		// Leave the loop once the last block requested by the client has been sent.
		if stopNum == block.Header.Number {
			break
		}
	}
	logger.Debugf("[channel: %s] Done delivering to %s for (%p)", chdr.ChannelId, addr, seekInfo)
	return cb.Status_SUCCESS, nil
}
// Chain encapsulates chain operations and data.
type Chain interface {
	// Sequence returns the current config sequence number, can be used to detect config changes
	Sequence() uint64

	// PolicyManager returns the current policy manager as specified by the chain configuration
	PolicyManager() policies.Manager

	// Reader returns the chain Reader for the chain
	Reader() blockledger.Reader

	// Errored returns a channel which closes when the backing consenter has errored
	Errored() <-chan struct{}
}
// SeekInfo describes a deliver request: where to start, where to stop, and
// which seek behavior to apply.
// NOTE(review): the struct tags suggest protobuf-generated code — do not edit
// by hand; confirm against the .proto definition.
type SeekInfo struct {
	// Start is the position of the first block requested.
	Start *SeekPosition `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
	// Stop is the position of the last block requested.
	Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
	// Behavior selects the seek behavior (enum orderer.SeekInfo_SeekBehavior).
	Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
	// The XXX_ fields are excluded from JSON; presumably protobuf runtime bookkeeping.
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
这里主要解决1个问题:peer如何触发orderer deliver服务?即peer和orderer怎么同步区块的?
在介绍之前,可先参阅peer节点启动流程。在peer节点启动过程中会执行peer.Initialize()方法,对peer所在的所有chain进行实例化。其中调用了createChain()接口创建链对象。在createChain()方法中调用了GossipService.InitializeChannel()方法,然后调用g.deliveryService[chainID].StartDeliverForChannel()方法获取区块。
func (d *deliverServiceImpl) StartDeliverForChannel(chainID string, ledgerInfo blocksprovider.LedgerInfo, finalizer func()) error { d.lock.Lock() defer d.lock.Unlock() if d.stopping { errMsg := fmt.Sprintf("Delivery service is stopping cannot join a new channel %s", chainID) logger.Errorf(errMsg) return errors.New(errMsg) } if _, exist := d.blockProviders[chainID]; exist { errMsg := fmt.Sprintf("Delivery service - block provider already exists for %s found, can't start delivery", chainID) logger.Errorf(errMsg) return errors.New(errMsg) } else { client := d.newClient(chainID, ledgerInfo) logger.Debug("This peer will pass blocks from orderer service to other peers for channel", chainID) // 建立区块deliver实例 d.blockProviders[chainID] = blocksprovider.NewBlocksProvider(chainID, client, d.conf.Gossip, d.conf.CryptoSvc) // 执行 go d.launchBlockProvider(chainID, finalizer) } return nil }
其中newClient()创建一个broadcastClient,传入参数为requester.RequestBlocks(ledgerInfoProvider)方法。很显然,peer是通过该方法获取区块的,那么该方法的主要实现是什么?
func (b *blocksRequester) RequestBlocks(ledgerInfoProvider blocksprovider.LedgerInfo) error { height, err := ledgerInfoProvider.LedgerHeight() if err != nil { logger.Errorf("Can't get ledger height for channel %s from committer [%s]", b.chainID, err) return err } if height > 0 { logger.Debugf("Starting deliver with block [%d] for channel %s", height, b.chainID) if err := b.seekLatestFromCommitter(height); err != nil { return err } } else { logger.Debugf("Starting deliver with oldest block for channel %s", b.chainID) if err := b.seekOldest(); err != nil { return err } } return nil }
调用了seekXXX方法(seekLatestFromCommitter和seekOldest),其中SeekInfo结构及seekOldest的实现如下:
// SeekInfo describes a deliver request: where to start, where to stop, and
// which seek behavior to apply.
// NOTE(review): the struct tags suggest protobuf-generated code — do not edit
// by hand; confirm against the .proto definition.
type SeekInfo struct {
	// Start is the position of the first block requested.
	Start *SeekPosition `protobuf:"bytes,1,opt,name=start,proto3" json:"start,omitempty"`
	// Stop is the position of the last block requested.
	Stop *SeekPosition `protobuf:"bytes,2,opt,name=stop,proto3" json:"stop,omitempty"`
	// Behavior selects the seek behavior (enum orderer.SeekInfo_SeekBehavior).
	Behavior SeekInfo_SeekBehavior `protobuf:"varint,3,opt,name=behavior,proto3,enum=orderer.SeekInfo_SeekBehavior" json:"behavior,omitempty"`
	// The XXX_ fields are excluded from JSON; presumably protobuf runtime bookkeeping.
	XXX_NoUnkeyedLiteral struct{} `json:"-"`
	XXX_unrecognized     []byte   `json:"-"`
	XXX_sizecache        int32    `json:"-"`
}
func (b *blocksRequester) seekOldest() error { seekInfo := &orderer.SeekInfo{ Start: &orderer.SeekPosition{Type: &orderer.SeekPosition_Oldest{Oldest: &orderer.SeekOldest{}}}, Stop: &orderer.SeekPosition{Type: &orderer.SeekPosition_Specified{Specified: &orderer.SeekSpecified{Number: math.MaxUint64}}}, Behavior: orderer.SeekInfo_BLOCK_UNTIL_READY, } //TODO- epoch and msgVersion may need to be obtained for nowfollowing usage in orderer/configupdate/configupdate.go msgVersion := int32(0) epoch := uint64(0) tlsCertHash := b.getTLSCertHash() env, err := utils.CreateSignedEnvelopeWithTLSBinding(common.HeaderType_DELIVER_SEEK_INFO, b.chainID, localmsp.NewSigner(), seekInfo, msgVersion, epoch, tlsCertHash) if err != nil { return err } // 发送envelope给orderer获取区块 return b.client.Send(env) }
从RequestBlocks()调用的2个方法可知,seekInfo的Stop(对应orderer端的stopNum)都为math.MaxUint64,因此该方法会持续请求区块直到最大值(可以看作现在到未来的所有区块)。
由上文可知,broadcastClient已经实例化,并且通过调用broadcastClient.onConnect向orderer发送获取区块的envelope。在实例化后,调用launchBlockProvider,然后会调用pb.DeliverBlocks()方法(开始获取区块)。
// broadcastClient holds the state of the client that Recv and the rest of the
// deliver-client methods operate on.
// NOTE(review): the field notes below are inferred from names and types only —
// confirm against the methods that use them.
type broadcastClient struct {
	stopFlag     int32          // presumably a stop indicator read by shouldStop — verify
	stopChan     chan struct{}  // presumably signals shutdown — verify
	createClient clientFactory  // factory for the underlying client
	shouldRetry  retryPolicy    // retry policy consulted on failures
	onConnect    broadcastSetup // setup callback run on connection
	prod         comm.ConnectionProducer

	mutex           sync.Mutex // presumably guards the fields below — verify
	blocksDeliverer blocksprovider.BlocksDeliverer
	conn            *connection
	endpoint        string
}
// DeliverBlocks pulls blocks from the ordering service and distributes them
// across peers. It loops until b.isDone() reports true, a receive fails, or an
// unrecoverable response arrives, and closes the client on exit.
func (b *blocksProviderImpl) DeliverBlocks() {
	errorStatusCounter := 0 // consecutive BAD_REQUEST/FORBIDDEN statuses
	statusCounter := 0      // exponent for the retry back-off
	defer b.client.Close()
	for !b.isDone() {
		// Receive the next response delivered by the orderer.
		msg, err := b.client.Recv()
		if err != nil {
			logger.Warningf("[%s] Receive error: %s", b.chainID, err.Error())
			return
		}
		switch t := msg.Type.(type) {
		case *orderer.DeliverResponse_Status:
			// A status response: SUCCESS is unexpected here and ends delivery.
			if t.Status == common.Status_SUCCESS {
				logger.Warningf("[%s] ERROR! Received success for a seek that should never complete", b.chainID)
				return
			}
			if t.Status == common.Status_BAD_REQUEST || t.Status == common.Status_FORBIDDEN {
				logger.Errorf("[%s] Got error %v", b.chainID, t)
				errorStatusCounter++
				// Give up once too many of these statuses arrive in a row.
				if errorStatusCounter > b.wrongStatusThreshold {
					logger.Criticalf("[%s] Wrong statuses threshold passed, stopping block provider", b.chainID)
					return
				}
			} else {
				errorStatusCounter = 0
				logger.Warningf("[%s] Got error %v", b.chainID, t)
			}
			// Back off for 2^statusCounter * 100ms, capped at maxRetryDelay.
			maxDelay := float64(maxRetryDelay)
			currDelay := float64(time.Duration(math.Pow(2, float64(statusCounter))) * 100 * time.Millisecond)
			time.Sleep(time.Duration(math.Min(maxDelay, currDelay)))
			// Stop growing the exponent once the cap has been reached.
			if currDelay < maxDelay {
				statusCounter++
			}
			// NOTE(review): the meaning of Disconnect's bool argument is not
			// visible here — confirm against the client implementation.
			if t.Status == common.Status_BAD_REQUEST {
				b.client.Disconnect(false)
			} else {
				b.client.Disconnect(true)
			}
			continue
		case *orderer.DeliverResponse_Block:
			// A block response resets both counters.
			errorStatusCounter = 0
			statusCounter = 0
			blockNum := t.Block.Header.Number

			marshaledBlock, err := proto.Marshal(t.Block)
			if err != nil {
				logger.Errorf("[%s] Error serializing block with sequence number %d, due to %s", b.chainID, blockNum, err)
				continue
			}
			// Blocks that fail verification are skipped.
			if err := b.mcs.VerifyBlock(gossipcommon.ChainID(b.chainID), blockNum, marshaledBlock); err != nil {
				logger.Errorf("[%s] Error verifying block with sequnce number %d, due to %s", b.chainID, blockNum, err)
				continue
			}

			numberOfPeers := len(b.gossip.PeersOfChannel(gossipcommon.ChainID(b.chainID)))
			// Create payload with a block received
			payload := createPayload(blockNum, marshaledBlock)

			// Use payload to create gossip message
			gossipMsg := createGossipMsg(b.chainID, payload)

			logger.Debugf("[%s] Adding payload to local buffer, blockNum = [%d]", b.chainID, blockNum)
			// Add payload to local state payloads buffer
			if err := b.gossip.AddPayload(b.chainID, payload); err != nil {
				logger.Warningf("Block [%d] received from ordering service wasn't added to payload buffer: %v", blockNum, err)
			}

			// Gossip messages with other nodes
			logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, blockNum, numberOfPeers)
			if !b.isDone() {
				// Peers synchronize blocks with each other through gossip.
				b.gossip.Gossip(gossipMsg)
			}
		default:
			logger.Warningf("[%s] Received unknown: %v", b.chainID, t)
			return
		}
	}
}
DeliverBlocks()方法中,首先调用b.client.Recv()接收orderer传过来的响应:
// Recv receives a message from the ordering service func (bc *broadcastClient) Recv() (*orderer.DeliverResponse, error) { o, err := bc.try(func() (interface{}, error) { if bc.shouldStop() { return nil, errors.New("closing") } return bc.tryReceive() }) if err != nil { return nil, err } return o.(*orderer.DeliverResponse), nil }
这里我们知道大体上是peer从orderer拉取区块的,可是还存在疑问,那就是peer如何触发orderer的deliver服务?peer是如何调用RequestBlocks方法的?
// broadcastSetup is a function that is called by the broadcastClient immediately after each // successful connection to the ordering service
peer间同步区块是通过gossip服务来完成的,并且通过上述代码可知,leader和orderer同步区块也是伴随着gossip服务启动(不过是属于leader的)。
// Gossip messages with other nodes
logger.Debugf("[%s] Gossiping block [%d], peers number [%d]", b.chainID, blockNum, numberOfPeers)
if !b.isDone() {
	// Peers synchronize blocks with each other through gossip.
	b.gossip.Gossip(gossipMsg)
}
首先,其他peer是通过gossip服务同步区块的,因此保存区块应该是在gossip服务里面调用的,回到peer启动时gossip服务的设置:
// Initialize the gossip service for this channel, passing the channel ID, the
// orderer addresses and the supporting components bundled in service.Support.
service.GetGossipService().InitializeChannel(bundle.ConfigtxValidator().ChainID(), ordererAddresses, service.Support{
	Validator:            validator,
	Committer:            c,
	Store:                store,
	Cs:                   simpleCollectionStore,
	IdDeserializeFactory: csStoreSupport,
})
里面会调用
// Create the gossip state provider for the channel and store it in g.chains
// under chainID.
g.chains[chainID] = state.NewGossipStateProvider(chainID, servicesAdapter, coordinator, g.metrics.StateMetrics, getStateConfiguration())
里面会调用
// Listen for incoming communication
go s.listen()
// Deliver in order messages into the incoming channel
go s.deliverPayloads()
deliverPayloads()会将gossip payload中的区块写入账本。
func (s *GossipStateProviderImpl) deliverPayloads() { defer s.done.Done() for { select { // Wait for notification that next seq has arrived case <-s.payloads.Ready(): logger.Debugf("[%s] Ready to transfer payloads (blocks) to the ledger, next block number is = [%d]", s.chainID, s.payloads.Next()) // Collect all subsequent payloads for payload := s.payloads.Pop(); payload != nil; payload = s.payloads.Pop() { rawBlock := &common.Block{} if err := pb.Unmarshal(payload.Data, rawBlock); err != nil { logger.Errorf("Error getting block with seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err)) continue } if rawBlock.Data == nil || rawBlock.Header == nil { logger.Errorf("Block with claimed sequence %d has no header (%v) or data (%v)", payload.SeqNum, rawBlock.Header, rawBlock.Data) continue } logger.Debugf("[%s] Transferring block [%d] with %d transaction(s) to the ledger", s.chainID, payload.SeqNum, len(rawBlock.Data.Data)) // Read all private data into slice var p util.PvtDataCollections if payload.PrivateData != nil { err := p.Unmarshal(payload.PrivateData) if err != nil { logger.Errorf("Wasn't able to unmarshal private data for block seqNum = %d due to (%+v)...dropping block", payload.SeqNum, errors.WithStack(err)) continue } } // 此处会保存区块 if err := s.commitBlock(rawBlock, p); err != nil { if executionErr, isExecutionErr := err.(*vsccErrors.VSCCExecutionFailureError); isExecutionErr { logger.Errorf("Failed executing VSCC due to %v. Aborting chain processing", executionErr) return } logger.Panicf("Cannot commit block to the ledger due to %+v", errors.WithStack(err)) } } case <-s.stopCh: s.stopCh <- struct{}{} logger.Debug("State provider has been stopped, finishing to push new blocks.") return } } }