kafka协调者

咱们先假设初始时世界是混沌的尚未盘古的开天辟地,协调者也是一片荒芜人烟之地,没有保存任何状态,由于消费组的初始状态是Stable,在第一次的Rebalance时,正常的尚未向消费组注册过的消费者会执行状态为Stable并且memberId=UNKNOWN_MEMBER_ID条件分支。在第一次Rebalance以后,每一个消费者都分配到了一个成员编号,系统又会进入Stable稳定状态(Stable稳定状态包括两种:一种是没有任何消费者的稳定状态,一种是有消费者的稳定状态)。由于全部消费者在执行一次JoinGroup后并非说系统就一直保持这种不变的状态,有可能由于这样或那样的事件致使消费者要从新进行JoinGroup,这个时候由于以前JoinGroup过了每一个消费者都是有成员编号的,处理方式确定是不同的。算法

因此定义一种事件驱动的状态机就颇有必要了,这世界看起来是杂乱无章的,不过只要遵循着状态机的规则(万物生长的理论),任何事件都是有迹可循有路可走有条不紊地进行着。session

 

private def doJoinGroup(group: GroupMetadata,memberId: String,clientId: String,
    clientHost: String,sessionTimeoutMs: Int,protocolType: String,
    protocols: List[(String, Array[Byte])],responseCallback: JoinCallback) {
  if (group.protocolType!=protocolType||!group.supportsProtocols(protocols.map(_._1).toSet)) {
    //protocolType对于消费者是consumer,注意这里的协议类型和PartitionAssignor协议不一样哦
    //协议类型目前总共就两种消费者和Worker,而协议是PartitionAssignor分配算法
    responseCallback(joinError(memberId, Errors.INCONSISTENT_GROUP_PROTOCOL.code))
  } else if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID && !group.has(memberId)) {
    //若是当前组没有记录该消费者,而该消费者却被分配了成员编号,则重置为未知成员,并让消费者重试
    responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
  } else { group.currentState match {
    case Dead =>
      responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID.code))
    case PreparingRebalance =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) { //2.第二个消费者在这里了!
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else {
        val member = group.get(memberId)
        updateMemberAndRebalance(group, member, protocols, responseCallback)
      }
    case Stable =>
      if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {  //1.初始时第一个消费者在这里!
        //若是消费者成员编号是未知的,则向GroupMetadata注册并被记录下来
        addMemberAndRebalance(sessionTimeoutMs, clientId, clientHost, 
          protocols, group, responseCallback)
      } else { //3.第二次Rebalance时第一个消费者在这里,此时要分Leader仍是普通的消费者了
        val member = group.get(memberId)
        if (memberId == group.leaderId || !member.matches(protocols)) {
          updateMemberAndRebalance(group, member, protocols, responseCallback)
        } else {
          responseCallback(JoinGroupResult(members = Map.empty,memberId = memberId,
            generationId = group.generationId,subProtocol = group.protocol,
            leaderId = group.leaderId,errorCode = Errors.NONE.code))
        }
      }
    }
    if (group.is(PreparingRebalance))
      joinPurgatory.checkAndComplete(GroupKey(group.groupId))
  }
}

addMemberAndRebalance和updateMemberAndRebalance会建立或更新MemberMetadata,而且会尝试调用prepareRebalance,消费组中只有一个消费者有机会调用prepareRebalance,而且一旦调用该方法,会将消费组状态更改成PreparingRebalance,就会使得下一个消费者只能从case PreparingRebalance入口进去了,假设第一个消费者是从Stable进入的,它更改了状态为PreparingRebalance,下一个消费者就不会从Stable进来的。不过进入Stable状态还要判断消费者是否是已经有了成员编号,一般是以前已经发生了Rebalance,这种影响也是比较巨大的,每一个消费者走的路径跟第一次的Rebalance是彻底不一样的迷宫地图了。scala

1)第一次Rebalance如图6-18的上半部分:code

  1. 第一个消费者,状态为Stable,没有编号,addMemberAndRebalance,成为Leader,执行prepareRebalance,更改状态为PreparingRebalance,建立DelayedJoin
  2. 第二个消费者,状态为PreparingRebalance,没有编号,addMemberAndRebalance(不执行prepareRebalance,由于在状态改变成PreparingRebalance后就不会被执行了);后面的消费者同第二个
  3. 全部消费者都要等协调者收集完全部成员编号在DelayedJoin完成时才会收到JoinGroup响应

 

图6-18 第一次和第二次Rebalance事件

2)第二次Rebalance,对于以前加入过的消费者都要成员编号如图6-18的下半部分:get

  1. 第一个消费者是Leader,状态为Stable,有编号,updateMemberAndRebalance,更改状态为PreparingRebalance,建立DelayedJoin
  2. 第二个消费者,状态为PreparingRebalance,有编号,updateMemberAndRebalance;后面的消费者同第二个
  3. 全部消费者也要等待,由于其余消费者发送Join请求在Leader消费者以后。

3)不过若是有消费者在Leader以前发送又有点不同了如图6-19:it

  1. 第一个消费者不是Leader,状态为Stable,有编号,responseCallback,当即收到JoinGroup响应,好幸运啊!
  2. 第二个消费者若是也不是Leader,恭喜你,协调者也放过他,直接返回JoinGroup响应
  3. 第三个消费者是Leader(领导来了),状态为Stable(什么,大家以前的消费者居然都没更新状态!,由于他们都没有add或update),有编号,updateMemberAndRebalance(仍是我第一个调用add或update,看来仍是只能我来更新状态),更改状态为PreparingRebalance,建立DelayedJoin
  4. 第四个消费者不是Leader,状态为PreparingRebalance,有编号,updateMemberAndRebalance(前面有领导,很差意思了,不能当即返回JoinGroup给你了,大家这些剩下的消费者都只能和领导一块儿返回了,算大家倒霉)

 

图6-19 Leader非第一个发送JoinGroup请求io

4)若是第一个消费者不是Leader,也没有编号,说明这是一个新增的消费者,流程又不一样了如图6-20:table

  1. 第一个消费者不是Leader,状态为Stable,没有编号,addMemberAndRebalance,执行prepareRebalance(我是第一个调用add或update的哦,大家都别想跟我抢这个头彩了),更改状态为PreparingRebalance(我不是Leader但我骄傲啊),建立DelayedJoin(我抢到头彩,固然建立DelayedJoin的工做只能由我来完成了)
  2. 第二个消费者也不是Leader,恭喜你,协调者也放过他,直接返回JoinGroup响应
  3. 第三个消费者是Leader(领导来了),状态为PreparingRebalance(有个新来的不懂规矩,他已经把状态改了),有编号,updateMemberAndRebalance(有人已经改了,你老就不用费心思了),凡是没有当即返回响应的,都须要等待,领导也不例外
  4. 第四个消费者不是Leader(废话,只有一个领导,并且领导已经在前面了),不会当即返回响应(你看领导都排队呢)
  5. 虽然DelayedJoin是由没有编号的消费者建立,不过因为DelayedJoin是以消费组为级别的,因此不用担忧,上一次选举出来的领导仍是领导,协调者最终仍是会把members交给领导,不会是给那个没有编号的消费者的,虽说在他注册的时候已经有编号了,可是你们不认啊。不过领导其实不在乎是谁开始触发prepareRebalance的,那我的要负责生成DelayedJoin,而不论是领导本身仍是其余人一旦更改状态为PreparingRebalance,后面的消费者都要等待DelayedJoin完成了,而领导者老是要等待的,因此他固然无所谓了,由于他知道最后协调者老是会把members交给他的。

 

图6-20 新增消费组第一个发送JoinGroup请求class

根据上面的几种场景总结下来状态机的规则和一些结论以下:

  1. 第一个调用addMemberAndRebalance或者updateMemberAndRebalance的会将状态改成PreparingRebalance,而且负责生成DelayedJoin
  2. 一旦状态进入PreparingRebalance,其余消费者就只能从PreparingRebalance状态入口进入,这里只有两种选择addMemberAndRebalance或者updateMemberAndRebalance,不过他们不会更改状态,也不会生成DelayedJoin
  3. 发生DelayedJoin以后,其余消费者的JoinGroup响应都会被延迟,由于如规则2中,他们只能调用add或update,没法当即调用responseCallback,因此就要和DelayedJoin的那个消费者一块儿等待
  4. 正常流程时,发生responseCallback的是存在成员编号的消费者在Leader以前发送了JoinGroup,或者新增长的消费者发送了JoinGroup请求以前
  5. 第一次Rebalance时,第一个消费者会建立DelayedJoin,以后的Rebalance,只有新增的消费者才有机会建立(若是他在Leader以前发送的话,若是在Leader以后就没有机会了),而普通消费者老是没有机会建立DelayedJoin的,由于状态为Stable时,他会直接开溜,有人(Leader或者新增长的消费者)建立了DelayedJoin以后,他又在那边怨天尤人只能等待
相关文章
相关标签/搜索