看一下Peer
节点的启动过程,一般在Fabric网络中,Peer
节点的启动方式有两种,经过Docker容器启动,或者是经过执行命令直接启动。
通常状况下,咱们都是执行docker-compose -f docker-*.yaml up
命令经过容器启动了Peer
节点,而若是直接启动Peer
节点则是执行了peer node start
这条命令。看起来,这两种方式所使用的命令毫无关系,但事实上,在Docker容器中启动Peer
节点也是经过执行了peer node start
这条命令来启动Peer
节点,只不过是Docker替咱们执行了,这条命令就在以前经过启动Docker容器的那个文件中写到。因此说,不管是哪一种方式启动Peer
节点,都是经过peer node start
这条命令,接下来,咱们就分析一下执行完这条命令后,Peer
节点的启动过程。
和以前同样,首先找到切入点,在/fabric/peer/main.go
文件中,第46行:java
mainCmd.AddCommand(node.Cmd())
这里包含了与对Peer
节点进行相关操做的命令集合,其中就有启动Peer
节点的命令,咱们点进行看一下:node
func Cmd() *cobra.Command { nodeCmd.AddCommand(startCmd()) nodeCmd.AddCommand(statusCmd()) return nodeCmd }
共有两条命令:启动Peer
节点,以及查看节点的状态,咱们看一下启动Peer
节点这条命令,首先调用了peer/node/start.go
文件中的startCmd()
,以后转到了nodeStartCmd
,以及serve(args)
这个方法。其中,serve(args)
这个方法就是本文要说明了主要方法,咱们就从这里开始分析,在peer/node/start.go
文件中第125行:git
func serve(args []string) error { #首先获取MSP的类型,msp指的是成员关系服务提供者,至关于许可证 mspType := mgmt.GetLocalMSP().GetType() #若是MSP的类型不是FABRIC,返回错误信息 if mspType != msp.FABRIC { panic("Unsupported msp type " + msp.ProviderTypeToString(mspType)) } ... #建立ACL提供者,access control list访问控制列表 aclProvider := aclmgmt.NewACLProvider( aclmgmt.ResourceGetter(peer.GetStableChannelConfig), ) #平台注册,可使用的语言类型,最后一个car不太理解,可能和官方的一个例子有关 pr := platforms.NewRegistry( &golang.Platform{}, &node.Platform{}, &java.Platform{}, &car.Platform{}, )
定义一个用于部署链码的Provider结构体:github
deployedCCInfoProvider := &lscc.DeployedCCInfoProvider{} ==========================DeployedCCInfoProvider========================== type DeployedChaincodeInfoProvider interface { Namespaces() []string #命名空间 UpdatedChaincodes(stateUpdates map[string][]*kvrwset.KVWrite) ([]*ChaincodeLifecycleInfo, error) #保存更新的链码 ChaincodeInfo(chaincodeName string, qe SimpleQueryExecutor) (*DeployedChaincodeInfo, error) #保存链码信息 CollectionInfo(chaincodeName, collectionName string, qe SimpleQueryExecutor) (*common.StaticCollectionConfig, error) } #保存链码数据信息 ==========================DeployedCCInfoProvider==========================
下面是对Peer节点的一些属性的设置了:golang
identityDeserializerFactory := func(chainID string) msp.IdentityDeserializer { #获取通道管理者 return mgmt.GetManagerForChain(chainID) } #至关于配置Peer节点的运行环境了,主要就是保存Peer节点的IP地址,端口,证书等相关基本信息 opsSystem := newOperationsSystem() err := opsSystem.Start() if err != nil { return errors.WithMessage(err, "failed to initialize operations subystems") } defer opsSystem.Stop() metricsProvider := opsSystem.Provider #建立观察者,对Peer节点进行记录 logObserver := floggingmetrics.NewObserver(metricsProvider) flogging.Global.SetObserver(logObserver) #建立成员关系信息Provider,简单来讲就是保存其余Peer节点的信息,以便通讯等等 membershipInfoProvider := privdata.NewMembershipInfoProvider(createSelfSignedData(), identityDeserializerFactory) #帐本管理器初始化,主要就是以前所定义的一些属性 ledgermgmt.Initialize( &ledgermgmt.Initializer{ #与Tx处理相关 CustomTxProcessors: peer.ConfigTxProcessors, #以前定义的所使用的语言 PlatformRegistry: pr, #与链码相关 DeployedChaincodeInfoProvider: deployedCCInfoProvider, #与Peer节点交互相关 MembershipInfoProvider: membershipInfoProvider, #这个不太清楚,与Peer节点的属性相关? MetricsProvider: metricsProvider, #健康检查 HealthCheckRegistry: opsSystem, }, ) #判断是否处于开发模式下 if chaincodeDevMode { logger.Info("Running in chaincode development mode") logger.Info("Disable loading validity system chaincode") viper.Set("chaincode.mode", chaincode.DevModeUserRunsChaincode) } #里面有两个方法,分别是获取本地地址与获取当前Peer节点实例地址,将地址进行缓存 if err := peer.CacheConfiguration(); err != nil { return err } #获取当前Peer节点实例地址,若是没有进行缓存,则会执行上一步的CacheConfiguration()方法 peerEndpoint, err := peer.GetPeerEndpoint() if err != nil { err = fmt.Errorf("Failed to get Peer Endpoint: %s", err) return err } #简单的字符串操做,获取Host peerHost, _, err := net.SplitHostPort(peerEndpoint.Address) if err != nil { return fmt.Errorf("peer address is not in the format of host:port: %v", err) } #获取监听地址,该属性在opsSystem中定义过 listenAddr := viper.GetString("peer.listenAddress") #返回当前Peer节点的gRPC服务器配置,该方法主要就是设置TLS与心跳信息,在/core/peer/config.go文件中第128行。 serverConfig, err := peer.GetServerConfig() if err != nil { logger.Fatalf("Error loading secure config for peer (%s)", err) } #设置gRPC最大并发 grpcMaxConcurrency=2500 throttle := comm.NewThrottle(grpcMaxConcurrency) #设置日志信息 serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer") serverConfig.MetricsProvider = metricsProvider #设置拦截器,再也不细说 serverConfig.UnaryInterceptors = append( serverConfig.UnaryInterceptors, grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)), grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()), throttle.UnaryServerIntercptor, ) serverConfig.StreamInterceptors = append( serverConfig.StreamInterceptors, grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)), grpclogging.StreamServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()), throttle.StreamServerInterceptor, )
到这里建立了Peer节点的gRPC服务器,将以前的监听地址与服务器配置传了进去:docker
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig) if err != nil { logger.Fatalf("Failed to create peer server (%s)", err) }
关于权限的一些配置:json
#TLS的相关设置 if serverConfig.SecOpts.UseTLS { logger.Info("Starting peer with TLS enabled") // set up credential support cs := comm.GetCredentialSupport() roots, err := peer.GetServerRootCAs() if err != nil { logger.Fatalf("Failed to set TLS server root CAs: %s", err) } cs.ServerRootCAs = roots // set the cert to use if client auth is requested by remote endpoints clientCert, err := peer.GetClientCertificate() if err != nil { logger.Fatalf("Failed to set TLS client certificate: %s", err) } comm.GetCredentialSupport().SetClientCertificate(clientCert) } mutualTLS := serverConfig.SecOpts.UseTLS && serverConfig.SecOpts.RequireClientCert #策略检查Provider,看传入的参数就比较清楚了,Envelope,通道ID,环境变量 policyCheckerProvider := func(resourceName string) deliver.PolicyCheckerFunc { return func(env *cb.Envelope, channelID string) error { return aclProvider.CheckACL(resourceName, channelID, env) } }
建立了另外一个服务器,与上面的权限设置相关,用于交付与过滤区块的事件服务器:缓存
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider) #将以前建立的gRPC服务器与用于交付与过滤区块的事件服务器注册到这里 pb.RegisterDeliverServer(peerServer.Server(), abServer)
接下来是与链码相关的操做:服务器
#启动与链码相关的服务器,看传入的值 Peer节点的主机名,访问控制列表Provider,pr是以前提到与语言相关的,以及以前的运行环境 #主要完成三个操做:1.设置本地链码安装路径,2.建立自签名CA,3,启动链码gRPC监听服务,该方法在本文件中第709行 chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem) logger.Debugf("Running peer") #启动管理员服务,这个不太懂干吗的 startAdminServer(listenAddr, peerServer.Server(), metricsProvider) privDataDist := func(channel string, txID string, privateData *transientstore.TxPvtReadWriteSetWithConfigInfo, blkHt uint64) error { #看这个方法是分发私有数据到其余节点 return service.GetGossipService().DistributePrivateData(channel, txID, privateData, blkHt) } ========================TxPvtReadWriteSetWithConfigInfo========================== #看这里,主要是私有的读写集以及配置信息 type TxPvtReadWriteSetWithConfigInfo struct { EndorsedAt uint64 `protobuf:"varint,1,opt,name=endorsed_at,json=endorsedAt,proto3" json:"endorsed_at,omitempty"` PvtRwset *rwset.TxPvtReadWriteSet `protobuf:"bytes,2,opt,name=pvt_rwset,json=pvtRwset,proto3" json:"pvt_rwset,omitempty"` CollectionConfigs map[string]*common.CollectionConfigPackage `protobuf:"bytes,3,rep,name=collection_configs,json=collectionConfigs,proto3" json:"collection_configs,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } ============================TxPvtReadWriteSetWithConfigInfo========================== #获取本地的已签名的身份信息,主要是看当前节点具备的功能,好比背书,验证 signingIdentity := mgmt.GetLocalSigningIdentityOrPanic() serializedIdentity, err := signingIdentity.Serialize() if err != nil { logger.Panicf("Failed serializing self identity: %v", err) } # libConf := library.Config{} ================================Config============================= type Config struct { #权限过滤 AuthFilters []*HandlerConfig `mapstructure:"authFilters" yaml:"authFilters"` #这个不清楚 Decorators []*HandlerConfig `mapstructure:"decorators" yaml:"decorators"` #背书 Endorsers PluginMapping `mapstructure:"endorsers" yaml:"endorsers"` #验证 Validators PluginMapping `mapstructure:"validators" yaml:"validators"` } ==================================Config============================= if err = viperutil.EnhancedExactUnmarshalKey("peer.handlers", &libConf); err != nil { return errors.WithMessage(err, "could not load YAML config") } #建立一个Registry实例,将上面的配置注册到这里 reg := library.InitRegistry(libConf) #这一部分是背书操做的相关设置,不贴出来了 ... #设置完以后注册背书服务 pb.RegisterEndorserServer(peerServer.Server(), auth) #建立通道策略管理者,好比哪些节点或用户具备可读,可写,可操做的权限,都是由它管理 policyMgr := peer.NewChannelPolicyManagerGetter() #建立用于广播的服务,就是区块链中用于向其余节点发送消息的服务 err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
到这里,链码的相关配置已经差很少了,到了部署系统链码的地方了:网络
#这一行代码就是将系统链码部署上去 sccp.DeploySysCCs("", ccp) logger.Infof("Deployed system chaincodes") installedCCs := func() ([]ccdef.InstalledChaincode, error) { #查看已经安装的链码 return packageProvider.ListInstalledChaincodes() } #与链码的生命周期相关 lifecycle, err := cc.NewLifeCycle(cc.Enumerate(installedCCs)) if err != nil { logger.Panicf("Failed creating lifecycle: +%v", err) } #处理链码的元数据更新,由其余节点广播 onUpdate := cc.HandleMetadataUpdate(func(channel string, chaincodes ccdef.MetadataSet) { service.GetGossipService().UpdateChaincodes(chaincodes.AsChaincodes(), gossipcommon.ChainID(channel)) }) #添加监听器监听链码元数据更新 lifecycle.AddListener(onUpdate)
这一部分是与通道的初始化相关的内容:
peer.Initialize(func(cid string) { logger.Debugf("Deploying system CC, for channel <%s>", cid) sccp.DeploySysCCs(cid, ccp) #获取通道的描述信息,就是通道的基本属性 sub, err := lifecycle.NewChannelSubscription(cid, cc.QueryCreatorFunc(func() (cc.Query, error) { #根据通道ID获取帐本的查询执行器 return peer.GetLedger(cid).NewQueryExecutor() })) if err != nil { logger.Panicf("Failed subscribing to chaincode lifecycle updates") } #为通道注册监听器 cceventmgmt.GetMgr().Register(cid, sub) }, ccp, sccp, txvalidator.MapBasedPluginMapper(validationPluginsByName), pr, deployedCCInfoProvider, membershipInfoProvider, metricsProvider) #当前节点状态改变后是否能够被发现 if viper.GetBool("peer.discovery.enabled") { registerDiscoveryService(peerServer, policyMgr, lifecycle) } #获取Peer节点加入的网络ID networkID := viper.GetString("peer.networkId") logger.Infof("Starting peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address) #查看是否已经定义了配置文件 profileEnabled := viper.GetBool("peer.profile.enabled") profileListenAddress := viper.GetString("peer.profile.listenAddress") #建立进程启动gRPC服务器 serve := make(chan error) go func() { var grpcErr error if grpcErr = peerServer.Start(); grpcErr != nil { grpcErr = fmt.Errorf("grpc server exited with error: %s", grpcErr) } else { logger.Info("peer server exited") } serve <- grpcErr }() #若是已经定义了配置文件,则启动监听服务 if profileEnabled { go func() { logger.Infof("Starting profiling server with listenAddress = %s", profileListenAddress) if profileErr := http.ListenAndServe(profileListenAddress, nil); profileErr != nil { logger.Errorf("Error starting profiler: %s", profileErr) } }() } #开始处理接收到的消息了 go handleSignals(addPlatformSignals(map[os.Signal]func(){ syscall.SIGINT: func() { serve <- nil }, syscall.SIGTERM: func() { serve <- nil }, })) logger.Infof("Started peer with ID=[%s], network ID=[%s], address=[%s]", peerEndpoint.Id, networkID, peerEndpoint.Address) #阻塞在这里,除非gRPC服务中止 return <-serve }
到这里Peer
节点已经启动完成了,过程仍是很复杂的,这里总结一下总体的过程:
Peer
节点的信息。
CacheConfiguration()
,主要保存其余Peer
节点的相关信息。PeerServer
。
peerServer, err := peer.NewPeerServer(listenAddr, serverConfig)
DeliverEventsServer
。
abServer := peer.NewDeliverEventsServer(mutualTLS, policyCheckerProvider, &peer.DeliverChainManager{}, metricsProvider)
pb.RegisterDeliverServer(peerServer.Server(), abServer)
fabric/core/peer/deliverevents.go
,该服务主要用于区块的交付与过滤,主要方法:Deliver(),DeliverFiltered()ChaincodeServer
。
chaincodeSupport, ccp, sccp, packageProvider := startChaincodeServer(peerHost, aclProvider, pr, opsSystem)
core/chaincode/chaincode_support.go
,返回了ChaincodeSupport:为Peer提供执行链码的接口,主要功能有Launch():启动一个中止运行的链码,Stop():中止链码的运行,HandleChaincodeStream():处理链码流信息,Register():将链码注册到当前Peer节点 ,createCCMessage():建立一个交易,ExecuteLegacyInit():链码的实例化,Execute():执行链码并返回回原始的响应,processChaincodeExecutionResult():处理链码的执行结果,InvokeInit():调用链码的Init方法,Invoke():调用链码,execute():执行一个交易
AdminServer
。
startAdminServer(listenAddr, peerServer.Server(), metricsProvider)
core/protos/peer/admin.go
文件,具备GetStatus(),StartServer(),GetModuleLogLevel(),SetModuleLogLevel()
等方法EndorserServer
。
pb.RegisterEndorserServer(peerServer.Server(), auth)
core/endorser/endorser.go
文件,注册背书服务器,提供了一个很重要的方法:ProcessProposal()
,这个方法值得看一下。GossipService
。
err = initGossipService(policyMgr, metricsProvider, peerServer, serializedIdentity, peerEndpoint.Address)
gossip/service/gossip_service.go
,具备InitializeChannel(),createSelfSignedData(),updateAnchors(),AddPayload()
等方法流程图:,因为Fabric在不断更新,因此代码和图中仍是有一些不一样的。
参考:这里