coordinator来执行对于consumer group的管理,当consumer group的第一个consumer启动的时候,它会去和kafka server肯定谁是它们组的coordinator。以后该group内的全部成员都会和该coordinator进行协调通讯算法
consumer group如何肯定本身的coordinator是谁呢, 消费者向kafka集群中的任意一个broker发送一个GroupCoordinatorRequest请求,服务端会返回一个负载最小的broker节点的id,并将该broker设置为coordinator.网络
整个rebalance的过程分为两个步骤,Join和Syncspa
join: 表示加入到consumer group中,在这一步中,全部的成员都会向coordinator发送joinGroup的请求。一旦全部成员都发送了joinGroup请求,那么coordinator会选择一个consumer担任leader角色,并把组成员信息和订阅信息发送消费者
leader选举算法比较简单,若是消费组内没有leader,那么第一个加入消费组的消费者就是消费者leader,若是这个时候leader消费者退出了消费组,那么从新选举一个leader,这个选举很随意,相似于随机算法server
protocol_metadata: 序列化后的消费者的订阅信息
leader_id: 消费组中的消费者,coordinator会选择一个座位leader,对应的就是member_id
member_metadata 对应消费者的订阅信息
members:consumer group中所有的消费者的订阅信息
generation_id: 年代信息,相似于以前讲解zookeeper的时候的epoch是同样的,对于每一轮rebalance,generation_id都会递增。主要用来保护consumer group。隔离无效的offset提交。也就是上一轮的consumer成员没法提交offset到新的consumer group中。blog
每一个消费者均可以设置本身的分区分配策略,对于消费组而言,会从各个消费者上报过来的分区分配策略中选举一个彼此都赞同的策略来实现总体的分区分配,这个"赞同"的规则是,消费组内的各个消费者会经过投票来决定.kafka
在joingroup阶段,每一个consumer都会把本身支持的分区分配策略发送到coordinator,coordinator手机到全部消费者的分配策略,组成一个候选集,每一个消费者须要从候选集里找出一个本身支持的策略,而且为这个策略投票
最终计算候选集中各个策略的选票数,票数最多的就是当前消费组的分配策略同步
完成分区分配以后,就进入了Synchronizing Group State阶段,主要逻辑是向GroupCoordinator发送SyncGroupRequest请求,而且处理SyncGroupResponse响应,简单来讲,就是leader将消费者对应的partition分配方案同步给consumer group 中的全部consumerit
每一个消费者都会向coordinator发送syncgroup请求,不过只有leader节点会发送分配方案,其余消费者只是打打酱油而已。当leader把方案发给coordinator之后,coordinator会把结果设置到SyncGroupResponse中。这样全部成员都知道本身应该消费哪一个分区。
Ø consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是由于这样作能够有更好的灵活性.io
咱们再来总结一下consumer group rebalance的过程集群
Ø 发起join group请求,两种状况
注: 参照自咕泡mic