在开始以前,我想花一点时间先来明确一些概念和术语,这会极大地方便咱们下面的讨论。另外请原谅这文章有点长,毕竟要讨论的东西不少,虽然已然删除了不少太过细节的东西。html
1 Kafka的版本java
不少人在Kafka中国社区(替群主作个宣传,QQ号:162272557)提问时的开头常常是这样的:“我使用的kafka版本是2.10/2.11, 如今碰到一个奇怪的问题。。。。” 无心冒犯,但这里的2.10/2.11不是kafka的版本,而是编译kafka的Scala版本。Kafka的server端代码是由Scala语言编写的,目前Scala主流的3个版本分别是2.十、2.11和2.12。实际上Kafka如今每一个PULL request都已经自动增长了这三个版本的检查。下图是个人一个PULL request,能够看到这个fix会同时使用3个scala版本作编译检查:git
目前普遍使用kafka的版本应该是这三个大版本:0.8.x, 0.9.x和0.10.* 。 这三个版本对于consumer和consumer group来讲都有很大的变化,咱们后面会详谈。github
2 新版本 VS 老版本正则表达式
“个人kafkaoffsetmonitor为何没法监控到offset了?”——这是我在Kafka中国社区见到最多的问题,没有之一!实际上,Kafka 0.9开始提供了新版本的consumer及consumer group,位移的管理与保存机制发生了很大的变化——新版本consumer默认将再也不保存位移到zookeeper中,而目前kafkaoffsetmonitor尚未应对这种变化(虽然已经有不少人在要求他们改了,详见https://github.com/quantifind/KafkaOffsetMonitor/issues/79),因此颇有多是由于你使用了新版本的consumer才没法看到的。关于新旧版本,这里统一说明一下:kafka0.9之前的consumer是使用Scala编写的,包名结构是kafka.consumer.*,分为high-level consumer和low-level consumer两种。咱们熟知的ConsumerConnector、ZookeeperConsumerConnector以及SimpleConsumer就是这个版本提供的;自0.9版本开始,Kafka提供了java版本的consumer,包名结构是o.a.k.clients.consumer.*,熟知的类包括KafkaConsumer和ConsumerRecord等。新版本的consumer能够单独部署,再也不须要依赖server端的代码。apache
2、消费者组 (Consumer Group)服务器
1 什么是消费者组网络
其实对于这些基本概念的普及,网上资料实在太多了。我本不该该再多此一举了,但为了本文的完整性,我仍是要花一些篇幅来重谈consumer group,至少能够说说个人理解。值得一提的是,因为咱们今天基本上只探讨consumer group,对于单独的消费者不作过多讨论。session
什么是consumer group? 一言以蔽之,consumer group是kafka提供的可扩展且具备容错性的消费者机制。既然是一个组,那么组内必然能够有多个消费者或消费者实例(consumer instance),它们共享一个公共的ID,即group ID。组内的全部消费者协调在一块儿来消费订阅主题(subscribed topics)的全部分区(partition)。固然,每一个分区只能由同一个消费组内的一个consumer来消费。(网上文章中说到此处各类炫目多彩的图就会紧跟着抛出来,我这里就不画了,请原谅)。我的认为,理解consumer group记住下面这三个特性就行了:数据结构
2 消费者位置(consumer position)
消费者在消费的过程当中须要记录本身消费了多少数据,即消费位置信息。在Kafka中这个位置信息有个专门的术语:位移(offset)。不少消息引擎都把这部分信息保存在服务器端(broker端)。这样作的好处固然是实现简单,但会有三个主要的问题:1. broker今后变成有状态的,会影响伸缩性;2. 须要引入应答机制(acknowledgement)来确认消费成功。3. 因为要保存不少consumer的offset信息,必然引入复杂的数据结构,形成资源浪费。而Kafka选择了不一样的方式:每一个consumer group保存本身的位移信息,那么只须要简单的一个整数表示位置就够了;同时能够引入checkpoint机制按期持久化,简化了应答机制的实现。
3 位移管理(offset management)
3.1 自动VS手动
Kafka默认是按期帮你自动提交位移的(enable.auto.commit = true),你固然能够选择手动提交位移实现本身控制。另外kafka会按期把group消费状况保存起来,作成一个offset map,以下图所示:
上图中代表了test-group这个组当前的消费状况。
3.2 位移提交
老版本的位移是提交到zookeeper中的,图就不画了,总之目录结构是:/consumers/<group.id>/offsets/<topic>/<partitionId>,可是zookeeper其实并不适合进行大批量的读写操做,尤为是写操做。所以kafka提供了另外一种解决方案:增长__consumeroffsets topic,将offset信息写入这个topic,摆脱对zookeeper的依赖(指保存offset这件事情)。__consumer_offsets中的消息保存了每一个consumer group某一时刻提交的offset信息。依然以上图中的consumer group为例,格式大概以下:
__consumers_offsets topic配置了compact策略,使得它老是可以保存最新的位移信息,既控制了该topic整体的日志容量,也能实现保存最新offset的目的。compact的具体原理请参见:Log Compaction
至于每一个group保存到__consumers_offsets的哪一个分区,如何查看的问题请参见这篇文章:Kafka 如何读取offset topic内容 (__consumer_offsets)
4 Rebalance
4.1 什么是rebalance?
rebalance本质上是一种协议,规定了一个consumer group下的全部consumer如何达成一致来分配订阅topic的每一个分区。好比某个group下有20个consumer,它订阅了一个具备100个分区的topic。正常状况下,Kafka平均会为每一个consumer分配5个分区。这个分配的过程就叫rebalance。
4.2 何时rebalance?
这也是常常被说起的一个问题。rebalance的触发条件有三种:
4.3 如何进行组内分区分配?
以前提到了group下的全部consumer都会协调在一块儿共同参与分配,这是如何完成的?Kafka新版本consumer默认提供了两种分配策略:range和round-robin。固然Kafka采用了可插拔式的分配策略,你能够建立本身的分配器以实现不一样的分配策略。实际上,因为目前range和round-robin两种分配器都有一些弊端,Kafka社区已经提出第三种分配器来实现更加公平的分配策略,只是目前还在开发中。咱们这里只须要知道consumer group默认已经帮咱们把订阅topic的分区分配工做作好了就好了。
简单举个例子,假设目前某个consumer group下有两个consumer: A和B,当第三个成员加入时,kafka会触发rebalance并根据默认的分配策略从新为A、B和C分配分区,以下图所示:
4.4 谁来执行rebalance和consumer group管理?
Kafka提供了一个角色:coordinator来执行对于consumer group的管理。坦率说kafka对于coordinator的设计与修改是一个很长的故事。最新版本的coordinator也与最初的设计有了很大的不一样。这里我只想说起两次比较大的改变。
首先是0.8版本的coordinator,那时候的coordinator是依赖zookeeper来实现对于consumer group的管理的。Coordinator监听zookeeper的/consumers/<group>/ids的子节点变化以及/brokers/topics/<topic>数据变化来判断是否须要进行rebalance。group下的每一个consumer都本身决定要消费哪些分区,并把本身的决定抢先在zookeeper中的/consumers/<group>/owners/<topic>/<partition>下注册。很明显,这种方案要依赖于zookeeper的帮助,并且每一个consumer是单独作决定的,没有那种“你们属于一个组,要协商作事情”的精神。
基于这些潜在的弊端,0.9版本的kafka改进了coordinator的设计,提出了group coordinator——每一个consumer group都会被分配一个这样的coordinator用于组管理和位移管理。这个group coordinator比原来承担了更多的责任,好比组成员管理、位移提交保护机制等。当新版本consumer group的第一个consumer启动的时候,它会去和kafka server肯定谁是它们组的coordinator。以后该group内的全部成员都会和该coordinator进行协调通讯。显而易见,这种coordinator设计再也不须要zookeeper了,性能上能够获得很大的提高。后面的全部部分咱们都将讨论最新版本的coordinator设计。
4.5 如何肯定coordinator?
上面简单讨论了新版coordinator的设计,那么consumer group如何肯定本身的coordinator是谁呢? 简单来讲分为两步:
4.6 Rebalance Generation
JVM GC的分代收集就是这个词(严格来讲是generational),我这里把它翻译成“届”好了,它表示了rebalance以后的一届成员,主要是用于保护consumer group,隔离无效offset提交的。好比上一届的consumer成员是没法提交位移到新一届的consumer group中。咱们有时候能够看到ILLEGAL_GENERATION的错误,就是kafka在抱怨这件事情。每次group进行rebalance以后,generation号都会加1,表示group进入到了一个新的版本,以下图所示: Generation 1时group有3个成员,随后成员2退出组,coordinator触发rebalance,consumer group进入Generation 2,以后成员4加入,再次触发rebalance,group进入Generation 3.
4.7 协议(protocol)
前面说过了, rebalance本质上是一组协议。group与coordinator共同使用它来完成group的rebalance。目前kafka提供了5个协议来处理与consumer group coordination相关的问题:
Coordinator在rebalance的时候主要用到了前面4种请求。
4.8 liveness
consumer如何向coordinator证实本身还活着? 经过定时向coordinator发送Heartbeat请求。若是超过了设定的超时时间,那么coordinator就认为这个consumer已经挂了。一旦coordinator认为某个consumer挂了,那么它就会开启新一轮rebalance,而且在当前其余consumer的心跳response中添加“REBALANCE_IN_PROGRESS”,告诉其余consumer:很差意思各位,大家从新申请加入组吧!
4.9 Rebalance过程
终于说到consumer group执行rebalance的具体流程了。不少用户估计对consumer内部的工做机制也很感兴趣。下面就跟你们一块儿讨论一下。固然我必需要明确表示,rebalance的前提是coordinator已经肯定了。
整体而言,rebalance分为2步:Join和Sync
1 Join, 顾名思义就是加入组。这一步中,全部成员都向coordinator发送JoinGroup请求,请求入组。一旦全部成员都发送了JoinGroup请求,coordinator会从中选择一个consumer担任leader的角色,并把组成员信息以及订阅信息发给leader——注意leader和coordinator不是一个概念。leader负责消费分配方案的制定。
2 Sync,这一步leader开始分配消费方案,即哪一个consumer负责消费哪些topic的哪些partition。一旦完成分配,leader会将这个方案封装进SyncGroup请求中发给coordinator,非leader也会发SyncGroup请求,只是内容为空。coordinator接收到分配方案以后会把方案塞进SyncGroup的response中发给各个consumer。这样组内的全部成员就都知道本身应该消费哪些分区了。
仍是拿几张图来讲明吧,首先是加入组的过程:
值得注意的是, 在coordinator收集到全部成员请求前,它会把已收到请求放入一个叫purgatory(炼狱)的地方。记得国内有篇文章以此来证实kafka开发人员都是颇有文艺范的,写得也是比较有趣,有兴趣能够去搜搜。
而后是分发分配方案的过程,即SyncGroup请求:
注意!! consumer group的分区分配方案是在客户端执行的!Kafka将这个权利下放给客户端主要是由于这样作能够有更好的灵活性。好比这种机制下我能够实现相似于Hadoop那样的机架感知(rack-aware)分配方案,即为consumer挑选同一个机架下的分区数据,减小网络传输的开销。Kafka默认为你提供了两种分配策略:range和round-robin。因为这不是本文的重点,这里就再也不详细展开了,你只须要记住你能够覆盖consumer的参数:partition.assignment.strategy来实现本身分配策略就行了。
4.10 consumer group状态机
和不少kafka组件同样,group也作了个状态机来代表组状态的流转。coordinator根据这个状态机会对consumer group作不一样的处理,以下图所示(彻底是根据代码注释手动画的,多见谅吧)
简单说明下图中的各个状态:
至于各个状态之间的流程条件以及action,这里就不具体展开了。
3、rebalance场景剖析
上面详细阐述了consumer group是如何执行rebalance的,可能依然有些云里雾里。这部分对其中的三个重要的场景作详尽的时序展开,进一步加深对于consumer group内部原理的理解。因为图比较直观,全部的描述都将以图的方式给出,不作过多的文字化描述了。
1 新成员加入组(member join)
2 组成员崩溃(member failure)
前面说过了,组成员崩溃和组成员主动离开是两个不一样的场景。由于在崩溃时成员并不会主动地告知coordinator此事,coordinator有可能须要一个完整的session.timeout周期才能检测到这种崩溃,这必然会形成consumer的滞后。能够说离开组是主动地发起rebalance;而崩溃则是被动地发起rebalance。okay,直接上图:
3 组成员主动离组(member leave group)
4 提交位移(member commit offset)
总结一下,本文着重讨论了一下新版本的consumer group的内部设计原理,特别是consumer group与coordinator之间的交互过程,但愿对各位有所帮助。