kafka 0.9 重写消费者设计

Rebalance是将一个group订阅的topic的一组分区分配给该group组内的consumer实例,使每一个实例都拥有一个独立的互斥的分区。即:一个topic的分区只能被group的一个consumer实例消费,再rebalance以前不能再被group的其余consumer消费。
在一个消费group Reblance成功以后,全部被订阅主题的每个分区在其Group内都有且只有一个consumer实例去消费。
Rebalance的工做方式以下
每一个broker都当选为消费组(consumer group)中某个consumer的协调者(co-ordinator);当一个group中的consumer成员个数变化或者其订阅的topics的分区变化时,做为这个group协调者的broker负责编排Reblance运算。他还负责在参与Reblance运算的全部consumer和分区之间通讯。ios

消费者(Consumer)
一、当消费者启动或者协调者故障时,consumer会发送一个ConsumerMetadataRequest给 bootstrap.brokers配置的列表中的任何一个broker。
二、而后消费者链接协调者而且发送一个心跳请求(HeartbeatRequest),若是返回的HeartbeatResponse中错误码为IllegalGeneration。这表示协调者启动了Reblance。而后消费者中止获取数据,提交offsets而且发送一个JoinGroupRequest 请求给协调者所在broker,在返回的JoinGroupResponse中,它收到它有全消费的topic的分区列表,和所在group的新的Id。此时,消费者开始读取数据而且提交分区的offsets。redis

三、若是在HeartbeatResponse中没有错误,消费者就继续从它最后拥有权限的分区中获取数据,而不会被中断。bootstrap

协调者(Co-ordinator)session

一、在稳定状态下,协调者经过它的故障检测协议跟踪每一个group中每一个消费者的健康情况。
二、在选举结束或者启动时,协调者会从zookeeper中读取它管理的group列表和它们的成员信息。若是一个group以前没有任何成员信息,它不作任何事情,直到某个组的第一个consumer使用它进行注册。socket

三、在协调者没有加载完它负责的全部group的成员信息以前,当consumer发送HearbeatRequests, OffsetCommitRequests 和JoinGroupRequests请求时,会返回CoordinatorStartupNotComplete错误码,而后consumer将会在以后进行从新请求。ide

四、选举工做结束后或启动时,协调者启动了group中的全部消费者故障检测。被标记为死亡的消费者会被从group中移除,而且会触发Rebalance操做。fetch

五、平衡是经过在HeartbeatResponse返回IllegalGeneration错误代码触发。一旦全部活着的消费者经过协调者发送JoinGroupRequests请求要求从新注册,那么它在JoinGroupResponse中告诉每一个consumer他们拥有的新的分区来完成Rebalance操做ui

六、消费者还监听全部有consumer消费的topics的分区变化。若是检测到任何一个topic有新的分区,都会出发Reblance操做。this

Failure detection protocolatom

消费者在向协调者请求加入消费group时能够在JoinGroupRequest中指定session的超市时间。当一个消费者成功加入group,故障检测程序在消费者和协调者上启动。消费者会按期的发起心跳请求(HeartbeatRequest),每session.timeout.ms/heartbeat.frequency发送一次心跳并等待返回。若是协调者在session.timeout.ms期间没有收到来自消费者的心跳请求,就会标记这个消费者死亡。一样的,若是消费在没有在session.timeout.ms时间内收到HeartbeatResponse回应,它就会假设协调者已经挂了,而且开始co-ordinator rediscovery过程。heartbeat.frequency是配置在consumer端的。heartbeat.frequency要设置比较高的值,特别是若是session.timeout.ms也设置的比高大的状况下。可是,应注意heartbeat.frequency不要设置的过高。

接收ConsumerMetadataResponse或JoinGroupResponse以后,消费者按期发送一个HeartbeatRequest到协调者(everysession.timeout.ms/heartbeat.frequency毫秒)。
收到请求的心跳后,协调者检查生成编号,消费者ID和消费group。若是消费者指定了无效或失效代Id,在HeartbeatResponse中会返回IllegalGeneration错误代码。
若是协调者未在session.timeout.ms内接收到任何一次来自消费者心跳请求,它标志着消费者已死并触发组的Rebalance。
若是消费者没有在 session.timeout.ms实践内收到协调者的HeartbeatResponse,或发现socket被关闭,它认为协调者已经挂掉了并触发从新发现协调者的过程。

注意,在协调者故障转移时,消费者可能会发现新的协调者,这个新的协调者可能尚未完成故障转或者已经完成故障转移,包括从ZK中服务消费群(consumer group )的元数据。在后一种状况下,协调者只能接受正常的心跳请求,前一种状况协调者可能会拒绝请求,致使消费者会从新执行发现过程而且从新链接,这比较正常。然而,若是消费者太晚去链接一个新的协调者,协调者会编辑这个消费者死亡,而且触发Rebalance操做。
状态图State diagram

消费者

Here is a description of the states -
Down - The consumer process is down

在启动了和寻找协调者状态,消费者为所在消费组寻找协调者。一旦它发现了协调了消费者就会发送JoinGroupRequest(没有消费ID)。若是有消费者在同组指定的分区分配策略有冲突,那么JoinGroupRequest请求就会收到InconsistentPartitioningStrategy错误代码。若是在JoinGroupRequest请求中指定的分区分配策略(PartitionAssignmentStrategy )Broker没法识别,则会返回UnknownPartitioningStrategy错误代码。在这些状况下,消费者没法加入消费组。

在已经加入组这个状态下,若是消费者收到的JoinGroupResponse没有错误代码,而有consumer id和消费组的generation id,那么消费者就会成为组的一个成员。在这种状态下,消费者发送心跳请求,根据收到的错误代码,消费者将保持这种状态,或者被协调者标记死亡中止消费,或者发起从新发现协调者过程。

在从新寻找协调者状态。消费者不会中止消费,可是会试图经过发送ConsumerMetadataRequest 请求去发起重寻协调者的过程,而且会一直等待一个没有错误代码的响应。

中止消费状态。消费者中止消费而且提交offsets,直到它再次从新加入消费组。

图片描述

协调者

挂机(Down)--协调者死亡或者降级。
Catch up--协调者被选举可是还没准备好接受请求。
Ready--新当选的协调者完成加载它负责的全部消费组的元数据。
Prepare for rebalance--对于消费组内的全部消费者的心跳请求,协调者在心跳响应(HeartbeatResponse )中发送IllegalGeneration错误代码,而且等待消费者发送一个JoinGroupRequest请求。
Rebalancing--协调者从消费者那里收到一个JoinGroupRequest请求,从新生成一个组generation id,指定消费ID( consumer ids)而且分配消费分区。
稳定(Steady)--协调者从每个消费组的全部消费者那里接收OffsetCommitRequests 请求和心跳请求。

图片描述

消费者ID分配

启动后,消费者在从协调者那里收到的第一个JoinGroupResponse中获得它的Consumer Id。从这时起,消费者每次在发送HeartbeatRequest 和OffsetCommitRequest时请求中必须包含consumer id。若是协调者收到HeartbeatRequest或者OffsetCommitRequest中的Consumer Id和消费组中的任何一个都不同,则会在该响应中发送一个UnknownConsumer错误代码。

协调者在Rebalance成功时会分配一个consumer id给消费者,并在JoinGroupResponse中返回。消费者能够选择在之后每次JoinGroupRequest中包含这个consumer id直到关机或死亡,这样作的好处是在进行rebalance操做时会有更低的延迟。当rebalance被触发,协调者会等待前一轮全部的消费者发送JoinGroupRequest,肯定消费者是经过consumer id的方式。若是消费者选择发送JoinGroupRequest不包含consumer id,协调者会彻底在配置的session.timeout.ms的时间中等待,才会进行从新rebalance操做。它这么作的缘由是没法印证一个不存在consumer id的JoinGroupRequest的消费者来源,这使得rebalance有个高(视session.timeout.ms)的延迟。在另外一方面,若是消费者在以后每次JoinGroupRequests中都包含了consumer id,协调者能当即识别消费者,而且一旦全部已知的消费者都发送了JoinGroupRequest就会马上发起rebalance操做。

在收到消费组中全部存在的消费者的JoinGroupRequest请求以后,协调者开始分配consumer id。假设,消费者是新启动的或者是选择不发送以前非配的consumer ID,在这个时候,它给在每一个在JoinGroupRequest中没有consumer id的消费者指定一个新的Id<group>-<uuid>。

若是在JoinGroupRequest 中指定的consumer id和组内成员全部的id都不匹配。协调者会在JoinGroupResponse 响应中发送UnknownConsumer 错误代码,而且拒绝消费者加入消费组。这不会致使组内其余消费者的rebalance操做,可是也不会容许这样的消费者加入现有组。

请求格式

对于每个消费组,协调者存储一下信息:
1)对于每一个消费组,元数据包括:

组订阅的topic列表
      组的配置,包括session timeout等。

组中每个消费者的元数据,包括hostname和consumer id。
每个topic 分区消费的当前的offsets。
分区全部权元数据,包括消费者-分配-分区( consumer-assigned-partitions )映射。
2)对于每一个主题,当前订阅它的消费组的列表。

It is assumed that all the following requests and responses have the common header. So the fields mentioned exclude the ones in the header.
ConsumerMetadataRequest

{
  GroupId                => String
}

ConsumerMetadataResponse

{
  ErrorCode              => int16
  Coordinator            => Broker
}

JoinGroupRequest

{
  GroupId                     => String
  SessionTimeout              => int32
  Topics                      => [String]
  ConsumerId                  => String
  PartitionAssignmentStrategy => String
 }

JoinGroupResponse

{
  ErrorCode              => int16
  GroupGenerationId      => int32
  ConsumerId             => String
  PartitionsToOwn        => [TopicName [Partition]]
}
TopicName => String
Partition => int32

HeartbeatRequest

{
  GroupId                => String
  GroupGenerationId      => int32
  ConsumerId             => String
}

HeartbeatResponse

{
  ErrorCode              => int16
}

OffsetCommitRequest (v1)

OffsetCommitRequest => ConsumerGroup GroupGenerationId ConsumerId [TopicName [Partition Offset TimeStamp Metadata]]
  ConsumerGroup => string
  GroupGenerationId => int32
  ConsumerId => String
  TopicName => string
  Partition => int32
  Offset => int64

TimeStamp => int64
Metadata => string

配置

服务端配置

This list is still in progress
max.session.timeout - 任何组的session timeout不该高于此值以减小对broker的开销。若是高于此值, the broker将会在JoinGroupResponse中返回SessionTimeoutTooHigh错误码。
partition.assignment.strategies - 都好分割的分区策略,用户可自定义,也可使用kafka提供的默认策略。当消费者在JoinGroupRequest指定了分区策略。它必须使用该配置列表中存在的策略,不然将收到一个UnknownPartitionAssignmentStrategyException异常。

通配符订阅

基于通配符订阅(好比,白名单和黑名单),消费者有责任经过topic元数据请求检测匹配的topics。也就是说,它的topic metadata request会包含一个空的topic列表,它的响应会返回全部主题的分区信息,它会过滤topics选出和通配符匹配的主题,而后使用subscribe()接口更新订阅列表。一样,若是订阅列表改变了,它将会触发Reblance
考虑下有趣的场景Interesting scenarios to consider

协调者故障或者协调者断开了链接

协调者故障,控制器自动为因为协调者故障而影响的消费组选取新的leader。协调者从zookeeper读取它负责的消费组的元数据。包括consumer ids,the generation id和订阅的主题列表。知道协调者从zookeeper中读取了全部元数据,它才会在HeartbeatResponse中返回CoordinatorStartupNotComplete错误码。在这段时间内消费者的JoinGroupRequest都是非法的。

假如在broker经过UpdateMetadataRequest请求从控制器(zk?)更新消费组元数据以前,一个消费者向broker发送一个ConsumerMetadataRequest请求。ConsumerMetadataResponse将会返回一个过期的协调者信息,消费者在发送心跳,或者提交offset时将会受到一个NotCoordinatorForGroup错误代码。在收到NotCoordinatorForGroup错误代码后,消费者回滚而且从新发送ConsumerMetadataRequest请求。

在协调者故障或者重寻期间,消费者不会中止读取数据。

订阅的主题分区变化

组的协调者发现它订阅的主题的分区发生了变化。
协调者标记组准备Rebalance并在HeartbeatResponse中发送IllegalGeneration错误代码,而后消费者中止获取数据,提交offsets而且向协调者发送一个JoinGroupRequest。

协调者等待组内全部的消费者发送JoinGroupRequest。一旦收到全部预期的JoinGroupRequests,它在ZK中的组generation id增长。计算新分区分配,而且在JoinGroupResponse中返回更新后的分区分配和一个新的generation id。请注意:即便组内消费者成员没有改变generation id也会递增。

在收到JoinGroupResponse后,消费者在本地存储新的 generation id和consumer id而且开始从返回的分区列表中获取数据。在后面的JoinGroupResponse请求中将会使用新的generation id和consumer id。

rebalance期间的Offset提交

若是消费者受到IllegalGeneration错误代码,就中止获取数据而且在发送JoinGroupRequest以前提交offsets。

协调者会检查OffsetCommitRequest中的generation id,若是比协调者中的generation id高的时候会拒绝请求。这意味着在消费者代码中有bug。

协调者也不容许在提交offset请求时候的generation id比当前组在ZK中存储的generation id(身份认证)旧。在rebalance期间这个约束不是一个问题,从第一个消费者发送JoinGroupRequest到最后一个,协调者不会对组的generation id进行自增,今后时直到协调者返回JoinGroupResponse,它都不会从当前generation的组中的任何消费者那里接受OffsetCommitRequests请求。因此消费者发送的OffsetCommitRequest中的generation id应该和当前协调者中的匹配。

另一种值得一说的状况是当消费者在rebalance期间有一个软错误,好比长时间的GC。若是一个消费者停顿的时间超过了session.timeout.ms配置,协调者就不会在session.timeout.ms时间内收到消费者的JoinGroupRequest请求。协调者标记这个消费者死亡,而且根据在新一轮(new generation)发送JoinGroupRequest的消费者完成rebalance操做。

Heartbeats during rebalance

Consumer periodically sends a HeartbeatRequest to the coordinator every session.timeout.ms/hearbeat.frequency milliseconds. If the consumer receives the IllegalGeneration error code in the HeartbeatResponse, it stops fetching, commits offsets and sends a JoinGroupRequest to the co-ordinator. Until the consumer receives a JoinGroupResponse, it does not send any more HearbeatRequests to the co-ordinator.
A higher heartbeat.frequency ensures lower latency on a rebalance operation since the co-ordinator notifies a consumer of the need to rebalance only on a HeartbeatResponse.
The co-ordinator pauses failure detection for a consumer that has sent a JoinGroupRequest until a JoinGroupResponse is sent out. It restarts the hearbeat timer once the JoinGroupResponse is sent out and marks a consumer dead if it does not receive a HeartbeatRequest from that time for another session.timeout.ms milliseconds.The reason the co-ordinator stops depending on heartbeats to detect failures during a rebalance is due to a design decision on the broker's socket server - Kafka only allows the broker to read and process one outstanding request per client. This is done to make it simpler to reason about ordering. This prevents the consumer and broker from processing a heartbeat request at the same time as a join group request for the same client. Marking failures based on JoinGroupRequest prevents the co-ordinator from marking a consumer dead during a rebalance operation. Note that this does not prevent the rebalance operation from finishing if a consumer goes through a soft failure during a rebalance operation. If the consumer pauses before it sends a JoinGroupRequest, the co-ordinator will mark it dead during the rebalance and complete the rebalance operation by including the rest of the consumers in the new generation. If a consumer pauses after it sends a JoinGroupRequest, the co-ordinator will send it the JoinGroupResponse assuming the rebalance completed successfully and will restart it's heartbeat timer. If the consumer resumes before session.timeout.ms, consumption starts normally. If the consumer pauses for session.timeout.ms after that, then it is marked dead by the co-ordinator and it will trigger a rebalance operation.
The co-ordinator returns the new generation id and consumer id only in the JoinGroupResponse. Once the consumer receives a JoinGroupResponse, it sends the next HeartbeatRequest with the new generation id and consumer id.
Co-ordinator failure during rebalance

一个rebalance操做要通过几个阶段
一、协调者接收rebalance的通知。不管是ZK监听到一个主题/分区改变,或者注册一个新的消费者又或是一个消费者死亡。
二、协调者在HeartbeatResponse中返回IllegalGeneration错误码来初始化rebalance操做。
三、消费者发送JoinGroupRequest。
四、协调者在ZK中自增组的generation id,而且告诉ZK分区的新全部权。
五、协调者返回JoinGroupResponse。

协调者可能在上面任何一步失败:
If the co-ordinator fails at step #1 after receiving a notification but not getting a chance to act on it, the new co-ordinator has to be able to detect the need for a rebalance operation on completing the failover. As part of failover, the co-ordinator reads a group's metadata from zookeeper, including the list of topics the group has subscribed to and the previous partition ownership decision. If the # of topics or # of partitions for the subscribed topics are different from the ones in the previous partition ownership decision, the new co-ordinator detects the need for a rebalance and initiates one for the group. Similarly if the consumers that connect to the new co-ordinator are different from the ones in the group's generation metadata in zookeeper, it initiates a rebalance for the group.
If the co-ordinator fails at step #2, it might send a HeartbeatResponse with the error code to some consumers but not all. Similar to failure #1 above, the co-ordinator will detect the need for rebalance after failover and initiate a rebalance again. If a rebalance was initiated due to a consumer failure and the consumer recovers before the co-ordinator failover completes, the co-ordinator will not initiate a rebalance. However, if any consumer (with an empty or unknown consumer id) sends it a JoinGroupRequest, it will initiate a rebalance for the entire group.
If a co-ordinator fails at step #3, it might receive JoinGroupRequests from only a subset of consumers in the group. After failover, the co-ordinator might receive a HeartbeatRequest from all alive consumers OR JoinGroupRequests from some. Similar to #1, it will trigger a rebalance for the group.
If a co-ordinator fails at step #4, it might fail after writing the new generation id and group membership in zookeeper. The generation id and membership information is written in one atomic zookeeper write operation. After failover, the consumer will send HeartbeatRequests to the new co-ordinator with an older generation id. The co-ordinator triggers a rebalance by returning an IllegalGeneration error code in the response that causes the consumer to send it a JoinGroupRequest. Note that this is the reason why it is worth sending both the generation id as well as the consumer id in the HeartbeatRequest and OffsetCommitRequest
If a co-ordinator fails at step #5, it might send the JoinGroupResponse to only a subset of the consumers in a group. A consumer that received a JoinGroupResponse will detect the failed co-ordinator while sending a heartbeat or committing offsets. At this point, it will discover the new co-ordinator and send it a heartbeat with the new generation id. The co-ordinator will send it a HeartbeatResponse with no error code at this point. A consumer that did not receive a JoinGroupResponse will discover the new co-ordinator and send it a JoinGroupRequest. This will cause the co-ordinator to trigger a rebalance for the group.

慢消费者Slow consumers

若是没有在session.timeout.ms时间内收到心跳请求,协调者能够将慢消费者从组中移除。一般,若是消息处理比session.timeout.ms慢,就会成为慢消费者。致使两次poll()方法的调用间隔比session.timeout.ms时间长。因为心跳只在 poll()调用时才会发送,这就会致使协调者标记慢消费者死亡。下面是协调者如何处理一个慢消费者: 若是没有在session.timeout.ms时间内收到心跳请求,协调者标记消费者死亡而且断开和它的链接。同时,经过向组内其余消费者的HeartbeatResponse中发送IllegalGeneration 错误代码 触发rebalance操做。若是慢消费者在协调者收到组内其余任何消费者的HeartbeatRequest以前发送了心跳请求,它会取消Rebalance操做的意图,并在HeartbeatResponses 中不会发送错误码。若是不是,协调者依然会进行Rebalance操做。而且会给慢消费者也返回IllegalGeneration 错误码。因为协调者只能从活着的消费者那里等待JoinGroupRequest请求,一旦受到其余消费者的加入请求就会完成rebalance操做。若是慢消费者刚好也发送了JoinGroupRequest,协调者就会在当前一轮包含它,反之就不会包含这个慢消费者。若是协调者已经返回了JoinGroupResponse,它会在当前Rebalance完成以后,再触发新的Rebalance操做。若是本轮须要很长时间,慢消费者的接收JoinGroupResponse超时,它就会发起重寻协调者,而且给协调者从新发送JoinGroupRequest。

相关文章
相关标签/搜索