peer模块采用cobra库来实现cli命令。html
Cobra提供简单的接口来建立强大的现代化CLI接口,好比git与go工具。Cobra同时也是一个程序, 用于建立CLI程序java
peer支持的命令以下所示:node
Usage: peer [command] Available Commands: chaincode Operate a chaincode: install|instantiate|invoke|package|query|signpackage|upgrade|list. channel Operate a channel: create|fetch|join|list|update|signconfigtx|getinfo. help Help about any command logging Log levels: getlevel|setlevel|revertlevels. node Operate a peer node: start|status. version Print fabric peer version. Flags: -h, --help help for peer --logging-level string Default logging level and overrides, see core.yaml for full syntax
经过peer 的docker-compose文件可知,peer启动命令为peer node start。从下列代码可知,peer启动时调用serve()接口。ios
var nodeStartCmd = &cobra.Command{ Use: "start", Short: "Starts the node.", Long: `Starts a node that interacts with the network.`, RunE: func(cmd *cobra.Command, args []string) error { if len(args) != 0 { return fmt.Errorf("trailing args detected") } // Parsing of the command line is done so silence cmd usage cmd.SilenceUsage = true return serve(args) }, }
接下来深刻分析serve()接口。git
func serve(args []string) error { // currently the peer only works with the standard MSP // because in certain scenarios the MSP has to make sure // that from a single credential you only have a single 'identity'. // Idemix does not support this *YET* but it can be easily // fixed to support it. For now, we just make sure that // the peer only comes up with the standard MSP // 当前peer启动时只支持标准MSP即Fabric。 mspType := mgmt.GetLocalMSP().GetType() if mspType != msp.FABRIC { panic("Unsupported msp type " + msp.ProviderTypeToString(mspType)) } // Trace RPCs with the golang.org/x/net/trace package. This was moved out of // the deliver service connection factory as it has process wide implications // and was racy with respect to initialization of gRPC clients and servers. grpc.EnableTracing = true logger.Infof("Starting %s", version.GetInfo()) //startup aclmgmt with default ACL providers (resource based and default 1.0 policies based). //Users can pass in their own ACLProvider to RegisterACLProvider (currently unit tests do this) // 建立ACL提供者 ACL访问控制列表 aclProvider := aclmgmt.NewACLProvider( aclmgmt.ResourceGetter(peer.GetStableChannelConfig), ) // 平台注册 pr := platforms.NewRegistry( &golang.Platform{}, &node.Platform{}, &java.Platform{}, &car.Platform{}, ) // 定义部署链码提供者 deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{}
DeployedCCInfoProvider实现了DeployedChaincodeInfoProvider。golang
DeployedChaincodeInfoProvider是ledger用于构建集合配置历史记录的依赖项
LSCC模块应该为这个依赖项提供实现docker
type DeployedChaincodeInfoProvider interface { Namespaces() []string //命名空间 UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error) // 保存更新的链码 ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) // 保存链码信息 CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error) // 链码集合信息 }
初始化帐本资源ledgermgmt.Initializeshell
// 获取通道MSP管理员。若是不存在则建立 identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer { return mgmt.GetManagerForChain(chainID) } // peer 初始化 // 保存 peer 一些基本信息 ListenAddress TLS opsSystem := newOperationsSystem() // 监听 ListenAddress err := opsSystem.Start() if err != nil { return errors.WithMessage(err, "failed to initialize operations subystems") } defer opsSystem.Stop() metricsProvider := opsSystem.Provider logObserver := floggingmetrics.NewObserver(metricsProvider) flogging.Global.SetObserver(logObserver) // 实例化私密数据成员membershipInfoProvider 用来判断peer是否在某个私密数据的集合中 membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory) //initialize resource management exit // 初始化帐本资源 将前面实例化的对象都进行赋值 ledgermgmt.Initialize( &ledgermgmt.Initializer{ CustomTxProcessors: peer.ConfigTxProcessors, PlatformRegistry: pr, DeployedChaincodeInfoProvider: deployedCCInfoProvider, MembershipInfoProvider: membershipInfoProvider, MetricsProvider: metricsProvider, HealthCheckRegistry: opsSystem, }, )
初始化peer GRPC服务配置缓存
// Parameter overrides must be processed before any parameters are // cached. Failures to cache cause the server to terminate immediately. // 判断链码是否时开发者模式 if chaincodeDevMode { logger.Info("Running in chaincode development mode") logger.Info("Disable loading validity system chaincode") viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode) } // 缓存peer地址getLocalAddress address:port if err := peer.CacheConfiguration(); err != nil { return err } // 获取peer endpoint,没有则调用CacheConfiguration接口 peerEndpoint, err := peer.GetPeerEndpoint() if err != nil { err = fmt.Errorf("Failed to get Peer Endpoint: %s", err) return err } // 获取peer Host peerHost, _, err := net.SplitHostPort(peerEndpoint.Address) if err != nil { return fmt.Errorf("peer address is not in the format of host:port: %v", err) } listenAddr := viper.GetString("peer.listenAddress") // 获取peer grpc相关配置 主要是TLS设置和心跳设置 serverConfig, err := peer.GetServerConfig() if err != nil { logger.Fatalf("Error loading secure config for peer (%s)", err) } // 设置GRPC最大并发2500 throttle := comm.NewThrottle(grpcMaxConcurrency) // GRPC server的一些配置 serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer") serverConfig.MetricsProvider = metricsProvider serverConfig.UnaryInterceptors = append( serverConfig.UnaryInterceptors, grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)), grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()), throttle.UnaryServerIntercptor, ) serverConfig.StreamInterceptors = append( serverConfig.StreamInterceptors, grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)), grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()), throttle.StreamServerInterceptor, )
将GRPC相关配置及Address传入建立GRPC服务器服务器
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig) if err != nil { logger.Fatalf("Failed to create peer server (%s)", err) }
TLS及策略相关
// TLS相关配置 if serverConfig.SecOpts.UseTLS { logger.Info("Starting peer with TLS enabled") // set up credential support cs := comm.GetCredentialSupport() roots, err := peer.GetServerRootCAs() if err != nil { logger.Fatalf("Failed to set TLS server root CAs: %s", err) } cs.ServerRootCAs = roots // set the cert to use if client auth is requested by remote endpoints clientCert, err := peer.GetClientCertificate() if err != nil { logger.Fatalf("Failed to set TLS client certificate: %s", err) } comm.GetCredentialSupport().SetClientCertificate(clientCert) } mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert // 策略校验 policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc { return func(env *cb.Envelope, channelID string) error { return aclProvider.CheckACL(resourceName, channelID, env) } }
建立deliver server 传输区块及过滤区块
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider) pb.RegisterDeliverServer(peerServer.Server(), abServer)
初始化链码服务
startChaincodeServer将完成与链代码相关的初始化,包括:
1)设置本地链代码安装路径
2)建立特定链代码的CA
3)启动特定链代码的gRPC监听服务
// Initialize chaincode service chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem) logger.Debugf("Running peer")
注册背书服务,gossip组件初始化等操做
// Start the Admin server startAdminServer(listenAddr, peerServer.Server(), metricsProvider) privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error { // 分发私有数据到其余节点 return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt) } // 获取本地签名 signingIdentity := mgmt.GetLocalSigningIdentityOrPanic() serializedIdentity, err := signingIdentity.Serialize() if err != nil { logger.Panicf("Failed serializing self identity: %v", err) } libConf := library.Config{} if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil { return errors.WithMessage(err, "could not load YAML config") } reg := library.InitRegistry(libConf) // 背书 验证相关配置 authFilters := reg.Lookup(library.Auth).([]authHandler.Filter) endorserSupport := &endorser.SupportImpl{ SignerSupport: signingIdentity, Peer: peer.Default, PeerSupport: peer.DefaultSupport, ChaincodeSupport: chaincodeSupport, SysCCProvider: sccp, ACLProvider: aclProvider, } endorsementPluginsByName := reg.Lookup(library.Endorsement).(map[string]endorsement2.PluginFactory) validationPluginsByName := reg.Lookup(library.Validation).(map[string]validation.PluginFactory) signingIdentityFetcher := (endorsement3.SigningIdentityFetcher)(endorserSupport) channelStateRetriever := endorser.ChannelStateRetriever(endorserSupport) pluginMapper := endorser.MapBasedPluginMapper(endorsementPluginsByName) pluginEndorser := endorser.NewPluginEndorser(&endorser.PluginSupport{ ChannelStateRetriever: channelStateRetriever, TransientStoreRetriever: peer.TransientStoreFactory, PluginMapper: pluginMapper, SigningIdentityFetcher: signingIdentityFetcher, }) endorserSupport.PluginEndorser = pluginEndorser serverEndorser := endorser.NewEndorserServer(privDataDist, endorserSupport, pr, metricsProvider) auth := authHandler.ChainFilters(serverEndorser, authFilters...) // Register the Endorser server pb.RegisterEndorserServer(peerServer.Server(), auth) policyMgr := peer.NewChannelPolicyManagerGetter() // Initialize gossip component err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address) if err != nil { return err } defer service.GetGossipService().Stop() // register prover grpc service // FAB-12971 disable prover service before v1.4 cut. Will uncomment after v1.4 cut // err = registerProverService(peerServer, aclProvider, signingIdentity) // if err != nil { // return err // }
初始化系统链码。
// initialize system chaincodes // deploy system chaincodes // 部署系统链码 sccp.DeploySysCCs("", ccp) logger.Infof("Deployed system chaincodes") // 查看已经安装等链码 installedCCs := func() ([]ccdef.InstalledChaincode, error) { return packageProvider.ListInstalledChaincodes() } // 建立链码的生命周期 lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs)) if err != nil { logger.Panicf("Failed creating lifecycle: +%v", err) } // HandleMetadataUpdate在链代码生命周期更改发生变化时触发 onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) { // 更新链码 service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel)) }) // 监听器 监听链码更新 lifecycle.AddListener(onUpdate)
通道相关配置
// this brings up all the channels peer.Initialize(func(cid string) { logger.Debugf("Deploying system CC, for channel <%s>", cid) sccp.DeploySysCCs(cid, ccp) sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) { // 返回通道的查询器 return peer.GetLedger(cid).NewQueryExecutor() })) if err != nil { logger.Panicf("Failed subscribing to chaincode lifecycle updates") } // 注册该通道ChaincodeLifecycleEventListener cceventmgmt.GetMgr().Register(cid, sub) }, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName), pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider) // 获取peer一些配置 if viper.GetBool("peer.discovery.enabled") { registerDiscoveryService(peerServer, policyMgr, lifecycle) } networkID := viper.GetString("peer.networkId") logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address) // Get configuration before starting go routines to avoid // racing in tests profileEnabled := viper.GetBool("peer.profile.enabled") profileListenAddress := viper.GetString("peer.profile.listenAddress") // Start the grpc server. Done in a goroutine so we can deploy the // genesis block if needed. serve := make(chan error) // 开启peer grpc服务 go func() { var grpcErr error if grpcErr = peerServer.Start(); grpcErr != nil { grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr) } else { logger.Info("peer server exited") } serve <- grpcErr }() // Start profiling http endpoint if enabled if profileEnabled { go func() { logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress) if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil { logger.Errorf("Error starting profiler: %s", profileErr) } }() } go handleSignals(addPlatformSignals(map[os.Signal]func(){ syscall.SIGINT: func() { serve <- nil }, syscall.SIGTERM: func() { serve <- nil }, })) // peer启动区块归档任务 if ledgerconfig.IsDataDumpEnabled() { logger.Debugf("DataDump:{DumpDir:%s,LoadDir:%s,MaxFileLimit:%d,DumpCron:%v,DumpInterval:%d,LoadRetryTimes:%d}", ledgerconfig.GetDataDumpPath(), ledgerconfig.GetDataLoadPath(), ledgerconfig.GetDataDumpFileLimit(), ledgerconfig.GetDataDumpCron(), ledgerconfig.GetDataDumpInterval(), ledgerconfig.GetDataLoadRetryTimes()) go func() { cronList := ledgerconfig.GetDataDumpCron() if cronList != nil && len(cronList) > 0 { cronTask := cron.New() cronTask.Start() for _, crontab := range cronList { logger.Debugf("Crontab addFunc for %s", crontab) err := cronTask.AddFunc(crontab, func() { chainInfoArray := peer.GetChannelsInfo() for _, chainInfo := range chainInfoArray { chainId := chainInfo.ChannelId l := peer.GetLedger(chainId) if err := l.DataDump(datadump.DumpForCronTab); err != nil { logger.Errorf("Failed to datadump for [%s]", err) } } }) if err != nil { logger.Errorf("Failed to add crontab task for %s", err) } } } }() } logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address) if viper.GetBool("peer.enBlkrouter") { go func() { startBlockServer() }() } // Block until grpc server exits // 阻塞 直到grpc服务退出 return <-serve }
到这里Peer
节点已经启动完成了,过程仍是很复杂的,这里总结一下总体的过程:
首先就是读取配置信息,建立Cache结构,以及检测其余Peer节点的信息。
CacheConfiguration()
,主要保存其余Peer
节点的相关信息。
建立PeerServer
。
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
建立DeliverEventsServer
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
pb.RegisterDeliverServer(peerServer.Server(), abServer)
fabric/core/peer/deliverevents.go
,该服务主要用于区块的交付与过滤,主要方法:Deliver(),DeliverFiltered()启动ChaincodeServer
chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
core/chaincode/chaincode_support.go
,返回了ChaincodeSupport:为Peer提供执行链码的接口,主要功能有Launch():启动一个中止运行的链码,Stop():中止链码的运行,HandleChaincodeStream():处理链码流信息,Register():将链码注册到当前Peer节点 ,createCCMessage():建立一个交易,ExecuteLegacyInit():链码的实例化,Execute():执行链码并返回回原始的响应,processChaincodeExecutionResult():处理链码的执行结果,InvokeInit():调用链码的Init方法,Invoke():调用链码,execute():执行一个交易
启动AdminServer
startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
core/protos/peer/admin.go
文件,具备GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()
等方法建立EndorserServer
pb.RegisterEndorserServer(peerServer.Server(), auth)
core/endorser/endorser.go
文件,注册背书服务器,提供了一个很重要的方法:ProcessProposal()
,这个方法值得看一下。建立GossipService
err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
gossip/service/gossip_service.go
,具备InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()
等方法部署系统链码。
初始化通道。
启动gRPC服务。
若是启用了profile,还会启动监听服务。