Apche Kafka 的生与死 – failover 机制详解

转自:http://www.cnblogs.com/fxjwind/p/4972244.htmlhtml

Kafka 做为 high throughput 的消息中间件,以其性能,简单和稳定性,成为当前实时流处理框架中的主流的基础组件。node

固然在使用 Kafka 中也碰到很多问题,尤为是 failover 的问题,经常给你们带来很多困扰和麻烦。 
因此在梳理完 kafka 源码的基础上,尽可能用通俗易懂的方式,把 Kafka 发生 failover 时的机制解释清楚,让你们在使用和运维中,作到心中有数。算法

若是对 kafka 不了解的,能够先参考https://kafka.apache.org/08/design.html,有个大体的概念。apache

 

0 背景

这里讨论 kafka 的 failover 的前提是在0.8版本后, kafka 提供了 replica 机制。 
对于0.7版本不存在 failover 的说法,由于任意一个 broker dead 都会致使上面的数据不可读,从而致使服务中断。网络

下面简单的介绍一下 0.8中加入的 replica 机制和相应的组件,session

Replica 机制

基本思想大同小异,以下图 (Ref.2):并发

image_thumb3

 

图中有4个 kafka brokers,而且Topic1有四个 partition(用蓝色表示)分布在4个 brokers 上,为 leader replica; 
且每一个 partition 都有两个 follower replicas(用橘色表示),分布在和 leader replica 不一样的 brokers。 
这个分配算法很简单,有兴趣的能够参考kafka的design。框架

 

Replica 组件

为了支持replica机制,主要增长的两个组件是,Replica Manager和Controller, 以下图:less

image_thumb1

 

Replica Manager

每一个 broker server 都会建立一个 Replica Manager,全部的数据的读写都须要通过它 , 
0.7版本,kafka 会直接从 LogManager 中读数据,但在增长 replica 机制后,只有 leader replica 能够响应数据的读写请求 。 
因此,Replica Manager 须要管理全部 partition 的 replica 状态,并响应读写请求,以及其余和 replica 相关的操做。运维

 

Controller

你们能够看到,每一个 partition 都有一个 leader replica,和若干的 follower replica,那么谁来决定谁是leader? 
你说有 zookeeper,但用 zk 为每一个 partition 作 elect,效率过低,并且 zk 会不堪重负; 
因此如今的通用作法是,只用 zk 选一个 master 节点,而后由这个 master 节点来作其余的全部仲裁工做。 
kafka 的作法就是在 brokers 中选出一个做为 controller,来作为 master 节点,从而仲裁全部的 partition 的 leader 选举。

 

下面咱们会从以下几个方面来解释 failover 机制, 
先从 client 的角度看看当 kafka 发生 failover 时,数据一致性问题。 
而后从 Kafka 的各个重要组件,Zookeeper,Broker, Controller 发生 failover 会形成什么样的影响? 
最后给出一些判断 kafka 状态的 tips。

 

1 从 Client 的角度

从 producer 的角度, 发的数据是否会丢?

除了要打开 replica 机制,还取决于 produce 的 request.required.acks 的设置,

  • acks = 0,发就发了,不须要 ack,不管成功与否 ;
  • acks = 1,当写 leader replica 成功后就返回,其余的 replica 都是经过fetcher去异步更新的,固然这样会有数据丢失的风险,若是leader的数据没有来得及同步,leader挂了,那么会丢失数据;
  • acks = –1, 要等待全部的replicas都成功后,才能返回;这种纯同步写的延迟会比较高。

因此,通常的状况下,thoughput 优先,设成1,在极端状况下,是有可能丢失数据的; 
若是能够接受较长的写延迟,能够选择将 acks 设为 –1。

 

从 consumer 的角度, 是否会读到不一致的数据?

首先不管是 high-level 或 low-level consumer,咱们要知道他是怎么从 kafka 读数据的?

image_thumb12

kafka 的 log patition 存在文件中,并以 offset 做为索引,因此 consumer 须要对于每一个 partition 记录上次读到的 offset (high-level和low-level的区别在因而 kafka 帮你记,仍是你本身记);

因此若是 consumer dead,重启后只须要继续从上次的 offset 开始读,那就不会有不一致的问题。

但若是是 Kafka broker dead,并发生 partition leader 切换,如何保证在新的 leader 上这个 offset 仍然有效?  
Kafka 用一种机制,即 committed offset,来保证这种一致性,以下图(Ref.2)

image_thumb13

log 除了有 log end offset 来表示 log 的末端,还有一个 committed offset, 表示有效的 offset; 
committed offset 只有在全部 replica 都同步完该 offset 后,才会被置为该offset; 
因此图中 committed 置为2, 由于 broker3 上的 replica 尚未完成 offset 3 的同步; 
因此这时,offset 3 的 message 对 consumer 是不可见的,consumer最多只能读到 offset 2。 
若是此时,leader dead,不管哪一个 follower 从新选举成 leader,都不会影响数据的一致性,由于consumer可见的offset最多为2,而这个offset在全部的replica上都是一致的。

因此在通常正常状况下,当 kafka 发生 failover 的时候,consumer 是不会读到不一致数据的。特例的状况就是,当前 leader 是惟一有效的 replica,其余replica都处在彻底不一样步状态,这样发生 leader 切换,必定是会丢数据的,并会发生 offset 不一致。

 

2 Zookeeper Failover

Kafka 首先对于 zookeeper 是强依赖,因此 zookeeper 发生异常时,会对数据形成如何的影响?

Zookeeper Dead

若是 zookeeper dead,broker 是没法启动的,报以下的异常:

image_thumb16

这种异常,有多是 zookeeper dead,也有多是网络不通,总之就是连不上 zookeeper。 
这种 case,kafka彻底不工做,直到能够连上 zookeeper 为止。

 

Zookeeper Hang

其实上面这种状况比较简单,比较麻烦的是 zookeeper hang,能够说 kafka 的80%以上问题都是因为这个缘由 
zookeeper hang 的缘由有不少,主要是 zk 负载太重,zk 所在主机 cpu,memeory 或网络资源不够等

zookeeper hang 带来的主要问题就是 session timeout,这样会触发以下的问题,

a. Controller Fail,Controller 发生从新选举和切换,具体过程参考下文。

b. Broker Fail,致使partition的leader发生切换或partition offline,具体过程参考下文。

c. Broker 被 hang 住 。 
这是一种比较特殊的 case,出现时在 server.log 会出现以下的log,

server.log: 
“INFO I wrote this conflicted ephemeral node [{"jmx_port":9999,"timestamp":"1444709  63049","host":"10.151.4.136","version":1,"port":9092}] at /brokers/ids/1 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)”

这个问题自己是因为 zookeeper 的一个 bug,参考:https://issues.apache.org/jira/browse/ZOOKEEPER-1740

问题在于“The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.” 
即 zk 的 session 过时和 ephemeral node 删除并非一个原子操做; 
出现的case以下:

  • 在极端case下,zk 触发了 session timeout,但还没来得及完成 /brokers/ids/1 节点的删除,就被 hang 住了,好比是去作很耗时的 fsync 操做 。
  • 可是 broker 1 收到 session timeout 事件后,会尝试从新去 zk 上建立 /brokers/ids/1 节点,可这时旧的节点仍然存在,因此会获得 NodeExists,其实这个是不合理的,由于既然 session timeout,这个节点就应该不存在。
  • 一般的作法,既然已经存在,我就无论了,该干啥干啥去;问题是一会 zk 从 fsync hang 中恢复了,他会记得还有一个节点没有删除,这时会去把 /brokers/ids/1 节点删除。
  • 结果就是对于client,虽然没有再次收到 session 过时的事件,可是 /brokers/ids/1 节点却不存在了。

因此这里作的处理是,在前面发现 NodeExists 时,while true 等待,一直等到 zk 从 hang 中恢复删除该节点,而后建立新节点成功,才算完; 
这样作的结果是这个broker也会被一直卡在这儿,等待该节点被成功建立。

 

3 Broker Failover

Broker 的 Failover,能够分为两个过程,一个是 broker failure, 一个是 broker startup。

新加 broker

在谈failover以前,咱们先看一个更简单的过程,就是新加一个全新的 broker: 
首先明确,新加的 broker 对现存全部的 topic 和 partition,不会有任何影响; 
由于一个 topic 的 partition 的全部 replica 的 assignment 状况,在建立时就决定了,并不会自动发生变化,除非你手动的去作 reassignment。 
因此新加一个 broker,所须要作的只是你们同步一下元数据,你们都知道来了一个新的 broker,当你建立新的 topic 或 partition 的时候,它会被用上。

 

Broker Failure

首先明确,这里的 broker failure,并不必定是 broker server 真正的 dead了, 只是指该 broker 所对应的 zk ephemeral node ,好比/brokers/ids/1,发生 session timeout; 
固然发生这个的缘由,除了server dead,还有不少,好比网络不通;可是咱们不关心,只要出现 sessioin timeout,咱们就认为这个 broker 不工做了; 
会出现以下的log,

controller.log: 
“INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)” 
“INFO [Controller 1]: Broker failure callback for 4 (kafka.controller.KafkaController)”

当一个 broker failure 会影响什么,其实对于多 replicas 场景,通常对最终客户没啥影响。 
只会影响哪些 leader replica 在该 broker 的 partitions; 须要从新作 leader election,若是没法选出一个新的 leader,会致使 partition offline。 
由于若是只是 follow replica failure,不会影响 partition 的状态,仍是能够服务的,只是可用 replica 少了一个;须要注意的是,kafka 是不会自动补齐失败的replica的,即坏一个少一个; 
可是对于 leader replica failure,就须要从新再 elect leader,前面已经讨论过,新选取出的 leader 是能够保证 offset 一致性的;

Note: 其实这里的一致性是有前提的,即除了 fail 的 leader,在 ISR(in-sync replicas) 里面还存在其余的 replica;顾名思义,ISR,就是能 catch up with leader 的 replica。 
虽然 partition 在建立的时候,会分配一个 AR(assigned replicas),可是在运行的过程当中,可能会有一些 replica 因为各类缘由没法跟上 leader,这样的 replica 会被从 ISR 中去除。 
因此 ISR <= AR; 
若是,ISR 中 没有其余的 replica,而且容许 unclean election,那么能够从 AR 中选取一个 leader,但这样必定是丢数据的,没法保证 offset 的一致性。

 

 

Broker Startup

这里的 startup,就是指 failover 中的 startup,会出现以下的log,

controller.log: 
“INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)” 
“INFO [Controller 1]: New broker startup callback for 
3 (kafka.controller.KafkaController)”

过程也不复杂,先将该 broker 上的全部的 replica 设为 online,而后触发 offline partition 或 new partition 的 state 转变为 online; 
因此 broker startup,只会影响 offline partition 或 new partition,让他们有可能成为 online。 
那么对于普通的已经 online partition,影响只是多一个可用的 replica,那仍是在它完成catch up,被加入 ISR 后的事。

Note: Partition 的 leader 在 broker failover 后,不会立刻自动切换回来,这样会产生的问题是,broker间负载不均衡,由于全部的读写都须要经过 leader。 
为了解决这个问题,在server的配置中有个配置,auto.leader.rebalance.enable,将其设为true; 
这样 Controller 会启动一个 scheduler 线程,按期去为每一个 broker 作 rebalance,即发现若是该 broker 上的 imbalance ratio 达到必定比例,就会将其中的某些 partition 的 leader,进行从新 elect 到原先的 broker 上。

 

4 Controller Failover

前面说明过,某个 broker server 会被选出做为 Controller,这个选举的过程就是依赖于 zookeeper 的 ephemeral node,谁能够先在"/controller"目录建立节点,谁就是 controller; 
因此反之,咱们也是 watch 这个目录来判断 Controller 是否发生 failover 或 变化。Controller 发生 failover 时,会出现以下 log:

controller.log: 
“INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)”

Controller 主要是做为 master 来仲裁 partition 的 leader 的,并维护 partition 和 replicas 的状态机,以及相应的 zk 的 watcher 注册;

Controller 的 failover 过程以下:

  • 试图去在“/controller” 目录抢占建立 ephemeral node;
  • 若是已经有其余的 broker 先建立成功,那么说明新的 controller 已经诞生,更新当前的元数据便可;
  • 若是本身建立成功,说明我已经成为新的 controller,下面就要开始作初始化工做,
  • 初始化主要就是建立和初始化 partition 和 replicas 的状态机,并对 partitions 和 brokers 的目录的变化设置 watcher。

能够看到,单纯 Controller 发生 failover,是不会影响正常数据读写的,只是 partition 的 leader 没法被从新选举,若是此时有 partition 的 leader fail,会致使 partition offline; 
可是 Controller 的 dead,每每是伴随着 broker 的 dead,因此在 Controller 发生 failover 的过程当中,每每会出现 partition offline, 致使数据暂时不可用。

 

5 Tips

Kafka 提供一些工具来方便的查看信息,参考:Kafka Tools

a, 验证topic 是否work?

最简单的方式,就是用 producer 和 consumer console 来测试

Producer console,以下能够往 localhost 的 topic test,插入两条 message,

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test 
This is a message
This is another message

Consumer console,以下就能够把刚写入的 message 读出,

bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

若是整个过程没有报错,ok,说明你的topic是能够工做的

 

b, 再看看topic是否健康?

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test

这样会打印出 topic test 的 detail 信息,如图,

image_thumb2

从这个图能够说明几个问题:

首先,topic 有几个 partitions,而且 replicas factor 是多少,即有几个 replica? 
图中分别有32个 partitions,而且每一个 partition 有两个 replica。

再者,每一个 partition 的 replicas 都被分配到哪些 brokers 上,而且该 partition 的 leader 是谁? 
好比,图中的 partition0,replicas 被分配到 brokers 4和1上面,其中 leader replica 在 broker 1 上。

最后,是否健康? 
从如下几个方面依次代表健康程度,

  • Isr 为空,说明这个 partition 已经 offline 没法提供服务了,这种 case 在咱们的图中没有出现;
  • Isr 有数据,可是 Isr < Replicas,这种状况下对于用户是没有感知的,可是说明有部分 replicas 已经出问题了,至少是暂时没法和 leader 同步;好比,图中的 partition0,Isr 只有1,说明 replica 4 已经 offline
  • Isr = Replicas,可是 leader 不是 Replicas 中的第一个 replica,这个说明 leader 是发生太重新选取的,这样可能会致使 brokers 负载不均衡;好比,图中的 partition9,leader是2,而不是3,说明虽然当前它的全部 replica 都是正常的,但以前发生太重新选举。

 

c,最后就是看kafka的日志,kafka/logs

主要是看 controller.log 和 server.log,分别记录 controller 和 broker server 的日志。 
而后根据前面我给的每种异常的日志,你能够看出来究竟是出现什么问题。

 

Reference

1. https://kafka.apache.org/08/design.html

2. Neha NarkhedeHands-free Kafka Replication: A lesson in operational simplicity

3. Kafka Tools

相关文章
相关标签/搜索