Kafka学习之路 (三)Kafka的高可用

1、高可用的由来

1.1 为什么须要Replication

在Kafka在0.8之前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上全部的Partition数据都不可被消费,这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。html

若是Producer使用同步模式则Producer会在尝试从新发送message.send.max.retries(默认值为3)次后抛出Exception,用户能够选择中止发送后续数据也可选择继续选择发送。而前者会形成数据的阻塞,后者会形成本应发往该Broker的数据的丢失。node

若是Producer使用异步模式,则Producer会尝试从新发送message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会形成数据丢失而且用户只能经过日志发现该问题。同时,Kafka的Producer并未对异步模式提供callback接口。算法

因而可知,在没有Replication的状况下,一旦某机器宕机或者某个Broker中止工做则会形成整个系统的可用性下降。随着集群规模的增长,整个集群中出现该类异常的概率大大增长,所以对于生产系统而言Replication机制的引入很是重要。apache

1.2 Leader Election

引入Replication以后,同一个Partition可能会有多个Replica,而这时须要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica做为Follower从Leader中复制数据。网络

由于须要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必需要能继续服务而且即不能形成数据重复也不能形成数据丢失)。若是没有一个Leader,全部Replica均可同时读/写数据,那就须要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性很是难保证,大大增长了Replication实现的复杂性,同时也增长了出现异常的概率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。session

2、Kafka HA设计解析

2.1 如何将全部Replica均匀分布到整个集群

为了更好的作负载均衡,Kafka尽可能将全部的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提升Kafka的容错能力,也须要将同一个Partition的Replica尽可能分散到不一样的机器。实际上,若是全部的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的全部Replica都没法工做,也就达不到HA的效果。同时,若是某个Broker宕机了,须要保证它上面的负载能够被均匀的分配到其它幸存的全部Broker上。多线程

Kafka分配Replica的算法以下:app

一、 将全部Broker(假设共n个Broker)和待分配的Partition排序
二、 将第i个Partition分配到第(i mod n)个Broker上
三、 将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上负载均衡

2.2 Data Replication(副本策略)

Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。异步

2.2.1 消息传递同步策略

Producer在发布消息到某个Partition时,先经过ZooKeeper找到该Partition的Leader,而后不管该Topic的Replication Factor为多少,Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每一个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的全部Replica的ACK,该消息就被认为已经commit了,Leader将增长HW而且向Producer发送ACK。

为了提升性能,每一个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。所以,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能彻底保证异常发生后该条消息必定能被Consumer消费。

Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。

Kafka Replication的数据流以下图所示:
Kafka学习之路 (三)Kafka的高可用

2.2.2 ACK前须要保证有多少个备份

对于Kafka而言,定义一个Broker是否“活着”包含两个条件:

一是它必须维护与ZooKeeper的session(这个经过ZooKeeper的Heartbeat机制来实现)。
二是Follower必须可以及时将Leader的消息复制过来,不能“落后太大”。

Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。若是一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预约值(该值可在$KAFKA_HOME/config/server.properties中经过replica.lag.max.messages配置,其默认值是4000)或者Follower超过必定时间(该值可在$KAFKA_HOME/config/server.properties中经过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。

Kafka的复制机制既不是彻底的同步复制,也不是单纯的异步复制。事实上,彻底同步复制要求全部能工做的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka很是重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种状况下若是Follower都复制完都落后于Leader,而若是Leader忽然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower能够批量的从Leader复制数据,这样极大的提升复制性能(批量写磁盘),极大减小了Follower与Leader的差距。

须要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的全部Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而形成数据丢失(Consumer没法消费这些数据)。而对于Producer而言,它能够选择是否等待消息commit,这能够经过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。

2.2.3 Leader Election算法

Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:

节点名称惟一性:多个客户端建立一个节点,只有成功建立节点的客户端才能得到锁
临时顺序节点:全部客户端在某个目录下建立本身的临时顺序节点,只有序号最小的才得到锁

一种很是经常使用的选举leader的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,若是咱们有2f+1个Replica(包含Leader和Follower),那在commit以前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。由于在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的全部消息。这种方式有个很大的优点,系统的latency只取决于最快的几个Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。若是要容忍1个follower挂掉,必需要有3个以上的Replica,若是要容忍2个Follower挂掉,必需要有5个以上的Replica。也就是说,在生产环境下为了保证较高的容错程度,必需要有大量的Replica,而大量的Replica又会在大数据量下致使性能的急剧降低。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而不多在须要存储大量数据的系统中使用的缘由。例如HDFS的HA Feature是基于majority-vote-based journal,可是它的数据存储并无使用这种方式。

Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的全部Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是很是有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前须要等待的Replica数量是同样的,可是ISR须要的总的Replica的个数几乎是Majority Vote的一半。

虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优点,可是Kafka做者认为Kafka能够经过Producer选择是否被commit阻塞来改善这一问题,而且节省下来的Replica和磁盘使得ISR模式仍然值得。

2.2.4 如何处理全部Replica都不工做

在ISR中至少有一个follower时,Kafka能够确保已经commit的数据不丢失,但若是某个Partition的全部Replica都宕机了,就没法保证数据不丢失了。这种状况下有两种可行的方案:

一、 等待ISR中的任一个Replica“活”过来,而且选它做为Leader
二、 选择第一个“活”过来的Replica(不必定是ISR中的)做为Leader

这就须要在可用性和一致性当中做出一个简单的折衷。若是必定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。并且若是ISR中的全部Replica都没法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica做为Leader,而这个Replica不是ISR中的Replica,那即便它并不保证已经包含了全部已commit的消息,它也会成为Leader而做为consumer的数据源(前文有说明,全部读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在之后的版本中,Kafka支持用户经过配置选择这两种方式中的一种,从而根据不一样的使用场景选择高可用性仍是强一致性。

2.2.5 选举Leader

最简单最直观的方案是,全部Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时全部Follower都尝试建立该节点,而建立成功者(ZooKeeper保证只有一个能建立成功)便是新的Leader,其它Replica即为Follower。

可是该方法会有3个问题:

一、 split-brain 这是由ZooKeeper的特性引发的,虽然ZooKeeper能保证全部Watch按顺序触发,但并不能保证同一时刻全部Replica“看”到的状态是同样的,这就可能形成不一样Replica的响应不一致
二、 herd effect 若是宕机的那个Broker上的Partition比较多,会形成多个Watch被触发,形成集群内大量的调整
三、 ZooKeeper负载太重 每一个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增长到几千个Partition时ZooKeeper负载会太重。

Kafka 0.8.*的Leader Election方案解决了上述问题,它在全部broker中选出一个controller,全部Partition的Leader选举都由controller决定。controller会将Leader的改变直接经过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此做为响应的Broker。同时controller也负责增删Topic以及Replica的从新分配。

3、HA相关ZooKeeper结构

Kafka学习之路 (三)Kafka的高可用

3.1 admin

该目录下znode只有在有相关操做时才会存在,操做结束时会将其删除

/admin/reassign_partitions用于将一些Partition分配到不一样的broker集合上。对于每一个待从新分配的Partition,Kafka会在该znode上存储其全部的Replica和相应的Broker id。该znode由管理进程建立而且一旦从新分配成功它将会被自动移除。

3.2 broker

即/brokers/ids/[brokerId])存储“活着”的broker信息。

topic注册信息(/brokers/topics/[topic]),存储该topic的全部partition的全部replica所在的broker id,第一个replica即为preferred replica,对一个给定的partition,它在同一个broker上最多只有一个replica,所以broker id可做为replica id。

3.3 controller

/controller -> int (broker id of the controller)存储当前controller的信息。
/controller_epoch -> int (epoch)直接以整数形式存储controller epoch,而非像其它znode同样以JSON字符串形式存储。

4、producer发布消息

4.1 写入方式

producer 采用 push 模式将消息发布到 broker,每条消息都被 append 到 patition 中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障 kafka 吞吐率)。

4.2 消息路由

producer 发送消息到 broker 时,会根据分区算法选择将其存储到哪个 partition。其路由机制为:

一、 指定了 patition,则直接使用;
二、 未指定 patition 但指定 key,经过对 key 的 value 进行hash 选出一个 patition
三、 patition 和 key 都未指定,使用轮询选出一个 patition。

4.3 写入流程

producer 写入消息序列图以下所示:
Kafka学习之路 (三)Kafka的高可用
流程说明:

一、 producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
二、 producer 将消息发送给该 leader
三、 leader 将消息写入本地 log
四、 followers 从 leader pull 消息,写入本地 log 后 leader 发送 ACK
五、 leader 收到全部 ISR 中的 replica 的 ACK 后,增长 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK

5、broker保存消息

5.1 存储方式

物理上把 topic 分红一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每一个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的全部消息和索引文件),以下:
Kafka学习之路 (三)Kafka的高可用
不管消息是否被消费,kafka 都会保留全部消息。有两种策略能够删除旧数据:

一、 基于时间:log.retention.hours=168
二、 基于大小:log.retention.bytes=1073741824

6、Topic的建立和删除

6.1 建立topic

建立 topic 的序列图以下所示
Kafka学习之路 (三)Kafka的高可用
流程说明:

一、 controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被建立,则 controller 会经过 watch 获得该 topic 的 partition/replica 分配。
二、 controller从 /brokers/ids 读取当前全部可用的 broker 列表,对于 set_p 中的每个 partition:
2.一、 从分配给该 partition 的全部 replica(称为AR)中任选一个可用的 broker 做为新的 leader,并将AR设置为新的 ISR
2.二、 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state
三、 controller 经过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

6.2 删除topic

删除 topic 的序列图以下所示:
Kafka学习之路 (三)Kafka的高可用
流程说明:

一、 controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会经过 watch 获得该 topic 的 partition/replica 分配。
二、 若 delete.topic.enable=false,结束;不然 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 经过回调向对应的 broker 发送 StopReplicaRequest。

7、broker failover

kafka broker failover 序列图以下所示:
Kafka学习之路 (三)Kafka的高可用
流程说明

一、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
二、 controller 从 /brokers/ids 节点读取可用broker
三、 controller决定set_p,该集合包含宕机 broker 上的全部 partition
四、 对 set_p 中的每个 partition
4.一、 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR
4.二、 决定新 leader
4.三、 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
五、 经过 RPC 向相关 broker 发送 leaderAndISRRequest 命令

8、controller failover

当 controller 宕机时会触发 controller failover。每一个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,全部存活的 broker 收到 fire 的通知,每一个 broker 都尝试建立新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成以下操做:

一、 读取并增长 Controller Epoch。
二、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。
三、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。
四、 经过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。
五、 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。
六、 经过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。
七、 初始化 ControllerContext 对象,设置当前全部 topic,“活”着的 broker 列表,全部 partition 的 leader 及 ISR等。
八、 启动 replicaStateMachine 和 partitionStateMachine。
九、 将 brokerState 状态设置为 RunningAsController。
十、 将每一个 partition 的 Leadership 信息发送给全部“活”着的 broker。
十一、 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。
十二、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。

9、consumer 消费消息

9.1 consumer API

kafka 提供了两套 consumer API:

一、 The high-level Consumer API
二、 The SimpleConsumer API

其中 high-level consumer API 提供了一个从 kafka 消费数据的高层抽象,而 SimpleConsumer API 则须要开发人员更多地关注细节。

9.1.1 The high-level consumer API

high-level consumer API 提供了 consumer group 的语义,一个消息只能被 group 内的一个 consumer 所消费,且 consumer 消费消息时不关注 offset,最后一个 offset 由 zookeeper 保存。

使用 high-level consumer API 能够是多线程的应用,应当注意:

一、 若是消费线程大于 patition 数量,则有些线程将收不到消息
二、 若是 patition 数量大于线程数,则有些线程多收到多个 patition 的消息
三、 若是一个线程消费多个 patition,则没法保证你收到的消息的顺序,而一个 patition 内的消息是有序的

9.1.2 The SimpleConsumer API

若是你想要对 patition 有更多的控制权,那就应该使用 SimpleConsumer API,好比:

一、 屡次读取一个消息
二、 只消费一个 patition 中的部分消息
三、 使用事务来保证一个消息仅被消费一次

可是使用此 API 时,partition、offset、broker、leader 等对你再也不透明,须要本身去管理。你须要作大量的额外工做:

一、 必须在应用程序中跟踪 offset,从而肯定下一条应该消费哪条消息
二、 应用程序须要经过程序获知每一个 Partition 的 leader 是谁
三、 须要处理 leader 的变动

使用 SimpleConsumer API 的通常流程以下:

一、 查找到一个“活着”的 broker,而且找出每一个 partition 的 leader
二、 找出每一个 partition 的 follower
三、 定义好请求,该请求应该能描述应用程序须要哪些数据
四、 fetch 数据
五、 识别 leader 的变化,并对之做出必要的响应

如下针对 high-level Consumer API 进行说明。

9.2 consumer group

如(一)kafka的简介所说,分配单位是 patition。每一个 consumer 都属于一个 group,一个 partition 只能被同一个 group 内的一个 consumer 所消费(也就保障了一个消息只能被 group 内的一个 consuemr 所消费),可是多个 group 能够同时消费这个 partition。

kafka 的设计目标之一就是同时实现离线处理和实时处理,根据这一特性,可使用 spark/Storm 这些实时处理系统对消息在线处理,同时使用 Hadoop 批处理系统进行离线处理,还能够将数据备份到另外一个数据中心,只须要保证这三者属于不一样的 consumer group。以下图所示:
Kafka学习之路 (三)Kafka的高可用

9.3 消费方式

consumer 采用 pull 模式从 broker 中读取数据。

push 模式很难适应消费速率不一样的消费者,由于消息发送速率是由 broker 决定的。它的目标是尽量以最快速度传递消息,可是这样很容易形成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则能够根据 consumer 的消费能力以适当的速率消费消息。

对于 Kafka 而言,pull 模式更合适,它可简化 broker 的设计,consumer 可自主控制消费消息的速率,同时 consumer 能够本身控制消费方式——便可批量消费也可逐条消费,同时还能选择不一样的提交方式从而实现不一样的传输语义。

9.4 consumer delivery guarantee

若是将 consumer 设置为 autocommit,consumer 一旦读到数据当即自动 commit。若是只讨论这一读取消息的过程,那 Kafka 确保了 Exactly once。

但实际使用中应用程序并不是在 consumer 读取完数据就结束了,而是要进行进一步处理,而数据处理与 commit 的顺序在很大程度上决定了consumer delivery guarantee:

一、 读完消息先 commit 再处理消息。
这种模式下,若是 consumer 在 commit 后还没来得及处理消息就 crash 了,下次从新开始工做后就没法读到刚刚已提交而未处理的消息,这就对应于 At most once
二、 读完消息先处理再 commit。
这种模式下,若是在处理完消息以后 commit 以前 consumer crash 了,下次从新开始工做时还会处理刚刚未 commit 的消息,实际上该消息已经被处理过了。这就对应于 At least once。
三、 若是必定要作到 Exactly once,就须要协调 offset 和实际操做的输出。
精典的作法是引入两阶段提交。若是能让 offset 和操做输入存在同一个地方,会更简洁和通用。这种方式可能更好,由于许多输出系统可能不支持两阶段提交。好比,consumer 拿到数据后可能把数据放到 HDFS,若是把最新的 offset 和数据自己一块儿写到 HDFS,那就能够保证数据的输出和 offset 的更新要么都完成,要么都不完成,间接实现 Exactly once。(目前就 high-level API而言,offset 是存于Zookeeper 中的,没法存于HDFS,而SimpleConsuemr API的 offset 是由本身去维护的,能够将之存于 HDFS 中)

总之,Kafka 默认保证 At least once,而且容许经过设置 producer 异步提交来实现 At most once(见文章《kafka consumer防止数据丢失》)。而 Exactly once 要求与外部存储系统协做,幸运的是 kafka 提供的 offset 能够很是直接很是容易得使用这种方式。

更多关于 kafka 传输语义的信息请参考《Message Delivery Semantics》

9.5 consumer rebalance

当有 consumer 加入或退出、以及 partition 的改变(如 broker 加入或退出)时会触发 rebalance。consumer rebalance算法以下:

一、 将目标 topic 下的全部 partirtion 排序,存于PT
二、 对某 consumer group 下全部 consumer 排序,存于 CG,第 i 个consumer 记为 Ci
三、 N=size(PT)/size(CG),向上取整
四、 解除 Ci 对原来分配的 partition 的消费权(i从0开始)
五、 将第iN到(i+1)N-1个 partition 分配给 Ci

相关文章
相关标签/搜索