之前历来没有写过博客,从这段时间开始才开始写一些本身的博客,以前总以为写一篇博客要耗费大量的时间,并且写的仍是本身已经学会的,以为没什么必要。可是当开始用博客记录下来的时候,才发现有些学会的地方只是本身以为已经学会了,仍是有太多地方须要学习,眼高手低了,因此之后会养成写博客的好习惯,保持记录。
今天记录一下以前阅读过的源码:Peer节点背书提案过程。git
首先定位到core/endorser/endorser.go
这个文件中的ProcessProposal()
方法在第450行。其实对于Peer节点背书提案的起点,并非从源码中找到的,参考了这里,有兴趣的能够看一下,接下来就从ProcessProposal()
这里开始分析:github
#该方法须要传入的参数有context(我理解为提案的上下文),以及已经签名的Proposal func (e *Endorser) ProcessProposal(ctx context.Context, signedProp *pb.SignedProposal) (*pb.ProposalResponse, error) { #首先获取Peer节点处理提案开始的时间 startTime := time.Now() #Peer节点接收到的提案数+1 e.Metrics.ProposalsReceived.Add(1) #从上下文中获取发起提案的地址 addr := util.ExtractRemoteAddress(ctx) //日志输出 endorserLogger.Debug("Entering: request from", addr) #这个不是链码ID,是通道ID var chainID string var hdrExt *pb.ChaincodeHeaderExtension var success bool #这个会在方法结束的时候调用 defer func() { #判断chaincodeHeaderExtension是否为空,若是为空的话提案验证失败 if hdrExt != nil { meterLabels := []string{ "channel", chainID, "chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version, "success", strconv.FormatBool(success), } e.Metrics.ProposalDuration.With(meterLabels...).Observe(time.Since(startTime).Seconds()) } endorserLogger.Debug("Exit: request from", addr) }() #到了第一个重要的方法,对已签名的提案进行预处理,点进行看一下 vr, err := e.preProcess(signedProp)
preProcess()
preProcess()
这个方法在文件中的第366行:json
func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, error) { #定义一个验证结果结构体 vr := &validateResult{} #首先对MSG进行验证是否有效,看一下这个方法 prop, hdr, hdrExt, err := validation.ValidateProposalMessage(signedProp) if err != nil { #若是报错的话,ProposalVaildationFailed+1 e.Metrics.ProposalValidationFailed.Add(1) #返回500 vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}} return vr, err }
ValidateProposalMessage()
在core/common/validation/msgvalidation.go
文件中,第75行,看一下这个方法主要就是对消息进行验证。数组
#把主要的代码列举一下 #从提案中获取Proposal内容 ... prop, err := utils.GetProposal(signedProp.ProposalBytes) ... #从Proposal中获取Header hdr, err := utils.GetHeader(prop.Header) #对Header进行验证 chdr, shdr, err := validateCommonHeader(hdr)
这里的Proposal以及Header结构体:数据结构
Proposal: type Proposal struct { #关键的是前两个 提案的Header与提案的有效载荷 Header []byte `protobuf:"bytes,1,opt,name=header,proto3" json:"header,omitempty"` Payload []byte `protobuf:"bytes,2,opt,name=payload,proto3" json:"payload,omitempty"` Extension []byte `protobuf:"bytes,3,opt,name=extension,proto3" json:"extension,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` } Header: type Header struct { #在提案的Header中又包含通道的Header与签名域的Header ChannelHeader []byte `protobuf:"bytes,1,opt,name=channel_header,json=channelHeader,proto3" json:"channel_header,omitempty"` SignatureHeader []byte `protobuf:"bytes,2,opt,name=signature_header,json=signatureHeader,proto3" json:"signature_header,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
看一下具体的验证方法,在第246行,依旧只列出主流程代码:多线程
#从提案的Header中获取通道Header信息 chdr, err := utils.UnmarshalChannelHeader(hdr.ChannelHeader)
通道Header的结构体定义在protos/common/common.pb.go
文件中第320行:app
type ChannelHeader struct { #类型 Type int32 `protobuf:"varint,1,opt,name=type,proto3" json:"type,omitempty"` #版本 Version int32 `protobuf:"varint,2,opt,name=version,proto3" json:"version,omitempty"` #时间戳 Timestamp *timestamp.Timestamp `protobuf:"bytes,3,opt,name=timestamp,proto3" json:"timestamp,omitempty"` #通道ID ChannelId string `protobuf:"bytes,4,opt,name=channel_id,json=channelId,proto3" json:"channel_id,omitempty"` #交易ID TxId string `protobuf:"bytes,5,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"` #该Header产生的时间 Epoch uint64 `protobuf:"varint,6,opt,name=epoch,proto3" json:"epoch,omitempty"` #额外的信息 Extension []byte `protobuf:"bytes,7,opt,name=extension,proto3" json:"extension,omitempty"` TlsCertHash []byte `protobuf:"bytes,8,opt,name=tls_cert_hash,json=tlsCertHash,proto3" json:"tls_cert_hash,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
仍是validateCommonHeader()
这个方法:ide
#获取签名域的Header shdr, err := utils.GetSignatureHeader(hdr.SignatureHeader)
SignatureHeader定义在protos/common/common.pb.go
文件中第434行:学习
type SignatureHeader struct { #消息的建立者 Creator []byte `protobuf:"bytes,1,opt,name=creator,proto3" json:"creator,omitempty"` #这个是为了防止重复攻击,具备惟一性 Nonce []byte `protobuf:"bytes,2,opt,name=nonce,proto3" json:"nonce,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
获取到channelHeader
与SingatureHeader
以后,能够对它们进行验证操做了:ui
#验证channelHeader err = validateChannelHeader(chdr)
该方法在core/common/validation/msgvalidation.go
文件中第214行:
#首先检查channelHeader是否为空 if cHdr == nil { return errors.New("nil ChannelHeader provided") } ... #而后对HeaderType进行检查,只有HeaderType是ENDORSER_TRANSACTION、CONFIG_UPDATE、CONFIG、TOKEN_TRANSACTION中其中一种才是有效的Header if common.HeaderType(cHdr.Type) != common.HeaderType_ENDORSER_TRANSACTION && common.HeaderType(cHdr.Type) != common.HeaderType_CONFIG_UPDATE && common.HeaderType(cHdr.Type) != common.HeaderType_CONFIG && common.HeaderType(cHdr.Type) != common.HeaderType_TOKEN_TRANSACTION { return errors.Errorf("invalid header type %s", common.HeaderType(cHdr.Type)) } ... #最后检查ChannelHeader中的Epoch是否为0 if cHdr.Epoch != 0 { return errors.Errorf("invalid Epoch in ChannelHeader. Expected 0, got [%d]", cHdr.Epoch) }
验证SignatureHeader
,该方法core/common/validation/msgvalidation.go
文件中194行:
#首先验证Header是否为空 if sHdr == nil { return errors.New("nil SignatureHeader provided") } #Nonce是否为空 if sHdr.Nonce == nil || len(sHdr.Nonce) == 0 { return errors.New("invalid nonce specified in the header") } #该Header建立者是否为空 if sHdr.Creator == nil || len(sHdr.Creator) == 0 { return errors.New("invalid creator specified in the header") }
因此对ChannelHeader
的检查主要是这三部分:
ChannelHeader
是否为空HeaderType
是不是ENDORSER_TRANSACTION、CONFIG_UPDATE、CONFIG、TOKEN_TRANSACTION
中其中一种Epoch
是否为空对SignatureHeader
的检查为:
SignatureHeader
是否为空Nonce
是否为空SignatureHeader
的建立者是否为空在对ChannelHeader
与SignatureHeader
的验证完成后,回到ValidateProposalMessage
方法:
#接下来是对Creator的Signature进行验证: err = checkSignatureFromCreator(shdr.Creator, signedProp.Signature, signedProp.ProposalBytes, chdr.ChannelId)
点进行,该方法在core/common/validation/msgvalidation.go
文件中第153行:
#首先检查是否有空参数 if creatorBytes == nil || sig == nil || msg == nil { return errors.New("nil arguments") } #根据通道Id获取Identity返回mspObj(member service providere)对象 mspObj := mspmgmt.GetIdentityDeserializer(ChainID) if mspObj == nil { return errors.Errorf("could not get msp for channel [%s]", ChainID) } #而后对Creator的identity进行查找 creator, err := mspObj.DeserializeIdentity(creatorBytes) if err != nil { return errors.WithMessage(err, "MSP error") } ... #对证书进行验证 err = creator.Validate() ... #对签名进行验证 err = creator.Verify(msg, sig)
最后看一下checkSignatureFromCreator
作了哪些工做:
Creator、Signature、ProposalBytes
是否有空参数ChannelId
获取Identity
Identity
查找Creator
的Identity
Creator
的证书与签名回到ValidateProposalMessage
方法,再向下看:
if err != nil { #当以前一步验证失败后进入这里。 #这一部分作了两件事 #1.将虚假的用户记录到Peer节点,防止该用户对通道进行扫描 putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err) sId := &msp.SerializedIdentity{} err := proto.Unmarshal(shdr.Creator, sId) if err != nil { err = errors.Wrap(err, "could not deserialize a SerializedIdentity") putilsLogger.Warningf("channel [%s]: %s", chdr.ChannelId, err) } #2.将错误信息返回,这一条信息应该见过好屡次 return nil, nil, nil, errors.Errorf("access denied: channel [%s] creator org [%s]", chdr.ChannelId, sId.Mspid) } #这一步用于检查TxId是否已经存在,防止重复攻击 err = utils.CheckTxID( chdr.TxId, shdr.Nonce, shdr.Creator) if err != nil { return nil, nil, nil, err } #方法的最后了,判断Header的类型 switch common.HeaderType(chdr.Type) { #从这里能够看到,不论Header类型为CONFIG,仍是ENDORSER_TRANSACTION都会进入下面的validateChaincodeProposalMessage方法,若是Header类型不是以上两种,返回不支持的proposal类型 case common.HeaderType_CONFIG: fallthrough case common.HeaderType_ENDORSER_TRANSACTION: chaincodeHdrExt, err := validateChaincodeProposalMessage(prop, hdr) if err != nil { return nil, nil, nil, err } return prop, hdr, chaincodeHdrExt, err default: return nil, nil, nil, errors.Errorf("unsupported proposal type %d", common.HeaderType(chdr.Type)) }
看一下validateChaincodeProposalMessage
方法,在core/common/validation/msgvalidation.go
中第36行:
#验证proposal header是否为空 if prop == nil || hdr == nil { return nil, errors.New("nil arguments") } ... #一些扩展信息,再也不解释 chaincodeHdrExt, err := utils.GetChaincodeHeaderExtension(hdr) if err != nil { return nil, errors.New("invalid header extension for type CHAINCODE") } #链码Id是否为空 if chaincodeHdrExt.ChaincodeId == nil { return nil, errors.New("ChaincodeHeaderExtension.ChaincodeId is nil") } ... #有效载荷是否为空 if chaincodeHdrExt.PayloadVisibility != nil { return nil, errors.New("invalid payload visibility field") }
若是没有问题的话ValidateProposalMessage()
方法就结束了,回到preProcess()
方法中接着往下:
... #获取通道头信息 chdr, err := putils.UnmarshalChannelHeader(hdr.ChannelHeader) ... #获取签名头信息 shdr, err := putils.GetSignatureHeader(hdr.SignatureHeader) ... 判断是否调用的是不可被外部调用的系统链码 if e.s.IsSysCCAndNotInvokableExternal(hdrExt.ChaincodeId.Name) { ... return vr, err } ... #判断通道Id是否为空 if chainID != "" { ... #通道ID不为空则查找该TxID是否已经存在 if _, err = e.s.GetTransactionByID(chainID, txid); err == nil { ... } #判断是否为系统链码 if !e.s.IsSysCC(hdrExt.ChaincodeId.Name) { #若是不是系统链码,则检查ACL(访问权限) if err = e.s.CheckACL(signedProp, chdr, shdr, hdrExt); err != nil { ... return vr, err } } }else{ #若是通道ID为空的话什么也不作 } vr.prop, vr.hdrExt, vr.chainID, vr.txid = prop, hdrExt, chainID, txid return vr, nil
总结一下preProcess()
方法所作的工做:
Proposal
与Header
。Header
中获取ChannelHeader
和SignatureHeader
。ChannelHeader
和SignatureHeader
进行验证。
ChannelHeader
:
ChannelHeader
是否为空。HeaderType
类型是否为ENDORSER_TRANSACTION、CONFIG_UPDATE、CONFIG、TOKEN_TRANSACTION
中其中一种。Epoch
是否为空。SignatureHeader
:
SignatureHeader
是否为空。Nonce
是否为空。SignatureHeader
的建立者是否为空。Creator、Signature、ProposalBytes
是否为空。Identity
。Identity
中查找Creator
的证书等信息。Creator
的证书和签名信息。ChannelHeader
,SignatureHeader
。到这里,预处理提案过程已经完成,回到ProcessProposal()
这个主方法,接着往下:
if err != nil { resp := vr.resp return resp, err } prop, hdrExt, chainID, txid := vr.prop, vr.hdrExt, vr.chainID, vr.txid #这里定义了一个Tx模拟器,用于后面的模拟交易过程,若是通道Id为空,那么TxSimulator也是空 var txsim ledger.TxSimulator #定义一个历史记录查询器 var historyQueryExecutor ledger.HistoryQueryExecutor #这里判断是否须要Tx模拟 if acquireTxSimulator(chainID, vr.hdrExt.ChaincodeId) { #若是须要进行模拟的话,根据通道ID获取Tx模拟器 if txsim, err = e.s.GetTxSimulator(chainID, txid); err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } #等待Tx模拟完成,最后执行 defer txsim.Done() #获取历史记录查询器 if historyQueryExecutor, err = e.s.GetHistoryQueryExecutor(chainID); err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } }
看一下acquireTxSimulator()
方法,怎么判断是否须要进行TX模拟的:
func acquireTxSimulator(chainID string, ccid *pb.ChaincodeID) bool { #若是通道ID为空,就说明不须要进行Tx的模拟 if chainID == "" { return false } #通道ID不为空,则判断链码的类型,若是是qscc(查询系统链码),cscc(配置系统链码),则不须要进行Tx模拟 switch ccid.Name { case "qscc", "cscc": return false default: return true } }
回到ProcessProposal()
方法中,接下来到了第二个重要的方法了:
#首先定义一个交易参数结构体,用于下面的方法,里面的字段以前都有说过,这里再也不解释 txParams := &ccprovider.TransactionParams{ ChannelID: chainID, TxID: txid, SignedProp: signedProp, Proposal: prop, TXSimulator: txsim, HistoryQueryExecutor: historyQueryExecutor, } #这一行代码就是对交易进行模拟,点进去看一下 cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId)
SimulateProposal()
该方法主要是Peer节点模拟提案过程,可是不会写入到区块中,当Peer节点模拟完一项提案,将模拟结果保存至读写集。看一下SimulateProposal()
中的具体执行流程,在core/endorser/endorser.go
文件中第216行:
func (e *Endorser) SimulateProposal(txParams *ccprovider.TransactionParams, cid *pb.ChaincodeID) (ccprovider.ChaincodeDefinition, *pb.Response, []byte, *pb.ChaincodeEvent, error) #该方法传入的参数有TransactionParams、ChaincodeID,返回的参数有ChaincodeDefinition,Response,ChaincodeEvent,error #TransactionParams以前有提到,ChaincodeID用于肯定所调用的链码,ChaincodeDefinition是链码标准数据结构,Response是链码的响应信息,以及链码事件. type ChaincodeDefinition interface { #链码名称 CCName() string #返回的链码的HASH值 Hash() []byte #链码的版本 CCVersion() string #返回的是验证链码上提案的方式,一般是vscc Validation() (string, []byte) #返回的是背书链码上提案的方式,一般是escc Endorsement() string }
看一下方法中的内容:
#首先获取链码调用的细节 cis, err := putils.GetChaincodeInvocationSpec(txParams.Proposal)
GetChaincodeInvocationSpec()
方法在protos/utils/proputils.go
文件中第25行:
func GetChaincodeInvocationSpec(prop *peer.Proposal) (*peer.ChaincodeInvocationSpec, error) { ... #仅仅调用了获取Header的方法,并无去获取Header,至关于对Header进行验证 _, err := GetHeader(prop.Header) if err != nil { return nil, err } #从链码提案中获取有效载荷 ccPropPayload, err := GetChaincodeProposalPayload(prop.Payload) if err != nil { return nil, err } #定义一个ChaincodeInvocationSpec结构,该结构体包含链码的功能与参数,在这里至关于将提案中所调用的链码功能与参数封装成一个ChaincodeInvocationSpec结构。 cis := &peer.ChaincodeInvocationSpec{} err = proto.Unmarshal(ccPropPayload.Input, cis) #最后将其返回 return cis, errors.Wrap(err, "error unmarshaling ChaincodeInvocationSpec") }
继续往下看,紧接着定义了一个ChaincodeDefinition
,和一个保存版本信息的字符串:
var cdLedger ccprovider.ChaincodeDefinition var version string
这里有一个分支,判断是不是调用的系统链码:
if !e.s.IsSysCC(cid.Name) { #若是不是系统链码,首先获取链码的标准数据结构 cdLedger, err = e.s.GetChaincodeDefinition(cid.Name, txParams.TXSimulator) if err != nil { return nil, nil, nil, nil, errors.WithMessage(err, fmt.Sprintf("make sure the chaincode %s has been successfully instantiated and try again", cid.Name)) } #获取用户链码版本 version = cdLedger.CCVersion() #检查链码实例化策略 err = e.s.CheckInstantiationPolicy(cid.Name, version, cdLedger) if err != nil { return nil, nil, nil, nil, err } } else { #若是调用的是系统链码,仅仅获取系统链码的版本 version = util.GetSysCCVersion() }
到这里,模拟提案的准备工做已经完成,还定义了一些字段:
#定义一个Tx模拟结果集 var simResult *ledger.TxSimulationResults #一个byte数组,保存public的模拟响应结果 var pubSimResBytes []byte #响应信息 var res *pb.Response #链码事件 var ccevent *pb.ChaincodeEvent type TxSimulationResults struct { #能够看到Tx模拟结果集里面保存公共的与私有的读写集 PubSimulationResults *rwset.TxReadWriteSet PvtSimulationResults *rwset.TxPvtReadWriteSet } #链码事件结构体 type ChaincodeEvent struct { #链码Id ChaincodeId string `protobuf:"bytes,1,opt,name=chaincode_id,json=chaincodeId,proto3" json:"chaincode_id,omitempty"` #交易Id TxId string `protobuf:"bytes,2,opt,name=tx_id,json=txId,proto3" json:"tx_id,omitempty"` #事件名称 EventName string `protobuf:"bytes,3,opt,name=event_name,json=eventName,proto3" json:"event_name,omitempty"` #有效载荷 Payload []byte `protobuf:"bytes,4,opt,name=payload,proto3" json:"payload,omitempty"` XXX_NoUnkeyedLiteral struct{} `json:"-"` XXX_unrecognized []byte `json:"-"` XXX_sizecache int32 `json:"-"` }
到这里,就开始执行链码进行模拟了:
res, ccevent, err = e.callChaincode(txParams, version, cis.ChaincodeSpec.Input, cid)
callChaincode()
又是一个重要的方法,调用具体的链码(包括系统链码与用户链码),进去看一下执行逻辑,该方法在第133行:
func (e *Endorser) callChaincode(txParams *ccprovider.TransactionParams, version string, input *pb.ChaincodeInput, cid *pb.ChaincodeID) (*pb.Response, *pb.ChaincodeEvent, error) { ... #看名字应该是记录链码执行时间的 defer func(start time.Time) { logger := endorserLogger.WithOptions(zap.AddCallerSkip(1)) elapsedMilliseconds := time.Since(start).Round(time.Millisecond) / time.Millisecond logger.Infof("[%s][%s] Exit chaincode: %s (%dms)", txParams.ChannelID, shorttxid(txParams.TxID), cid, elapsedMilliseconds) }(time.Now()) #定义了一些字段 var err error var res *pb.Response var ccevent *pb.ChaincodeEvent #执行链码,若是是用户链码具体怎么执行的要看用户写的链码逻辑,执行完毕后返回响应信息与链码事件 res, ccevent, err = e.s.Execute(txParams, txParams.ChannelID, cid.Name, version, txParams.TxID, txParams.SignedProp, txParams.Proposal, input) #这里说明一下,状态常量一共有三个:OK = 200 ERRORTHRESHOLD = 400 ERROR = 500 大于等于400就是错误信息或者被背书节点拒绝。 if res.Status >= shim.ERRORTHRESHOLD { return res, nil, nil }
再往下看,一个if语句,判断调用的链码是否为lscc,若是是lscc判断传入的参数是否大于等于3,而且调用的方法是否为deploy或者upgrade,若是是用户链码到这是方法就结束了。
if cid.Name == "lscc" && len(input.Args) >= 3 && (string(input.Args[0]) == "deploy" || string(input.Args[0]) == "upgrade") { #获取链码部署的基本结构,deploy与upgrade都须要对链码进行部署 userCDS, err := putils.GetChaincodeDeploymentSpec(input.Args[2], e.PlatformRegistry) ... #这一行代码没有搞清楚啥意思 cds, err = e.SanitizeUserCDS(userCDS) if err != nil { return nil, nil, err } ... #执行链码的Init,具体如何执行的这里就再也不看了,否则内容更多了 _, _, err = e.s.ExecuteLegacyInit(txParams, txParams.ChannelID, cds.ChaincodeSpec.ChaincodeId.Name, cds.ChaincodeSpec.ChaincodeId.Version, txParams.TxID, txParams.SignedProp, txParams.Proposal, cds) ... }
callChaincode()
方法到这里结束了链码的调用执行也完成了,返回响应消息与链码事件,回到SimulateProposal()
:
... 若是TXSimulator不为空,说明大部分是有帐本有关的操做 if txParams.TXSimulator != nil { #GetTxSimulationResults()获取Tx模拟结果集 if simResult, err = txParams.TXSimulator.GetTxSimulationResults(); err != nil { txParams.TXSimulator.Done() return nil, nil, nil, nil, err } #以前提到Tx模拟结果集中不只仅只有公共读写集,还有私有的读写集,接下来判断私有的读写集是否为空: if simResult.PvtSimulationResults != nil { #判断链码Id是否为lscc if cid.Name == "lscc" { 若是为生命周期系统链码,返回错误信息 txParams.TXSimulator.Done() #私有数据禁止用于实例化操做 return nil, nil, nil, nil, errors.New("Private data is forbidden to be used in instantiate") } #好像与配置有关,没有看明白 pvtDataWithConfig, err := e.AssemblePvtRWSet(simResult.PvtSimulationResults, txParams.TXSimulator) #读取配置信息须要在更新配置信息释放锁以前,等待执行完成 txParams.TXSimulator.Done() ... #获取帐本的高度 endorsedAt, err := e.s.GetLedgerHeight(txParams.ChannelID) pvtDataWithConfig.EndorsedAt = endorsedAt #应该是更新数据了,可能理解的不对 if err := e.distributePrivateData(txParams.ChannelID, txParams.TxID, pvtDataWithConfig, endorsedAt); err != nil { return nil, nil, nil, nil, err } } txParams.TXSimulator.Done() #获取公共模拟数据 if pubSimResBytes, err = simResult.GetPubSimulationBytes(); err != nil { return nil, nil, nil, nil, err } } #最后返回 return cdLedger, res, pubSimResBytes, ccevent, nil
到这里提案的模拟完成了,下一步就是背书过程了,感受整个流程仍是挺长的,先回到主方法,继续往下走:
cd, res, simulationResult, ccevent, err := e.SimulateProposal(txParams, hdrExt.ChaincodeId) if err != nil { return &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}, nil } //若是响应不为空 if res != nil { //若是状态大于等于ERROR,就是发生错误以后的逻辑,这里再也不说了 if res.Status >= shim.ERROR { ... return pResp, nil } } #定义一个提案响应字段 var pResp *pb.ProposalResponse if chainID == "" { #若是通道ID为空就直接返回了 pResp = &pb.ProposalResponse{Response: res} } else { #通道Id不为空,开始进行背书操做了,这是到了第三个重要的方法 pResp, err = e.endorseProposal(ctx, chainID, txid, signedProp, prop, res, simulationResult, ccevent, hdrExt.PayloadVisibility, hdrExt.ChaincodeId, txsim, cd) #先把下面的说无缺了,整个流程立刻就结束了 #背书完成后定义一个标签,保存通道与链码信息 meterLabels := []string{ "channel", chainID, "chaincode", hdrExt.ChaincodeId.Name + ":" + hdrExt.ChaincodeId.Version, } #简单来讲,这里就是发生ERROR以后的处理,再也不细看 if err != nil { ... } if pResp.Response.Status >= shim.ERRORTHRESHOLD { ... return pResp, nil } } pResp.Response = res #提案成功的数量+1 e.Metrics.SuccessfulProposals.Add(1) success = true #返回提案的响应信息 return pResp, nil } #到这里整个提案的处理流程就结束了,最后再看一下背书流程
endorseProposal()
该方法主要就是完成Peer节点对提案的背书操做,代码在309行:
func (e *Endorser) endorseProposal(_ context.Context, chainID string, txid string, signedProp *pb.SignedProposal, proposal *pb.Proposal, response *pb.Response, simRes []byte, event *pb.ChaincodeEvent, visibility []byte, ccid *pb.ChaincodeID, txsim ledger.TxSimulator, cd ccprovider.ChaincodeDefinition) (*pb.ProposalResponse, error)
传入的参数比较多,分析一下:
Context
这个参数从ProcessProposal()
主方法传入进来,应该是上下文的意思。chainID
:通道Idtxid
:交易IDSignedProposal
:签名过的提案proposal
:提案response
:以前返回的响应消息simRes
:模拟结果集event
:链码事件visibility
:这个还没搞清楚ccid
:链码Idtxsim
:交易模拟器ChaincodeDefinition
:链码标准数据结构,就是调用的链码功能和参数等信息... func (e *Endorser) endorseProposal(#后面参数省略)(*pb.ProposalResponse, error){ var escc string #判断是不是系统链码 if isSysCC { #若是是系统链码,则使用escc进行背书 escc = "escc" } else { #看官方解释这个好像也是返回escc escc = cd.Endorsement() } ... var err error var eventBytes []byte #若是链码事件不为空 if event != nil { #获取链码事件 eventBytes, err = putils.GetBytesChaincodeEvent(event) if err != nil { return nil, errors.Wrap(err, "failed to marshal event bytes") } } if isSysCC { #获取系统链码版本 ccid.Version = util.GetSysCCVersion() } else { #获取用户链码版本 ccid.Version = cd.CCVersion() } #以前一直没解释的上下文到这里就比较清楚了 ctx := Context{ PluginName: escc, Channel: chainID, SignedProposal: signedProp, ChaincodeID: ccid, Event: eventBytes, SimRes: simRes, Response: response, Visibility: visibility, Proposal: proposal, TxID: txid, } #这个就是背书了,看一下这个方法 return e.s.EndorseWithPlugin(ctx) }
这个方法在core/endorser/plugin_endorser.go
中第162行:
func (pe *PluginEndorser) EndorseWithPlugin(ctx Context) (*pb.ProposalResponse, error) { ... #Plugin是插件的意思,不知道在这里怎么解释更合理一些,建立或者获取插件? plugin, err := pe.getOrCreatePlugin(PluginName(ctx.PluginName), ctx.Channel) ... #从上下文中获取提案byte数据 prpBytes, err := proposalResponsePayloadFromContext(ctx) ... #背书操做,在core/endorser/mocks/plugin.go文件中,就是调用了Plugin中的背书方法,没啥解释的,方法在core/endorser/mocks/plugin.go中 endorsement, prpBytes, err := plugin.Endorse(prpBytes, ctx.SignedProposal) ... #背书完成后,封装为提案响应结构体,最后将该结构体返回 resp := &pb.ProposalResponse{ Version: 1, Endorsement: endorsement, Payload: prpBytes, Response: ctx.Response, } ... return resp, nil } #Plugin中共有两个方法 type Plugin interface { #背书 Endorse(payload []byte, sp *peer.SignedProposal) (*peer.Endorsement, []byte, error) #初始化 Init(dependencies ...Dependency) error }
上面的两个方法看一下:第一个getOrCreatePlugin()
在第202行:
#根据给予的插件名与通道返回一个插件实例 func (pe *PluginEndorser) getOrCreatePlugin(plugin PluginName, channel string) (endorsement.Plugin, error) { #获取插件工厂 pluginFactory := pe.PluginFactoryByName(plugin) if pluginFactory == nil { return nil, errors.Errorf("plugin with name %s wasn't found", plugin) } #这个就是获取或建立一个通道映射,意思就是若是有就直接获取,没有就先建立再获取。里面就再也不解释了,都是一些基本的操做。传入了插件的名称与插件工厂,返回了pluginsByChannel,结构体在下面 pluginsByChannel := pe.getOrCreatePluginChannelMapping(PluginName(plugin), pluginFactory) #根据通道建立插件,看一下这个方法 return pluginsByChannel.createPluginIfAbsent(channel) } type PluginName string #看结构体中内容 type pluginsByChannel struct { #读写锁 sync.RWMutex #插件工厂 pluginFactory endorsement.PluginFactory #map集合,包含全部的Plugin channels2Plugins map[string]endorsement.Plugin #背书插件 pe *PluginEndorser }
createPluginIfAbsent()
这个方法在第103行:
func (pbc *pluginsByChannel) createPluginIfAbsent(channel string) (endorsement.Plugin, error) { #首先就是获取一个读锁 pbc.RLock() #根据数组下标找须要的插件 plugin, exists := pbc.channels2Plugins[channel] #释放读锁 pbc.RUnlock() #若是找到的话直接返回 if exists { return plugin, nil } #到这里说明没有找到,代表插件不存在,此次获取锁,这是与上面的锁不一样 pbc.Lock() #表示最后才释放锁 defer pbc.Unlock() #再进行一次查找,多线程下说不定有其余线程刚刚建立了呢 plugin, exists = pbc.channels2Plugins[channel] #若是查找到的话释放锁后直接返回 if exists { return plugin, nil } #到这里说明真的没有该插件,使用插件工厂New一个 pluginInstance := pbc.pluginFactory.New() #进行初始化操做 plugin, err := pbc.initPlugin(pluginInstance, channel) if err != nil { return nil, err } #添加到数组里,下次再查找该插件的时候就存在了 pbc.channels2Plugins[channel] = plugin #最后释放锁后返回 return plugin, nil }
看一下initPlugin()
方法是怎么进行初始化的,在第127行:
func (pbc *pluginsByChannel) initPlugin(plugin endorsement.Plugin, channel string) (endorsement.Plugin, error) { var dependencies []endorsement.Dependency var err error if channel != "" { #根据给予的通道信息建立一个用于查询的Creator query, err := pbc.pe.NewQueryCreator(channel) ... #根据给予的通道信息获取状态数据,也就是当前帐本中最新状态 store := pbc.pe.TransientStoreRetriever.StoreForChannel(channel) ... #添加进数组中 dependencies = append(dependencies, &ChannelState{QueryCreator: query, Store: store}) } dependencies = append(dependencies, pbc.pe.SigningIdentityFetcher) #Plugin的初始化方法在这里被调用 err = plugin.Init(dependencies...) ... return plugin, nil }
Plugin
这里建立完后就开始进行背书操做了,背书完成后返回响应信息,整个流程就到这里结束了。
最后总结一下总体的流程好了:
preProcess()
Header
信息SimulateProposal()
callChaincode()
方法进行模拟。endorseProposal()
整个过程仍是比较长的,不过还算比较清晰,下一篇文章分析一下Peer节点的启动过程好了。