kafka-Reblance

谁来执行Rebalance以及管理consumer的group呢

coordinator来执行对于consumer group的管理,当consumer group的第一个consumer启动的时候,它会去和kafka server肯定谁是它们组的coordinator。以后该group内的全部成员都会和该coordinator进行协调通讯算法

如何肯定coordinator?

consumer group如何肯定本身的coordinator是谁呢, 消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator.网络

JoinGroup的过程

整个rebalance的过程分为两个步骤,Join和Syncspa

join: 表示加入到consumer group中,在这一步中,全部的成员都会向coordinator发送joinGroup的请求。一旦全部成员都发送了joinGroup请求,那么coordinator会选择一个consumer担任leader角色,并把组成员信息和订阅信息发送消费者
leader选举算法比较简单,若是消费组内没有leader,那么第一个加入消费组的消费者就是消费者leader,若是这个时候leader消费者退出了消费组,那么从新选举一个leader,这个选举很随意,相似于随机算法server

 

protocol_metadata: 序列化后的消费者的订阅信息
leader_id: 消费组中的消费者,coordinator会选择一个座位leader,对应的就是member_id
member_metadata 对应消费者的订阅信息
members:consumer group中所有的消费者的订阅信息
generation_id: 年代信息,相似于以前讲解zookeeper的时候的epoch是同样的,对于每一轮rebalance,generation_id都会递增。主要用来保护consumer group。隔离无效的offset提交。也就是上一轮的consumer成员没法提交offset到新的consumer group中。blog

 

肯定分区分配策略

每一个消费者均可以设置本身的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现总体的分区分配,这个"赞同"的规则是,消费组内的各个消费者会经过投票来决定.kafka

在joingroup阶段,每一个consumer都会把本身支持的分区分配策略发送到coordinator,coordinator手机到全部消费者的分配策略,组成一个候选集,每一个消费者须要从候选集里找出一个本身支持的策略,而且为这个策略投票
最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略同步

 

Synchronizing Group State阶段

完成分区分配以后,就进入了Synchronizing Group State阶段,主要逻辑是向GroupCoordinator发送SyncGroupRequest请求,而且处理SyncGroupResponse响应,简单来讲,就是leader将消费者对应的partition分配方案同步给consumer group 中的全部consumerit

 

 

 

 

每一个消费者都会向coordinator发送syncgroup请求,不过只有leader节点会发送分配方案,其余消费者只是打打酱油而已。当leader把方案发给coordinator之后,coordinator会把结果设置到SyncGroupResponse中。这样全部成员都知道本身应该消费哪一个分区。
Ø consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是由于这样作能够有更好的灵活性.io

 

总结


咱们再来总结一下consumer group rebalance的过程集群

 

  1. Ø 对于每一个consumer group子集,都会在服务端对应一个GroupCoordinator进行管理,GroupCoordinator会在zookeeper上添加watcher,当消费者加入或者退出consumer group时,会修改zookeeper上保存的数据,从而触发GroupCoordinator开始Rebalance操做
  2. Ø 当消费者准备加入某个Consumer group或者GroupCoordinator发生故障转移时,消费者并不知道GroupCoordinator的在网络中的位置,这个时候就须要肯定GroupCoordinator,消费者会向集群中的任意一个Broker节点发送ConsumerMetadataRequest请求,收到请求的broker会返回一个response做为响应,其中包含管理当前ConsumerGroup的GroupCoordinator,
  3. Ø 消费者会根据broker的返回信息,链接到groupCoordinator,而且发送HeartbeatRequest,发送心跳的目的是要要奥噶苏GroupCoordinator这个消费者是正常在线的。当消费者在指定时间内没有发送心跳请求,则GroupCoordinator会触发Rebalance操做。

Ø 发起join group请求,两种状况

  1. 若是GroupCoordinator返回的心跳包数据包含异常,说明GroupCoordinator由于前面说的几种状况致使了Rebalance操做,那这个时候,consumer会发起join group请求
  2. 新加入到consumer group的consumer肯定好了GroupCoordinator之后,消费者会向GroupCoordinator发起join group请求,
  3. GroupCoordinator会收集所有消费者信息以后,来确承认用的消费者,并从中选取一个消费者成为group_leader。并把相应的信息(分区分配策略、leader_id、…)封装成response返回给全部消费者,可是只有group leader会收到当前consumer group中的全部消费者信息
  4. 当消费者肯定本身是group leader之后,会根据消费者的信息以及选定分区分配策略进行分区分配接着进入Synchronizing Group State阶段,
  5. 每一个消费者会发送SyncGroupRequest请求到GroupCoordinator,可是只有Group Leader的请求会存在分区分配结果(Leader负责根据分区分配规则进行分区分配),GroupCoordinator会根据Group Leader的分区分配结果造成SyncGroupResponse返回给全部的Consumer。
  6. consumer根据分配结果,执行相应的操做

 

 

注: 参照自咕泡mic 

相关文章
相关标签/搜索
本站公众号
   欢迎关注本站公众号,获取更多信息