Kafka设计解析(四)Kafka Consumer设计解析

转载自 技术世界,原文连接 Kafka设计解析(四)- Kafka Consumer设计解析html

 

目录

1、High Level Consumerjava

1. Consumer Group算法

2. High Level Consumer Rebalance数据库

3、Low Level Consumerapache

4、Consumer从新设计session

1. 设计方向并发

摘要

本文主要介绍了Kafka High Level Consumer,Consumer Group,Consumer Rebalance,Low Level Consumer实现的语义,以及适用场景。以及将来版本中对High Level Consumer的从新设计–使用Consumer Coordinator解决Split Brain和Herd等问题。oop

1、High Level Consumer

 不少时候,客户程序只是但愿从Kafka读取数据,不太关心消息offset的处理。同时也但愿提供一些语义,例如同一条消息只被某一个Consumer消费(单播)或被全部Consumer消费(广播)。所以,Kafka Hight Level Consumer提供了一个从Kafka消费数据的高层抽象,从而屏蔽掉其中的细节并提供丰富的语义。测试

1. Consumer Group

High Level Consumer将从某个Partition读取的最后一条消息的offset存于Zookeeper中(Kafka从0.8.2版本开始同时支持将offset存于Zookeeper中与将offset存于专用的Kafka Topic中)。这个offset基于客户程序提供给Kafka的名字来保存,这个名字被称为Consumer Group。Consumer Group是整个Kafka集群全局的,而非某个Topic的。每个High Level Consumer实例都属于一个Consumer Group,若不指定则属于默认的Group。fetch

Zookeeper中Consumer相关节点以下图所示

不少传统的Message Queue都会在消息被消费完后将消息删除,一方面避免重复消费,另外一方面能够保证Queue的长度比较短,提升效率。而如上文所述,Kafka并不删除已消费的消息,为了实现传统Message Queue消息只被消费一次的语义,Kafka保证每条消息在同一个Consumer Group里只会被某一个Consumer消费。与传统Message Queue不一样的是,Kafka还容许不一样Consumer Group同时消费同一条消息,这一特性能够为消息的多元化处理提供支持。

实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还能够同时将数据实时备份到另外一个数据中心,只须要保证这三个操做所使用的Consumer在不一样的Consumer Group便可。下图展现了Kafka在LinkedIn的一种简化部署模型。

为了更清晰展现Kafka Consumer Group的特性,笔者进行了一项测试。建立一个Topic (名为topic1),再建立一个属于group1的Consumer实例,并建立三个属于group2的Consumer实例,而后经过Producer向topic1发送Key分别为1,2,3的消息。结果发现属于group1的Consumer收到了全部的这三条消息,同时group2中的3个Consumer分别收到了Key为1,2,3的消息,以下图所示。

注:上图中每一个黑色区域表明一个Consumer实例,每一个实例只建立一个MessageStream。实际上,本实验将Consumer应用程序打成jar包,并在4个不一样的命令行终端中传入不一样的参数运行。

2. High Level Consumer Rebalance

(本节所讲述Rebalance相关内容均基于Kafka High Level Consumer)

Kafka保证同一Consumer Group中只有一个Consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每个Consumer实例只会消费某一个或多个特定Partition的数据,而某个Partition的数据只会被某一个特定的Consumer实例所消费。也就是说Kafka对消息的分配是以Partition为单位分配的,而非以每一条消息做为分配单元。这样设计的劣势是没法保证同一个Consumer Group里的Consumer均匀消费数据,优点是每一个Consumer不用都跟大量的Broker通讯,减小通讯开销,同时也下降了分配难度,实现也更简单。另外,由于同一个Partition里的数据是有序的,这种设计能够保证每一个Partition里的数据能够被有序消费。

若是某Consumer Group中Consumer(每一个Consumer只建立1个MessageStream)数量少于Partition数量,则至少有一个Consumer会消费多个Partition的数据,若是Consumer的数量与Partition数量相同,则正好一个Consumer消费一个Partition的数据。而若是Consumer的数量多于Partition的数量时,会有部分Consumer没法消费该Topic下任何一条消息。

以下例所示,若是topic1有0,1,2共三个Partition,当group1只有一个Consumer(名为consumer1)时,该 Consumer可消费这3个Partition的全部数据。

增长一个Consumer(consumer2)后,其中一个Consumer(consumer1)可消费2个Partition的数据(Partition 0和Partition 1),另一个Consumer(consumer2)可消费另一个Partition(Partition 2)的数据。

再增长一个Consumer(consumer3)后,每一个Consumer可消费一个Partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2。

再增长一个Consumer(consumer4)后,其中3个Consumer可分别消费一个Partition的数据,另一个Consumer(consumer4)不能消费topic1的任何数据。

此时关闭consumer1,其他3个Consumer可分别消费一个Partition的数据。

接着关闭consumer2,consumer3可消费2个Partition,consumer4可消费1个Partition。

再关闭consumer3,仅存的consumer4可同时消费topic1的3个Partition。

Consumer Rebalance的算法以下:

  • 将目标Topic下的全部Partirtion排序,存于PT
  • 对某Consumer Group下全部Consumer排序,存CG,第i个Consumer记为Ci
  • N=size(PT)/size(CG),向上取整
  • 解除Ci对原来分配的Partition的消费权(i从0开始)
  • 将第 i到 (i+1)N个Partition分配给Ci

  
目前,最新版(0.8.2.1)Kafka的Consumer Rebalance的控制策略是由每个Consumer经过在Zookeeper上注册Watch完成的。每一个Consumer被建立时会触发Consumer Group的Rebalance,具体启动流程以下:

  • High Level Consumer启动时将其ID注册到其Consumer Group下,在Zookeeper上的路径为/consumers/[consumer group]/ids/[consumer id]
  • 在/consumers/[consumer group]/ids上注册Watch
  • 在/brokers/ids上注册Watch
  • 若是Consumer经过Topic Filter建立消息流,则它会同时在/brokers/topics上也建立Watch
  • 强制本身在其Consumer Group内启动Rebalance流程

  在这种策略下,每个Consumer或者Broker的增长或者减小都会触发Consumer Rebalance。由于每一个Consumer只负责调整本身所消费的Partition,为了保证整个Consumer Group的一致性,当一个Consumer触发了Rebalance时,该Consumer Group内的其它Consumer也应该同时触发Rebalance。

  该方式有以下缺陷:

根据Kafka社区wiki,Kafka做者正在考虑在还未发布的0.9.x版本中使用中心协调器(Coordinator)。大致思想是为全部Consumer Group的子集选举出一个Broker做为Coordinator,由它Watch Zookeeper,从而判断是否有Partition或者Consumer的增减,而后生成Rebalance命令,并检查这些Rebalance在全部相关的Consumer中是否被执行成功,若是不成功则重试,若成功则认为这次Rebalance成功(这个过程跟Replication Controller很是相似)。具体方案将在后文中详细阐述。 

3、Low Level Consumer

使用Low Level Consumer (Simple Consumer)的主要缘由是,用户但愿比Consumer Group更好的控制数据的消费。好比:

  • 同一条消息读屡次
  • 只读取某个Topic的部分Partition
  • 管理事务,从而确保每条消息被处理一次,且仅被处理一次

与Consumer Group相比,Low Level Consumer要求用户作大量的额外工做。

  • 必须在应用程序中跟踪offset,从而肯定下一条应该消费哪条消息
  • 应用程序须要经过程序获知每一个Partition的Leader是谁
  • 必须处理Leader的变化

使用Low Level Consumer的通常流程以下

  • 查找到一个“活着”的Broker,而且找出每一个Partition的Leader
  • 找出每一个Partition的Follower
  • 定义好请求,该请求应该能描述应用程序须要哪些数据
  • Fetch数据
  • 识别Leader的变化,并对之做出必要的响应

4、Consumer从新设计

根据社区社区wiki,Kafka在0.9.*版本中,从新设计Consumer多是最重要的Feature之一。本节会根据社区wiki介绍Kafka 0.9.*中对Consumer可能的设计方向及思路。

1. 设计方向

简化消费者客户端

部分用户但愿开发和使用non-java的客户端。现阶段使用non-java开发SimpleConsumer比较方便,但想开发High Level Consumer并不容易。由于High Level Consumer须要实现一些复杂但必不可少的失败探测和Rebalance。若是能将消费者客户端更精简,使依赖最小化,将会极大的方便non-java用户实现本身的Consumer。
  
中心Coordinator

如上文所述,当前版本的High Level Consumer存在Herd Effect和Split Brain的问题。若是将失败探测和Rebalance的逻辑放到一个高可用的中心Coordinator,那么这两个问题便可解决。同时还可大大减小Zookeeper的负载,有利于Kafka Broker的Scale Out。
  
容许手工管理offset

一些系统但愿以特定的时间间隔在自定义的数据库中管理Offset。这就要求Consumer能获取到每条消息的metadata,例如Topic,Partition,Offset,同时还须要在Consumer启动时获得每一个Partition的Offset。实现这些,须要提供新的Consumer API。同时有个问题不得不考虑,便是否容许Consumer手工管理部分Topic的Offset,而让Kafka自动经过Zookeeper管理其它Topic的Offset。一个可能的选项是让每一个Consumer只能选取1种Offset管理机制,这可极大的简化Consumer API的设计和实现。
  
Rebalance后触发用户指定的回调

一些应用可能会在内存中为每一个Partition维护一些状态,Rebalance时,它们可能须要将该状态持久化。所以该需求但愿支持用户实现并指定一些可插拔的并在Rebalance时触发的回调。若是用户使用手动的Offset管理,那该需求可方便得由用户实现,而若是用户但愿使用Kafka提供的自动Offset管理,则须要Kafka提供该回调机制。

非阻塞式Consumer API

该需求源于那些实现高层流处理操做,如filter by, group by, join等,的系统。现阶段的阻塞式Consumer几乎不可能实现Join操做。

##如何经过中心Coordinator实现Rebalance

成功Rebalance的结果是,被订阅的全部Topic的每个Partition将会被Consumer Group内的一个(有且仅有一个)Consumer拥有。每个Broker将被选举为某些Consumer Group的Coordinator。某个Cosnumer Group的Coordinator负责在该Consumer Group的成员变化或者所订阅的Topic的Partition变化时协调Rebalance操做。

Consumer
(1) Consumer启动时,先向Broker列表中的任意一个Broker发送ConsumerMetadataRequest,并经过ConsumerMetadataResponse获取它所在Group的Coordinator信息。ConsumerMetadataRequest和ConsumerMetadataResponse的结构以下

ConsumerMetadataRequest { GroupId => String } ConsumerMetadataResponse { ErrorCode => int16 Coordinator => Broker }

(2)Consumer链接到Coordinator并发送HeartbeatRequest,若是返回的HeartbeatResponse没有任何错误码,Consumer继续fetch数据。若其中包含IllegalGeneration错误码,即说明Coordinator已经发起了Rebalance操做,此时Consumer中止fetch数据,commit offset,并发送JoinGroupRequest给它的Coordinator,并在JoinGroupResponse中得到它应该拥有的全部Partition列表和它所属的Group的新的Generation ID。此时Rebalance完成,Consumer开始fetch数据。相应Request和Response结构以下

HeartbeatRequest { GroupId => String GroupGenerationId => int32 ConsumerId => String } HeartbeatResponse { ErrorCode => int16 } JoinGroupRequest { GroupId => String SessionTimeout => int32 Topics => [String] ConsumerId => String PartitionAssignmentStrategy => String } JoinGroupResponse { ErrorCode => int16 GroupGenerationId => int32 ConsumerId => String PartitionsToOwn => [TopicName [Partition]] } TopicName => String Partition => int32

Consumer状态机

 

Down:Consumer中止工做

Start up & discover coordinator:Consumer检测其所在Group的Coordinator。一旦它检测到Coordinator,即向其发送JoinGroupRequest。

Part of a group:该状态下,Consumer已是该Group的成员,并周期性发送HeartbeatRequest。如HeartbeatResponse包含IllegalGeneration错误码,则转换到Stopped Consumption状态。若链接丢失,HeartbeatResponse包含NotCoordinatorForGroup错误码,则转换到Rediscover coordinator状态。

Rediscover coordinator:该状态下,Consumer不中止消费而是尝试经过发送ConsumerMetadataRequest来探测新的Coordinator,而且等待直到得到无错误码的响应。

Stopped consumption:该状态下,Consumer中止消费并提交offset,直到它再次加入Group。

故障检测机制

Consumer成功加入Group后,Consumer和相应的Coordinator同时开始故障探测程序。Consumer向Coordinator发起周期性的Heartbeat(HeartbeatRequest)并等待响应,该周期为 session.timeout.ms/heartbeat.frequency。若Consumer在session.timeout.ms内未收到HeartbeatResponse,或者发现相应的Socket channel断开,它即认为Coordinator已宕机并启动Coordinator探测程序。若Coordinator在session.timeout.ms内没有收到一次HeartbeatRequest,则它将该Consumer标记为宕机状态并为其所在Group触发一次Rebalance操做。

Coordinator Failover过程当中,Consumer可能会在新的Coordinator完成Failover过程以前或以后发现新的Coordinator并向其发送HeatbeatRequest。对于后者,新的Cooodinator可能拒绝该请求,导致该Consumer从新探测Coordinator并发起新的链接请求。若是该Consumer向新的Coordinator发送链接请求太晚,新的Coordinator可能已经在此以前将其标记为宕机状态而将之视为新加入的Consumer并触发一次Rebalance操做。

Coordinator

(1)稳定状态下,Coordinator经过上述故障探测机制跟踪其所管理的每一个Group下的每一个Consumer的健康状态。

(2)刚启动时或选举完成后,Coordinator从Zookeeper读取它所管理的Group列表及这些Group的成员列表。若是没有获取到Group成员信息,它不会作任何事情直到某个Group中有成员注册进来。

(3)在Coordinator完成加载其管理的Group列表及其相应的成员信息以前,它将为HeartbeatRequest,OffsetCommitRequest和JoinGroupRequests返回CoordinatorStartupNotComplete错误码。此时,Consumer会从新发送请求。

(4)Coordinator会跟踪被其所管理的任何Consumer Group注册的Topic的Partition的变化,并为该变化触发Rebalance操做。建立新的Topic也可能触发Rebalance,由于Consumer能够在Topic被建立以前就已经订阅它了。

Coordinator发起Rebalance操做流程以下所示。

 

Coordinator状态机

 

 

Down:Coordinator再也不担任以前负责的Consumer Group的Coordinator

Catch up:该状态下,Coordinator竞选成功,但还未能作好服务相应请求的准备。

Ready:该状态下,新竞选出来的Coordinator已经完成从Zookeeper中加载它所负责管理的全部Group的metadata,并可开始接收相应的请求。

Prepare for rebalance:该状态下,Coordinator在全部HeartbeatResponse中返回IllegalGeneration错误码,并等待全部Consumer向其发送JoinGroupRequest后转到Rebalancing状态。

Rebalancing:该状态下,Coordinator已经收到了JoinGroupRequest请求,并增长其Group Generation ID,分配Consumer ID,分配Partition。Rebalance成功后,它会等待接收包含新的Consumer Generation ID的HeartbeatRequest,并转至Ready状态。

Coordinator Failover

如前文所述,Rebalance操做须要经历以下几个阶段

(1)Topic/Partition的改变或者新Consumer的加入或者已有Consumer中止,触发Coordinator注册在Zookeeper上的watch,Coordinator收到通知准备发起Rebalance操做。

(2)Coordinator经过在HeartbeatResponse中返回IllegalGeneration错误码发起Rebalance操做。

(3)Consumer发送JoinGroupRequest

(4)Coordinator在Zookeeper中增长Group的Generation ID并将新的Partition分配状况写入Zookeeper

(5)Coordinator发送JoinGroupResponse
  
在这个过程当中的每一个阶段,Coordinator均可能出现故障。下面给出Rebalance不一样阶段中Coordinator的Failover处理方式。

(1)若是Coordinator的故障发生在第一阶段,即它收到Notification并将来得及做出响应,则新的Coordinator将从Zookeeper读取Group的metadata,包含这些Group订阅的Topic列表和以前的Partition分配。若是某个Group所订阅的Topic数或者某个Topic的Partition数与以前的Partition分配不一致,亦或者某个Group链接到新的Coordinator的Consumer数与以前Partition分配中的不一致,新的Coordinator会发起Rebalance操做。

(2)若是失败发生在阶段2,它可能对部分而非所有Consumer发出带错误码的HeartbeatResponse。与第上面第一种状况同样,新的Coordinator会检测到Rebalance的必要性并发起一次Rebalance操做。若是Rebalance是由Consumer的失败所触发而且Cosnumer在Coordinator的Failover完成前恢复,新的Coordinator不会为此发起新的Rebalance操做。

(3)若是Failure发生在阶段3,新的Coordinator可能只收到部分而非所有Consumer的JoinGroupRequest。Failover完成后,它可能收到部分Consumer的HeartRequest及另外部分Consumer的JoinGroupRequest。与第1种状况相似,它将发起新一轮的Rebalance操做。

(4)若是Failure发生在阶段4,即它将新的Group Generation ID和Group成员信息写入Zookeeper后。新的Generation ID和Group成员信息以一个原子操做一次性写入Zookeeper。Failover完成后,Consumer会发送HeartbeatRequest给新的Coordinator,并包含旧的Generation ID。此时新的Coordinator经过在HeartbeatResponse中返回IllegalGeneration错误码发起新的一轮Rebalance。这也解释了为何每次HeartbeatRequest中都须要包含Generation ID和Consumer ID。

(5)若是Failure发生在阶段5,旧的Coordinator可能只向Group中的部分Consumer发送了JoinGroupResponse。收到JoinGroupResponse的Consumer在下次向已经失效的Coordinator发送HeartbeatRequest或者提交Offset时会检测到它已经失败。此时,它将检测新的Coordinator并向其发送带有新的Generation ID 的HeartbeatRequest。而未收到JoinGroupResponse的Consumer将检测新的Coordinator并向其发送JoinGroupRequest,这将促使新的Coordinator发起新一轮的Rebalance。

相关文章
相关标签/搜索