原创文章,转载请务必将下面这段话置于文章开头处。(已受权InfoQ中文站发布)
本文转发自Jason's Blog,原文连接 http://www.jasongj.com/2015/08/09/KafkaColumn4html
本文主要介绍了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer实现的语义,以及适用场景。以及将来版本中对High Level Consumer的从新设计--使用Consumer Coordinator解决Split Brain和Herd等问题。java
不少时候,客户程序只是但愿从Kafka读取数据,不太关心消息offset的处理。同时也但愿提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被全部Consumer消费(广播)。所以,Kafka Hight Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义。
算法
High Level Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中(Kafka从0.8.2版本开始同时支持将offset存于Zookeeper中与将offset存于专用的Kafka Topic中)。这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,而非某个Topic的。每个High Level Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group。
Zookeeper中Consumer相关节点以下图所示
不少传统的Message Queue都会在消息被消费完后将消息删除,一方面避免重复消费,另外一方面能够保证Queue的长度比较短,提升效率。而如上文所述,Kafka并不删除已消费的消息,为了实现传统Message Queue消息只被消费一次的语义,Kafka保证每条消息在同一个Consumer Group里只会被某一个Consumer消费。与传统Message Queue不一样的是,Kafka还容许不一样Consumer Group同时消费同一条消息,这一特性能够为消息的多元化处理提供支持。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还能够同时将数据实时备份到另外一个数据中心,只须要保证这三个操做所使用的Consumer在不一样的Consumer Group便可。下图展现了Kafka在LinkedIn的一种简化部署模型。
为了更清晰展现Kafka Consumer Group的特性,笔者进行了一项测试。建立一个Topic (名为topic1),再建立一个属于group1的Consumer实例,并建立三个属于group2的Consumer实例,而后经过Producer向topic1发送Key分别为1,2,3的消息。结果发现属于group1的Consumer收到了全部的这三条消息,同时group2中的3个Consumer分别收到了Key为1,2,3的消息,以下图所示。
注:上图中每一个黑色区域表明一个Consumer实例,每一个实例只建立一个MessageStream。实际上,本实验将Consumer应用程序打成jar包,并在4个不一样的命令行终端中传入不一样的参数运行。数据库
(本节所讲述Rebalance相关内容均基于Kafka High Level Consumer)
Kafka保证同一Consumer Group中只有一个Consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每个Consumer实例只会消费某一个或多个特定Partition的数据,而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以Partition为单位分配的,而非以每一条消息做为分配单元。这样设计的劣势是没法保证同一个Consumer Group里的Consumer均匀消费数据,优点是每一个Consumer不用都跟大量的Broker通讯,减小通讯开销,同时也下降了分配难度,实现也更简单。另外,由于同一个Partition里的数据是有序的,这种设计能够保证每一个Partition里的数据能够被有序消费。
若是某Consumer Group中Consumer(每一个Consumer只建立1个MessageStream)数量少于Partition数量,则至少有一个Consumer会消费多个Partition的数据,若是Consumer的数量与Partition数量相同,则正好一个Consumer消费一个Partition的数据。而若是Consumer的数量多于Partition的数量时,会有部分Consumer没法消费该Topic下任何一条消息。
以下例所示,若是topic1有0,1,2共三个Partition,当group1只有一个Consumer(名为consumer1)时,该 Consumer可消费这3个Partition的全部数据。
增长一个Consumer(consumer2)后,其中一个Consumer(consumer1)可消费2个Partition的数据(Partition 0和Partition 1),另一个Consumer(consumer2)可消费另一个Partition(Partition 2)的数据。
再增长一个Consumer(consumer3)后,每一个Consumer可消费一个Partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2。
再增长一个Consumer(consumer4)后,其中3个Consumer可分别消费一个Partition的数据,另一个Consumer(consumer4)不能消费topic1的任何数据。
此时关闭consumer1,其他3个Consumer可分别消费一个Partition的数据。
接着关闭consumer2,consumer3可消费2个Partition,consumer4可消费1个Partition。
再关闭consumer3,仅存的consumer4可同时消费topic1的3个Partition。
apache
Consumer Rebalance的算法以下:json
目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每个Consumer经过在Zookeeper上注册Watch完成的。每一个Consumer被建立时会触发Consumer Group的Rebalance,具体启动流程以下:session
/consumers/[consumer group]/ids/[consumer id]
/consumers/[consumer group]/ids
上注册Watch/brokers/ids
上注册Watch/brokers/topics
上也建立Watch在这种策略下,每个Consumer或者Broker的增长或者减小都会触发Consumer Rebalance。由于每一个Consumer只负责调整本身所消费的Partition,为了保证整个Consumer Group的一致性,当一个Consumer触发了Rebalance时,该Consumer Group内的其它全部其它Consumer也应该同时触发Rebalance。架构
该方式有以下缺陷:并发
根据Kafka社区wiki,Kafka做者正在考虑在还未发布的0.9.x版本中使用中心协调器(Coordinator)。大致思想是为全部Consumer Group的子集选举出一个Broker做为Coordinator,由它Watch Zookeeper,从而判断是否有Partition或者Consumer的增减,而后生成Rebalance命令,并检查是否这些Rebalance在全部相关的Consumer中被执行成功,若是不成功则重试,若成功则认为这次Rebalance成功(这个过程跟Replication Controller很是相似)。具体方案将在后文中详细阐述。
oop
使用Low Level Consumer (Simple Consumer)的主要缘由是,用户但愿比Consumer Group更好的控制数据的消费。好比:
与Consumer Group相比,Low Level Consumer要求用户作大量的额外工做。
使用Low Level Consumer的通常流程以下
根据社区社区wiki,Kafka在0.9.*版本中,从新设计Consumer多是最重要的Feature之一。本节会根据社区wiki介绍Kafka 0.9.*中对Consumer可能的设计方向及思路。
简化消费者客户端
部分用户但愿开发和使用non-java的客户端。现阶段使用non-java发SimpleConsumer比较方便,但想开发High Level Consumer并不容易。由于High Level Consumer须要实现一些复杂但必不可少的失败探测和Rebalance。若是能将消费者客户端更精简,使依赖最小化,将会极大的方便non-java用户实现本身的Consumer。
中心Coordinator
如上文所述,当前版本的High Level Consumer存在Herd Effect和Split Brain的问题。若是将失败探测和Rebalance的逻辑放到一个高可用的中心Coordinator,那么这两个问题便可解决。同时还可大大减小Zookeeper的负载,有利于Kafka Broker的Scale Out。
容许手工管理offset
一些系统但愿以特定的时间间隔在自定义的数据库中管理Offset。这就要求Consumer能获取到每条消息的metadata,例如Topic,Partition,Offset,同时还须要在Consumer启动时获得每一个Partition的Offset。实现这些,须要提供新的Consumer API。同时有个问题不得不考虑,便是否容许Consumer手工管理部分Topic的Offset,而让Kafka自动经过Zookeeper管理其它Topic的Offset。一个可能的选项是让每一个Consumer只能选取1种Offset管理机制,这可极大的简化Consumer API的设计和实现。
Rebalance后触发用户指定的回调
一些应用可能会在内存中为每一个Partition维护一些状态,Rebalance时,它们可能须要将该状态持久化。所以该需求但愿支持用户实现并指定一些可插拔的并在Rebalance时触发的回调。若是用户使用手动的Offset管理,那该需求可方便得由用户实现,而若是用户但愿使用Kafka提供的自动Offset管理,则须要Kafka提供该回调机制。
非阻塞式Consumer API
该需求源于那些实现高层流处理操做,如filter by, group by, join等,的系统。现阶段的阻塞式Consumer几乎不可能实现Join操做。
成功Rebalance的结果是,被订阅的全部Topic的每个Partition将会被Consumer Group内的一个(有且仅有一个)Consumer拥有。每个Broker将被选举为某些Consumer Group的Coordinator。某个Cosnumer Group的Coordinator负责在该Consumer Group的成员变化或者所订阅的Topic的Partititon变化时协调Rebalance操做。
Consumer
1) Consumer启动时,先向Broker列表中的任意一个Broker发送ConsumerMetadataRequest,并经过ConsumerMetadataResponse获取它所在Group的Coordinator信息。ConsumerMetadataRequest和ConsumerMetadataResponse的结构以下
ConsumerMetadataRequest { GroupId => String } ConsumerMetadataResponse { ErrorCode => int16 Coordinator => Broker }
2)Consumer链接到Coordinator并发送HeartbeatRequest,若是返回的HeartbeatResponse没有任何错误码,Consumer继续fetch数据。若其中包含IllegalGeneration错误码,即说明Coordinator已经发起了Rebalance操做,此时Consumer中止fetch数据,commit offset,并发送JoinGroupRequest给它的Coordinator,并在JoinGroupResponse中得到它应该拥有的全部Partition列表和它所属的Group的新的Generation ID。此时Rebalance完成,Consumer开始fetch数据。相应Request和Response结构以下
HeartbeatRequest { GroupId => String GroupGenerationId => int32 ConsumerId => String } HeartbeatResponse { ErrorCode => int16 } JoinGroupRequest { GroupId => String SessionTimeout => int32 Topics => [String] ConsumerId => String PartitionAssignmentStrategy => String } JoinGroupResponse { ErrorCode => int16 GroupGenerationId => int32 ConsumerId => String PartitionsToOwn => [TopicName [Partition]] } TopicName => String Partition => int32
Consumer状态机
Down:Consumer中止工做
Start up & discover coordinator:Consumer检测其所在Group的Coordinator。一旦它检测到Coordinator,即向其发送JoinGroupRequest。
Part of a group:该状态下,Consumer已是该Group的成员,并周期性发送HeartbeatRequest。如HeartbeatResponse包含IllegalGeneration错误码,则转换到Stopped Consumption状态。若链接丢失,HeartbeatResponse包含NotCoordinatorForGroup错误码,则转换到Rediscover coordinator状态。
Rediscover coordinator:该状态下,Consumer不中止消费而是尝试经过发送ConsumerMetadataRequest来探测新的Coordinator,而且等待直到得到无错误码的响应。
Stopped consumption:该状态下,Consumer中止消费并提交offset,直到它再次加入Group。
故障检测机制
Consumer成功加入Group后,Consumer和相应的Coordinator同时开始故障探测程序。Consumer向Coordinator发起周期性的Heartbeat(HeartbeatRequest)并等待响应,该周期为 session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms内未收到HeartbeatResponse,或者发现相应的Socket channel断开,它即认为Coordinator已宕机并启动Coordinator探测程序。若Coordinator在session.timeout.ms内没有收到一次HeartbeatRequest,则它将该Consumer标记为宕机状态并为其所在Group触发一次Rebalance操做。
Coordinator Failover过程当中,Consumer可能会在新的Coordinator完成Failover过程以前或以后发现新的Coordinator并向其发送HeatbeatRequest。对于后者,新的Cooodinator可能拒绝该请求,导致该Consumer从新探测Coordinator并发起新的链接请求。若是该Consumer向新的Coordinator发送链接请求太晚,新的Coordinator可能已经在此以前将其标记为宕机状态而将之视为新加入的Consumer并触发一次Rebalance操做。
Coordinator
1)稳定状态下,Coordinator经过上述故障探测机制跟踪其所管理的每一个Group下的每一个Consumer的健康状态。
2)刚启动时或选举完成后,Coordinator从Zookeeper读取它所管理的Group列表及这些Group的成员列表。若是没有获取到Group成员信息,它不会作任何事情直到某个Group中有成员注册进来。
3)在Coordinator完成加载其管理的Group列表及其相应的成员信息以前,它将为HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests返回CoordinatorStartupNotComplete错误码。此时,Consumer会从新发送请求。
4)Coordinator会跟踪被其所管理的任何Consumer Group注册的Topic的Partition的变化,并为该变化触发Rebalance操做。建立新的Topic也可能触发Rebalance,由于Consumer能够在Topic被建立以前就已经订阅它了。
Coordinator发起Rebalance操做流程以下所示。
Coordinator状态机
Down:Coordinator再也不担任以前负责的Consumer Group的Coordinator
Catch up:该状态下,Coordinator竞选成功,但还未能作好服务相应请求的准备。
Ready:该状态下,新竞选出来的Coordinator已经完成从Zookeeper中加载它所负责管理的全部Group的metadata,并可开始接收相应的请求。
Prepare for rebalance:该状态下,Coordinator在全部HeartbeatResponse中返回IllegalGeneration错误码,并等待全部Consumer向其发送JoinGroupRequest后转到Rebalancing状态。
Rebalancing:该状态下,Coordinator已经收到了JoinGroupRequest请求,并增长其Group Generation ID,分配Consumer ID,分配Partition。Rebalance成功后,它会等待接收包含新的Consumer Generation ID的HeartbeatRequest,并转至Ready状态。
Coordinator Failover
如前文所述,Rebalance操做须要经历以下几个阶段
1)Topic/Partition的改变或者新Consumer的加入或者已有Consumer中止,触发Coordinator注册在Zookeeper上的watch,Coordinator收到通知准备发起Rebalance操做。
2)Coordinator经过在HeartbeatResponse中返回IllegalGeneration错误码发起Rebalance操做。
3)Consumer发送JoinGroupRequest
4)Coordinator在Zookeeper中增长Group的Generation ID并将新的Partition分配状况写入Zookeeper
5)Coordinator发送JoinGroupResponse
在这个过程当中的每一个阶段,Coordinator均可能出现故障。下面给出Rebalance不一样阶段中Coordinator的Failover处理方式。
1)若是Coordinator的故障发生在第一阶段,即它收到Notification并将来得及做出响应,则新的Coordinator将从Zookeeper读取Group的metadata,包含这些Group订阅的Topic列表和以前的Partition分配。若是某个Group所订阅的Topic数或者某个Topic的Partition数与以前的Partition分配不一致,亦或者某个Group链接到新的Coordinator的Consumer数与以前Partition分配中的不一致,新的Coordinator会发起Rebalance操做。
2)若是失败发生在阶段2,它可能对部分而非所有Consumer发出带错误码的HeartbeatResponse。与第上面第一种状况同样,新的Coordinator会检测到Rebalance的必要性并发起一次Rebalance操做。若是Rebalance是由Consumer的失败所触发而且Cosnumer在Coordinator的Failover完成前恢复,新的Coordinator不会为此发起新的Rebalance操做。
3)若是Failure发生在阶段3,新的Coordinator可能只收到部分而非所有Consumer的JoinGroupRequest。Failover完成后,它可能收到部分Consumer的HeartRequest及另外部分Consumer的JoinGroupRequest。与第1种状况相似,它将发起新一轮的Rebalance操做。
4)若是Failure发生在阶段4,即它将新的Group Generation ID和Group成员信息写入Zookeeper后。新的Generation ID和Group成员信息以一个原子操做一次性写入Zookeeper。Failover完成后,Consumer会发送HeartbeatRequest给新的Coordinator,并包含旧的Generation ID。此时新的Coordinator经过在HeartbeatResponse中返回IllegalGeneration错误码发起新的一轮Rebalance。这也解释了为何每次HeartbeatRequest中都须要包含Generation ID和Consumer ID。
5)若是Failure发生在阶段5,旧的Coordinator可能只向Group中的部分Consumer发送了JoinGroupResponse。收到JoinGroupResponse的Consumer在下次向已经失效的Coordinator发送HeartbeatRequest或者提交Offset时会检测到它已经失败。此时,它将检测新的Coordinator并向其发送带有新的Generation ID 的HeartbeatRequest。而未收到JoinGroupResponse的Consumer将检测新的Coordinator并向其发送JoinGroupRequest,这将促使新的Coordinator发起新一轮的Rebalance。