The orderer exposes two service interfaces: Broadcast and Deliver. The orderer node is connected to each peer node over gRPC. It takes the transactions (Envelope format, for example the data produced after a deployment on a peer) that the peers send in through Broadcast, packs them into blocks according to the configured batch size, and writes those blocks into the orderer's own ledger. Each peer's gossip service then consumes the data in that ledger through Deliver to synchronize its own ledger.
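To make the Broadcast side concrete, here is a minimal client-side sketch of submitting an Envelope, assuming the fabric 1.x generated proto packages (github.com/hyperledger/fabric/protos/common and .../protos/orderer); the address and error handling are purely illustrative, not taken from the orderer code itself.

package client

import (
    "context"
    "fmt"

    cb "github.com/hyperledger/fabric/protos/common"
    ab "github.com/hyperledger/fabric/protos/orderer"
    "google.golang.org/grpc"
)

// submit opens a Broadcast stream, sends one envelope and waits for the status reply.
func submit(env *cb.Envelope) error {
    conn, err := grpc.Dial("orderer.example.com:7050", grpc.WithInsecure()) // illustrative address
    if err != nil {
        return err
    }
    defer conn.Close()

    // Open the bidirectional Broadcast stream.
    stream, err := ab.NewAtomicBroadcastClient(conn).Broadcast(context.Background())
    if err != nil {
        return err
    }
    // Send the transaction envelope and read the orderer's status response.
    if err := stream.Send(env); err != nil {
        return err
    }
    resp, err := stream.Recv()
    if err != nil {
        return err
    }
    fmt.Println("broadcast status:", resp.Status)
    return nil
}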
Let's start with the main function.
// Main is the entry point of orderer process
func Main() {
    fullCmd := kingpin.MustParse(app.Parse(os.Args[1:]))

    // "version" command
    if fullCmd == version.FullCommand() {
        fmt.Println(metadata.GetVersionInfo())
        return
    }

    conf, err := localconfig.Load()
    if err != nil {
        logger.Error("failed to parse config: ", err)
        os.Exit(1)
    }
    initializeLogging()
    initializeLocalMsp(conf)
    prettyPrintStruct(conf)
    Start(fullCmd, conf)
}
From this we can see that the orderer's command line is built with kingpin, used in a fairly basic way, with only three commands:
start*     Start the orderer node
version    Show version information
benchmark  Run orderer in benchmark mode
As the main function above also shows, only version has dedicated handling, and a bare orderer defaults to orderer start.
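For illustration, the three commands could be wired up with kingpin roughly as below; this is a sketch following the help text above, and the actual definitions in fabric carry more flags.

package main

import "gopkg.in/alecthomas/kingpin.v2"

var (
    app = kingpin.New("orderer", "Hyperledger Fabric orderer node")

    start     = app.Command("start", "Start the orderer node").Default() // default when no command is given
    version   = app.Command("version", "Show version information")
    benchmark = app.Command("benchmark", "Run orderer in benchmark mode")
)

// parse returns the selected command name, e.g. "start".
func parse(args []string) string {
    return kingpin.MustParse(app.Parse(args))
}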
The startup flow is:
What follows focuses mainly on step 4; the earlier steps are essentially the configuration-initialization process.
Let's look at the Start function:
if clusterType && conf.General.GenesisMethod == "file" {
    r.replicateIfNeeded(bootstrapBlock)
}

func (r *Replicator) ReplicateChains() []string {
    var replicatedChains []string
    channels := r.discoverChannels()
    pullHints := r.channelsToPull(channels)
    totalChannelCount := len(pullHints.channelsToPull) + len(pullHints.channelsNotToPull)
    for _, channels := range [][]ChannelGenesisBlock{pullHints.channelsToPull, pullHints.channelsNotToPull} {
        for _, channel := range channels {
            ...
            r.appendBlock(gb, ledger, channel.ChannelName)
        }
    }
    for _, channel := range pullHints.channelsToPull {
        err := r.PullChannel(channel.ChannelName)
        ...
    }
    // Last, pull the system chain.
    if err := r.PullChannel(r.SystemChannel); err != nil && err != ErrSkipped {
        r.Logger.Panicf("Failed pulling system channel: %v", err)
    }
    return replicatedChains
}
// Are we bootstrapping?
if len(lf.ChainIDs()) == 0 {
    initializeBootstrapChannel(genesisBlock, lf)
} else {
    logger.Info("Not bootstrapping because of existing chains")
}
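Bootstrapping essentially means creating a ledger for the genesis block's channel and appending that block as block 0. A simplified sketch, assuming the fabric 1.4 package layout (protos/utils and common/ledger/blockledger); the real initializeBootstrapChannel also panics on failure:

package server

import (
    "github.com/hyperledger/fabric/common/ledger/blockledger"
    cb "github.com/hyperledger/fabric/protos/common"
    "github.com/hyperledger/fabric/protos/utils"
)

// bootstrapChannel creates the channel's ledger and writes the genesis block into it.
func bootstrapChannel(genesisBlock *cb.Block, lf blockledger.Factory) error {
    chainID, err := utils.GetChainIDFromBlock(genesisBlock) // channel name embedded in the block
    if err != nil {
        return err
    }
    gl, err := lf.GetOrCreate(chainID) // new (empty) ledger for this channel
    if err != nil {
        return err
    }
    return gl.Append(genesisBlock) // genesis block becomes block 0
}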
registrar := multichannel.NewRegistrar(lf, signer, metricsProvider, callbacks...)
// Registrar serves as a point of access and control for the individual channel resources.
type Registrar struct {
    lock sync.RWMutex
    // chain objects for all current channels
    chains map[string]*ChainSupport
    // consenters for the different consensus types
    consenters map[string]consensus.Consenter
    // Factory retrieves or creates ledgers by chainID
    ledgerFactory blockledger.Factory
    // signing
    signer crypto.LocalSigner

    blockcutterMetrics *blockcutter.Metrics
    // system channel ID
    systemChannelID string
    // system channel ChainSupport
    systemChannel *ChainSupport
    // channel config templator
    templator msgprocessor.ChannelConfigTemplator
    callbacks []channelconfig.BundleActor
}
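The Broadcast and Deliver handlers resolve channels through this registrar. A small sketch, assuming a Registrar.GetChain lookup that returns the per-channel ChainSupport:

package server

import (
    "fmt"

    "github.com/hyperledger/fabric/orderer/common/multichannel"
)

// resolve looks up the per-channel resources for an incoming request.
func resolve(r *multichannel.Registrar, chainID string) (*multichannel.ChainSupport, error) {
    cs, ok := r.GetChain(chainID)
    if !ok {
        return nil, fmt.Errorf("channel %s not found", chainID)
    }
    // cs bundles the ledger, the message processor, the block writer and the running chain.
    return cs, nil
}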
consenters["solo"] = solo.New() var kafkaMetrics *kafka.Metrics consenters["kafka"], kafkaMetrics = kafka.New(conf, metricsProvider, healthChecker, registrar) go kafkaMetrics.PollGoMetricsUntilStop(time.Minute, nil) if isClusterType(bootstrapBlock) { initializeEtcdraftConsenter(consenters, conf, lf, clusterDialer, bootstrapBlock, ri, srvConf, srv, registrar, metricsProvider) }
chain := newChainSupport(
    r,
    ledgerResources,
    r.consenters,
    r.signer,
    r.blockcutterMetrics,
)
for _, chainID := range existingChains {
    ...
    chain.start()
    ...
}
type AtomicBroadcastServer interface {
    Broadcast(AtomicBroadcast_BroadcastServer) error
    Deliver(AtomicBroadcast_DeliverServer) error
}
This interface is implemented in orderer/common/server/server.go.
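The implementation in server.go is essentially a thin shell that delegates each gRPC stream to a broadcast handler and a deliver handler. The sketch below captures that delegation; the handler interfaces here are illustrative stand-ins, not the exact types in the file.

package server

import (
    "context"

    ab "github.com/hyperledger/fabric/protos/orderer"
)

// Illustrative stand-ins for the real broadcast/deliver handlers.
type broadcastHandler interface {
    Handle(srv ab.AtomicBroadcast_BroadcastServer) error
}
type deliverHandler interface {
    Handle(ctx context.Context, srv ab.AtomicBroadcast_DeliverServer) error
}

type server struct {
    bh broadcastHandler // wraps registrar lookups and message processing
    dh deliverHandler   // serves blocks out of the channel ledgers
}

// Broadcast receives transaction envelopes and hands them to the broadcast handler.
func (s *server) Broadcast(srv ab.AtomicBroadcast_BroadcastServer) error {
    return s.bh.Handle(srv)
}

// Deliver streams requested block ranges back to the caller.
func (s *server) Deliver(srv ab.AtomicBroadcast_DeliverServer) error {
    return s.dh.Handle(srv.Context(), srv)
}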
ChainSupport provides the resources that back all chain-related operations: the ledger itself plus the helper objects the ledger relies on, such as the cutter that splits messages into batches, the signer, and the position of the latest configuration in the chain (lastConfig holds the number of the block containing the latest config, while lastConfigSeq holds the sequence number of the latest config message itself).
type ChainSupport struct {
    // ledger-related resources: reading/writing the ledger and retrieving config
    *ledgerResources
    // classifies and processes messages received from clients
    msgprocessor.Processor
    // writes blocks to disk
    *BlockWriter
    // the chain, providing the methods that handle messages
    // This design allows for two primary flows
    // 1. Messages are ordered into a stream, the stream is cut into blocks, the blocks are committed (solo, kafka)
    // 2. Messages are cut into blocks, the blocks are ordered, then the blocks are committed (sbft)
    consensus.Chain
    // receiver for broadcast messages, providing the block-cutting methods
    cutter blockcutter.Receiver
    // signing
    crypto.LocalSigner
    // chains need to know if they are system or standard channel.
    systemChannel bool
}
MaxMessageCount sets the maximum number of messages stored in a block, AbsoluteMaxBytes sets the maximum block size in bytes, and PreferredMaxBytes sets the preferred maximum number of bytes in a batch (while processing messages the blockcutter tries to keep each batch close to this value).
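A simplified sketch of the cutting rule these parameters drive; the real blockcutter.Receiver also tracks pending state for the batch timer and handles config messages separately, so this only shows the counting logic under those assumptions.

package cutter

// Simplified batch-size configuration, mirroring the three parameters above.
type batchSizeConfig struct {
    MaxMessageCount   uint32
    AbsoluteMaxBytes  uint32
    PreferredMaxBytes uint32
}

type receiver struct {
    cfg              batchSizeConfig
    pending          [][]byte // serialized envelopes waiting for the next block
    pendingSizeBytes uint32
}

// ordered adds one message and returns any batch(es) that should be cut now.
func (r *receiver) ordered(msg []byte) (cut [][][]byte) {
    msgSize := uint32(len(msg))

    // An oversized message gets its own batch (after flushing whatever is pending).
    if msgSize > r.cfg.PreferredMaxBytes {
        if len(r.pending) > 0 {
            cut = append(cut, r.flush())
        }
        return append(cut, [][]byte{msg})
    }

    // Cut first if appending would push the batch past the preferred size.
    if r.pendingSizeBytes+msgSize > r.cfg.PreferredMaxBytes && len(r.pending) > 0 {
        cut = append(cut, r.flush())
    }

    r.pending = append(r.pending, msg)
    r.pendingSizeBytes += msgSize

    // Cut as soon as the message-count limit is reached.
    if uint32(len(r.pending)) >= r.cfg.MaxMessageCount {
        cut = append(cut, r.flush())
    }
    return cut
}

// flush returns the pending batch and resets the accumulator.
func (r *receiver) flush() [][]byte {
    batch := r.pending
    r.pending, r.pendingSizeBytes = nil, 0
    return batch
}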
This part mainly handles incoming messages: transaction messages are handed to the block cutter to be cut into blocks and then written to the ledger. The exact behavior differs per consensus mechanism (covered in detail later together with the consenter module).
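As a preview of that flow, a heavily simplified, solo-style ordering loop might look like the sketch below. The support interface here is a hypothetical, trimmed-down stand-in for the real consensus.ConsenterSupport, and the real solo chain additionally handles batch timers, config messages and exit signalling.

package sololoop

import cb "github.com/hyperledger/fabric/protos/common"

// Hypothetical, trimmed-down view of the support a chain receives.
type support interface {
    // Ordered groups envelopes into zero or more batches that are ready to be cut.
    Ordered(env *cb.Envelope) [][]*cb.Envelope
    CreateNextBlock(batch []*cb.Envelope) *cb.Block
    WriteBlock(block *cb.Block)
}

// orderLoop is a solo-style loop: one envelope at a time, cut, write, repeat.
func orderLoop(s support, in <-chan *cb.Envelope, exit <-chan struct{}) {
    for {
        select {
        case env := <-in: // envelopes forwarded by the Broadcast handler
            for _, batch := range s.Ordered(env) {
                block := s.CreateNextBlock(batch)
                s.WriteBlock(block) // append to this channel's ledger
            }
        case <-exit:
            return
        }
    }
}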
chain := newChainSupport(
    r,
    ledgerResources,
    r.consenters,
    r.signer,
    r.blockcutterMetrics,
)
r.chains[chainID] = chain
chain.start()
There are three consensus types, solo, kafka and etcdraft, used to order the produced messages (i.e. the Envelopes sent in by the peers).
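All three plug in through the same Consenter/Chain abstraction. Roughly, and trimmed from the interfaces in the fabric orderer/consensus package (ConsenterSupport and cb.Envelope/cb.Metadata come from that codebase):

// Consenter constructs the Chain for one channel from its ChainSupport resources.
type Consenter interface {
    HandleChain(support ConsenterSupport, metadata *cb.Metadata) (Chain, error)
}

// Chain is what the Registrar starts and what Broadcast feeds messages into.
type Chain interface {
    Order(env *cb.Envelope, configSeq uint64) error        // normal transactions
    Configure(config *cb.Envelope, configSeq uint64) error // config transactions
    WaitReady() error
    Errored() <-chan struct{}
    Start() // invoked via chain.start() as seen above
    Halt()
}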
Reference:
https://blog.csdn.net/idsuf698987/article/details/78639203