咱们先假设初始时世界是混沌的尚未盘古的开天辟地,协调者也是一片荒芜人烟之地,没有保存任何状态,由于消费组的初始状态是Stable,在第一次的Rebalance时,正常的尚未向消费组注册过的消费者会执行状态为Stable
并且memberId=UNKNOWN_MEMBER_ID
条件分支。在第一次Rebalance以后,每一个消费者都分配到了一个成员编号,系统又会进入Stable稳定状态(Stable稳定状态包括两种:一种是没有任何消费者的稳定状态,一种是有消费者的稳定状态)。由于全部消费者在执行一次JoinGroup后并非说系统就一直保持这种不变的状态,有可能由于这样或那样的事件致使消费者要从新进行JoinGroup,这个时候由于以前JoinGroup过了每一个消费者都是有成员编号的,处理方式确定是不同的。算法
因此定义一种事件驱动的状态机就颇有必要了,这世界看起来是杂乱无章的,不过只要遵循着状态机的规则(万物生长的理论),任何事件都是有迹可循有路可走有条不紊地进行着。session
private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String, clientHost: String,sessionTimeoutMs: Int,protocolType: String, protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) { if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) { //protocolType对于消费者是consumer,注意这里的协议类型和PartitionAssignor协议不一样哦 //协议类型目前总共就两种消费者和Worker,而协议是PartitionAssignor分配算法 responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code)) } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) { //若是当前组没有记录该消费者,而该消费者却被分配了成员编号,则重置为未知成员,并让消费者重试 responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) } else { group.currentState match { case Dead => responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code)) case PreparingRebalance => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二个消费者在这里了! addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) } else { val member = group.get(memberId) updateMemberAndRebalance(group, member, protocols, responseCallback) } case Stable => if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //1.初始时第一个消费者在这里! //若是消费者成员编号是未知的,则向GroupMetadata注册并被记录下来 addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, protocols, group, responseCallback) } else { //3.第二次Rebalance时第一个消费者在这里,此时要分Leader仍是普通的消费者了 val member = group.get(memberId) if (memberId == group.leaderId || !member.matches(protocols)) { updateMemberAndRebalance(group, member, protocols, responseCallback) } else { responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId, generationId = group.generationId,subProtocol = group.protocol, leaderId = group.leaderId,errorCode = Errors.NONE.code)) } } } if (group.is(PreparingRebalance)) joinPurgatory.checkAndComplete(GroupKey(group.groupId)) } }
addMemberAndRebalance和updateMemberAndRebalance会建立或更新MemberMetadata,而且会尝试调用prepareRebalance
,消费组中只有一个消费者有机会调用prepareRebalance,而且一旦调用该方法,会将消费组状态更改成PreparingRebalance
,就会使得下一个消费者只能从case PreparingRebalance
入口进去了,假设第一个消费者是从Stable进入的,它更改了状态为PreparingRebalance,下一个消费者就不会从Stable进来的。不过进入Stable状态还要判断消费者是否是已经有了成员编号,一般是以前已经发生了Rebalance,这种影响也是比较巨大的,每一个消费者走的路径跟第一次的Rebalance是彻底不一样的迷宫地图了。scala
1)第一次Rebalance如图6-18的上半部分:code
图6-18 第一次和第二次Rebalance事件
2)第二次Rebalance,对于以前加入过的消费者都要成员编号如图6-18的下半部分:get
3)不过若是有消费者在Leader以前发送又有点不同了如图6-19:it
图6-19 Leader非第一个发送JoinGroup请求io
4)若是第一个消费者不是Leader,也没有编号,说明这是一个新增的消费者,流程又不一样了如图6-20:table
图6-20 新增消费组第一个发送JoinGroup请求class
根据上面的几种场景总结下来状态机的规则和一些结论以下: