Kafka在0.8之前的版本中,并不提供High Availablity机制,一旦一个或多个Broker宕机,则宕机期间其上全部Partition都没法继续提供服务。若该Broker永远不能再恢 复,亦或磁盘故障,则其上数据将丢失。而Kafka的设计目标之一便是提供数据持久化,同时对于分布式系统来讲,尤为当集群规模上升到必定程度后,一台或 者多台机器宕机的可能性大大提升,对Failover要求很是高。所以,Kafka从0.8开始提供High Availability机制。本文从Data Replication和Leader Election两方面介绍了Kafka的HA机制。node
在Kafka在0.8之前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上全部的Partition数据都不可被消 费,这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。web
若是Producer使用同步模式则Producer会在尝试从新发送message.send.max.retries
(默认值为3)次后抛出Exception,用户能够选择中止发送后续数据也可选择继续选择发送。而前者会形成数据的阻塞,后者会形成本应发往该Broker的数据的丢失。算法
若是Producer使用异步模式,则Producer会尝试从新发送message.send.max.retries
(默认值为3)次后记录该异常并继续发送后续数据,这会形成数据丢失而且用户只能经过日志发现该问题。同时,Kafka的Producer并未对异步模式提供callback接口。apache
因而可知,在没有Replication的状况下,一旦某机器宕机或者某个Broker中止工做则会形成整个系统的可用性下降。随着集群规模的增长,整个集群中出现该类异常的概率大大增长,所以对于生产系统而言Replication机制的引入很是重要。
网络
注意:本文所述Leader Election主要指Replica之间的Leader Election。session
引入Replication以后,同一个Partition可能会有多个Replica,而这时须要在这些Replication之间选出一个 Leader,Producer和Consumer只与这个Leader交互,其它Replica做为Follower从Leader中复制数据。数据结构
由于须要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必需要能继续服务而且即不能形成数 据重复也不能形成数据丢失)。若是没有一个Leader,全部Replica均可同时读/写数据,那就须要保证多个Replica之间互相(N×N条通 路)同步数据,数据的一致性和有序性很是难保证,大大增长了Replication实现的复杂性,同时也增长了出现异常的概率。而引入Leader后,只 有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。app
为了更好的作负载均衡,Kafka尽可能将全部的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition 数量大于Broker的数量。同时为了提升Kafka的容错能力,也须要将同一个Partition的Replica尽可能分散到不一样的机器。实际上,若是 全部的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的全部Replica都没法工做,也就达不到HA的效 果。同时,若是某个Broker宕机了,须要保证它上面的负载能够被均匀的分配到其它幸存的全部Broker上。负载均衡
Kafka分配Replica的算法以下:异步
将全部Broker(假设共n个Broker)和待分配的Partition排序
将第i个Partition分配到第(i mod n)个Broker上
将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上
Kafka的Data Replication须要解决以下问题:
怎样Propagate消息
在向Producer发送ACK前须要保证有多少个Replica已经收到该消息
怎样处理某个Replica不工做的状况
怎样处理Failed Replica恢复回来的状况
Producer在发布消息到某个Partition时,先经过ZooKeeper找到该Partition的Leader,而后不管该Topic 的Replication Factor为多少(也即该Partition有多少个Replica),Producer只将该消息发送到该Partition的Leader。 Leader会将该消息写入其本地Log。每一个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送 ACK。一旦Leader收到了ISR中的全部Replica的ACK,该消息就被认为已经commit了,Leader将增长HW而且向 Producer发送ACK。
为了提升性能,每一个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。所以,对于已经commit的消 息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能彻底保证异常发生后该条消息必定能被 Consumer消费。但考虑到这种场景很是少见,能够认为这种方式在性能和数据持久化上作了一个比较好的平衡。在未来的版本中,Kafka会考虑提供更 高的持久性。
Consumer读消息也是从Leader读取,只有被commit过的消息(offset低于HW的消息)才会暴露给Consumer。
Kafka Replication的数据流以下图所示:
和大部分分布式系统同样,Kafka处理失败须要明肯定义一个Broker是否“活着”。对于Kafka而言,Kafka存活包含两个条件,一是它 必须维护与ZooKeeper的session(这个经过ZooKeeper的Heartbeat机制来实现)。二是Follower必须可以及时将 Leader的消息复制过来,不能“落后太多”。
Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。若是一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的 消息落后于Leader后的条数超过预约值(该值可在$KAFKA_HOME/config/server.properties中经过replica.lag.max.messages
配置,其默认值是4000)或者Follower超过必定时间(该值可在$KAFKA_HOME/config/server.properties中经过replica.lag.time.max.ms
来配置,其默认值是10000)未向Leader发送fetch请求。
Kafka的复制机制既不是彻底的同步复制,也不是单纯的异步复制。事实上,彻底同步复制要求全部能工做的Follower都复制完,这条消息才会 被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka很是重要的一个特性)。而异步复制方式下,Follower异步的从 Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种状况下若是Follower都复制完都落后于Leader,而如 果Leader忽然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower能够批量的从 Leader复制数据,这样极大的提升复制性能(批量写磁盘),极大减小了Follower与Leader的差距。
须要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的全部 Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机 了,而形成数据丢失(Consumer没法消费这些数据)。而对于Producer而言,它能够选择是否等待消息commit,这能够经过request.required.acks
来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。
上文说明了Kafka是如何作Replication的,另一个很重要的问题是当Leader宕机了,怎样在Follower中选举出新的 Leader。由于Follower可能落后许多或者crash了,因此必须确保选择“最新”的Follower做为新的Leader。一个基本的原则就 是,若是Leader不在了,新的Leader必须拥有原来的Leader commit过的全部消息。这就须要做一个折衷,若是Leader在标明一条消息被commit前等待更多的Follower确认,那在它宕机以后就有更 多的Follower能够做为新的Leader,但这也会形成吞吐率的降低。
一种很是经常使用的选举leader的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,若是咱们有2f+1个Replica(包含Leader和 Follower),那在commit以前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不 能超过f个。由于在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的全部消息。这种方式有个很大的优点,系统的latency 只取决于最快的几个Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。若是要容忍1个follower挂掉,必需要有3个以上的 Replica,若是要容忍2个Follower挂掉,必需要有5个以上的Replica。也就是说,在生产环境下为了保证较高的容错程度,必需要有大量 的Replica,而大量的Replica又会在大数据量下致使性能的急剧降低。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而不多在须要存储大量数据的系统中使用的缘由。例如HDFS的HA Feature是基于majority-vote-based journal,可是它的数据存储并无使用这种方式。
实际上,Leader Election算法很是多,好比ZooKeeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的Leader Election算法更像微软的PacificA算法。
Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的全部Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于 f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种 模式是很是有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前须要等待的Replica数量是同样的,可是ISR须要的总的Replica的个数几乎是Majority Vote的一半。
虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优点,可是Kafka做者认为Kafka能够经过Producer选择是否被commit阻塞来改善这一问题,而且节省下来的Replica和磁盘使得ISR模式仍然值得。
上文提到,在ISR中至少有一个follower时,Kafka能够确保已经commit的数据不丢失,但若是某个Partition的全部Replica都宕机了,就没法保证数据不丢失了。这种状况下有两种可行的方案:
等待ISR中的任一个Replica“活”过来,而且选它做为Leader
选择第一个“活”过来的Replica(不必定是ISR中的)做为Leader
这就须要在可用性和一致性当中做出一个简单的折衷。若是必定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。并且若是 ISR中的全部Replica都没法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica做为 Leader,而这个Replica不是ISR中的Replica,那即便它并不保证已经包含了全部已commit的消息,它也会成为Leader而做为 consumer的数据源(前文有说明,全部读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在之后的版本 中,Kafka支持用户经过配置选择这两种方式中的一种,从而根据不一样的使用场景选择高可用性仍是强一致性。
最简单最直观的方案是,全部Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时全部Follower都尝试建立该节点,而建立成功者(ZooKeeper保证只有一个能建立成功)便是新的Leader,其 它Replica即为Follower。
可是该方法会有3个问题:
split-brain 这是由ZooKeeper的特性引发的,虽然ZooKeeper能保证全部Watch按顺序触发,但并不能保证同一时刻全部Replica“看”到的状态是同样的,这就可能形成不一样Replica的响应不一致
herd effect 若是宕机的那个Broker上的Partition比较多,会形成多个Watch被触发,形成集群内大量的调整
ZooKeeper负载太重 每一个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增长到几千个Partition时ZooKeeper负载会太重。
Kafka 0.8.*的Leader Election方案解决了上述问题,它在全部broker中选出一个controller,全部Partition的Leader选举都由 controller决定。controller会将Leader的改变直接经过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此做为响应的Broker。同时controller也负责增删Topic以及Replica的从新分配。
首先声明本节所示ZooKeeper结构中,实线框表明路径名是固定的,而虚线框表明路径名与业务相关
admin (该目录下znode只有在有相关操做时才会存在,操做结束时会将其删除)
/admin/preferred_replica_election数据结构
{ "fields":[ { "name":"version", "type":"int", "doc":"version id" }, { "name":"partitions", "type":{ "type":"array", "items":{ "fields":[ { "name":"topic", "type":"string", "doc":"topic of the partition for which preferred replica election should be triggered" }, { "name":"partition", "type":"int", "doc":"the partition for which preferred replica election should be triggered" } ], } "doc":"an array of partitions for which preferred replica election should be triggered" } } ] } Example: { "version": 1, "partitions": [ { "topic": "topic1", "partition": 8 }, { "topic": "topic2", "partition": 16 } ] }
/admin/reassign_partitions
用于将一些Partition分配到不一样的broker集合上。 对于每一个待从新分配的Partition,Kafka会在该znode上存储其全部的Replica和相应的Broker id。该znode由管理进程建立而且一旦从新分配成功它将会被自动移除。其数据结构以下:
{ "fields":[ { "name":"version", "type":"int", "doc":"version id" }, { "name":"partitions", "type":{ "type":"array", "items":{ "fields":[ { "name":"topic", "type":"string", "doc":"topic of the partition to be reassigned" }, { "name":"partition", "type":"int", "doc":"the partition to be reassigned" }, { "name":"replicas", "type":"array", "items":"int", "doc":"a list of replica ids" } ], } "doc":"an array of partitions to be reassigned to new replicas" } } ] }
Example: { "version": 1, "partitions": [ { "topic": "topic3", "partition": 1, "replicas": [1, 2, 3] } ] }
/admin/delete_topics数据结构:
Schema: { "fields": [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "topics", "type": { "type": "array", "items": "string", "doc": "an array of topics to be deleted"} } ] } Example: { "version": 1, "topics": ["topic4", "topic5"] }
brokers
broker(即/brokers/ids/[brokerId]
)存储“活着”的broker信息。数据结构以下:
Schema: { "fields": [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "host", "type": "string", "doc": "ip address or host name of the broker"}, {"name": "port", "type": "int", "doc": "port of the broker"}, {"name": "jmx_port", "type": "int", "doc": "port for jmx"} ] } Example: { "jmx_port":-1, "host":"node1", "version":1, "port":9092 }
topic注册信息(/brokers/topics/[topic]
),存储该topic的全部partition的 全部replica所在的broker id,第一个replica即为preferred replica,对一个给定的partition,它在同一个broker上最多只有一个replica,所以broker id可做为replica id。数据结构以下:
Schema: { "fields" : [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "partitions", "type": {"type": "map", "values": {"type": "array", "items": "int", "doc": "a list of replica ids"}, "doc": "a map from partition id to replica list"}, } ] } Example: { "version":1, "partitions": {"12":[6], "8":[2], "4":[6], "11":[5], "9":[3], "5":[7], "10":[4], "6":[8], "1":[3], "0":[2], "2":[4], "7":[1], "3":[5]} }
partition state(/brokers/topics/[topic]/partitions/[partitionId]/state
) 结构以下:
Schema: { "fields": [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "isr", "type": {"type": "array", "items": "int", "doc": "an array of the id of replicas in isr"} }, {"name": "leader", "type": "int", "doc": "id of the leader replica"}, {"name": "controller_epoch", "type": "int", "doc": "epoch of the controller that last updated the leader and isr info"}, {"name": "leader_epoch", "type": "int", "doc": "epoch of the leader"} ] } Example: { "controller_epoch":29, "leader":2, "version":1, "leader_epoch":48, "isr":[2] }
controller /controller -> int (broker id of the controller)
存储当前controller的信息
Schema: { "fields": [ {"name": "version", "type": "int", "doc": "version id"}, {"name": "brokerid", "type": "int", "doc": "broker id of the controller"} ] } Example: { "version":1, "brokerid":8 }
/controller_epoch -> int (epoch)
直接以整数形式存储controller epoch,而非像其它znode同样以JSON字符串形式存储。
Controller在ZooKeeper注册Watch,一旦有Broker宕机(这是用宕机表明任何让系统认为其die的情景,包括但不 限于机器断电,网络不可用,GC致使的Stop The World,进程crash等),其在ZooKeeper对应的znode会自动被删除,ZooKeeper会fire Controller注册的watch,Controller读取最新的幸存的Broker。
Controller决定set_p,该集合包含了宕机的全部Broker上的全部Partition。
对set_p中的每个Partition
3.1 从/brokers/topics/[topic]/partitions/[partition]/state
读取该Partition当前的ISR
3.2 决定该Partition的新Leader。若是当前ISR中有至少一个Replica还幸存,则选择其中一个做为新Leader,新的ISR则包含当前 ISR中全部幸存的Replica。不然选择该Partition中任意一个幸存的Replica做为新的Leader以及ISR(该场景下可能会有潜在 的数据丢失)。若是该Partition的全部Replica都宕机了,则将新的Leader设置为-1。
3.3 将新的Leader,ISR和新的leader_epoch
及controller_epoch
写入/brokers/topics/[topic]/partitions/[partition]/state
。注意,该操做只有其version在3.1至3.3的过程当中无变化时才会执行,不然跳转到3.1
直接经过RPC向set_p相关的Broker发送LeaderAndISRRequest命令。Controller能够在一个RPC操做中发送多个命令从而提升效率。
broker failover顺序图以下所示。