转自:http://www.cnblogs.com/fxjwind/p/4972244.htmlhtml
Kafka 做为 high throughput 的消息中间件,以其性能,简单和稳定性,成为当前实时流处理框架中的主流的基础组件。node
固然在使用 Kafka 中也碰到很多问题,尤为是 failover 的问题,经常给你们带来很多困扰和麻烦。
因此在梳理完 kafka 源码的基础上,尽可能用通俗易懂的方式,把 Kafka 发生 failover 时的机制解释清楚,让你们在使用和运维中,作到心中有数。算法
若是对 kafka 不了解的,能够先参考https://kafka.apache.org/08/design.html,有个大体的概念。apache
这里讨论 kafka 的 failover 的前提是在0.8版本后, kafka 提供了 replica 机制。
对于0.7版本不存在 failover 的说法,由于任意一个 broker dead 都会致使上面的数据不可读,从而致使服务中断。网络
下面简单的介绍一下 0.8中加入的 replica 机制和相应的组件,session
基本思想大同小异,以下图 (Ref.2):并发
图中有4个 kafka brokers,而且Topic1有四个 partition(用蓝色表示)分布在4个 brokers 上,为 leader replica;
且每一个 partition 都有两个 follower replicas(用橘色表示),分布在和 leader replica 不一样的 brokers。
这个分配算法很简单,有兴趣的能够参考kafka的design。框架
为了支持replica机制,主要增长的两个组件是,Replica Manager和Controller, 以下图:less
每一个 broker server 都会建立一个 Replica Manager,全部的数据的读写都须要通过它 ,
0.7版本,kafka 会直接从 LogManager 中读数据,但在增长 replica 机制后,只有 leader replica 能够响应数据的读写请求 。
因此,Replica Manager 须要管理全部 partition 的 replica 状态,并响应读写请求,以及其余和 replica 相关的操做。运维
你们能够看到,每一个 partition 都有一个 leader replica,和若干的 follower replica,那么谁来决定谁是leader?
你说有 zookeeper,但用 zk 为每一个 partition 作 elect,效率过低,并且 zk 会不堪重负;
因此如今的通用作法是,只用 zk 选一个 master 节点,而后由这个 master 节点来作其余的全部仲裁工做。
kafka 的作法就是在 brokers 中选出一个做为 controller,来作为 master 节点,从而仲裁全部的 partition 的 leader 选举。
下面咱们会从以下几个方面来解释 failover 机制,
先从 client 的角度看看当 kafka 发生 failover 时,数据一致性问题。
而后从 Kafka 的各个重要组件,Zookeeper,Broker, Controller 发生 failover 会形成什么样的影响?
最后给出一些判断 kafka 状态的 tips。
除了要打开 replica 机制,还取决于 produce 的 request.required.acks 的设置,
因此,通常的状况下,thoughput 优先,设成1,在极端状况下,是有可能丢失数据的;
若是能够接受较长的写延迟,能够选择将 acks 设为 –1。
首先不管是 high-level 或 low-level consumer,咱们要知道他是怎么从 kafka 读数据的?
kafka 的 log patition 存在文件中,并以 offset 做为索引,因此 consumer 须要对于每一个 partition 记录上次读到的 offset (high-level和low-level的区别在因而 kafka 帮你记,仍是你本身记);
因此若是 consumer dead,重启后只须要继续从上次的 offset 开始读,那就不会有不一致的问题。
但若是是 Kafka broker dead,并发生 partition leader 切换,如何保证在新的 leader 上这个 offset 仍然有效?
Kafka 用一种机制,即 committed offset,来保证这种一致性,以下图(Ref.2)
log 除了有 log end offset 来表示 log 的末端,还有一个 committed offset, 表示有效的 offset;
committed offset 只有在全部 replica 都同步完该 offset 后,才会被置为该offset;
因此图中 committed 置为2, 由于 broker3 上的 replica 尚未完成 offset 3 的同步;
因此这时,offset 3 的 message 对 consumer 是不可见的,consumer最多只能读到 offset 2。
若是此时,leader dead,不管哪一个 follower 从新选举成 leader,都不会影响数据的一致性,由于consumer可见的offset最多为2,而这个offset在全部的replica上都是一致的。
因此在通常正常状况下,当 kafka 发生 failover 的时候,consumer 是不会读到不一致数据的。特例的状况就是,当前 leader 是惟一有效的 replica,其余replica都处在彻底不一样步状态,这样发生 leader 切换,必定是会丢数据的,并会发生 offset 不一致。
Kafka 首先对于 zookeeper 是强依赖,因此 zookeeper 发生异常时,会对数据形成如何的影响?
若是 zookeeper dead,broker 是没法启动的,报以下的异常:
这种异常,有多是 zookeeper dead,也有多是网络不通,总之就是连不上 zookeeper。
这种 case,kafka彻底不工做,直到能够连上 zookeeper 为止。
其实上面这种状况比较简单,比较麻烦的是 zookeeper hang,能够说 kafka 的80%以上问题都是因为这个缘由
zookeeper hang 的缘由有不少,主要是 zk 负载太重,zk 所在主机 cpu,memeory 或网络资源不够等
zookeeper hang 带来的主要问题就是 session timeout,这样会触发以下的问题,
a. Controller Fail,Controller 发生从新选举和切换,具体过程参考下文。
b. Broker Fail,致使partition的leader发生切换或partition offline,具体过程参考下文。
c. Broker 被 hang 住 。
这是一种比较特殊的 case,出现时在 server.log 会出现以下的log,
server.log:
“INFO I wrote this conflicted ephemeral node [{"jmx_port":9999,"timestamp":"1444709 63049","host":"10.151.4.136","version":1,"port":9092}] at /brokers/ids/1 a while back in a different session, hence I will backoff for this node to be deleted by Zookeeper and retry (kafka.utils.ZkUtils$)”
这个问题自己是因为 zookeeper 的一个 bug,参考:https://issues.apache.org/jira/browse/ZOOKEEPER-1740
问题在于“The current behavior of zookeeper for ephemeral nodes is that session expiration and ephemeral node deletion is not an atomic operation.”
即 zk 的 session 过时和 ephemeral node 删除并非一个原子操做;
出现的case以下:
因此这里作的处理是,在前面发现 NodeExists 时,while true 等待,一直等到 zk 从 hang 中恢复删除该节点,而后建立新节点成功,才算完;
这样作的结果是这个broker也会被一直卡在这儿,等待该节点被成功建立。
Broker 的 Failover,能够分为两个过程,一个是 broker failure, 一个是 broker startup。
在谈failover以前,咱们先看一个更简单的过程,就是新加一个全新的 broker:
首先明确,新加的 broker 对现存全部的 topic 和 partition,不会有任何影响;
由于一个 topic 的 partition 的全部 replica 的 assignment 状况,在建立时就决定了,并不会自动发生变化,除非你手动的去作 reassignment。
因此新加一个 broker,所须要作的只是你们同步一下元数据,你们都知道来了一个新的 broker,当你建立新的 topic 或 partition 的时候,它会被用上。
首先明确,这里的 broker failure,并不必定是 broker server 真正的 dead了, 只是指该 broker 所对应的 zk ephemeral node ,好比/brokers/ids/1,发生 session timeout;
固然发生这个的缘由,除了server dead,还有不少,好比网络不通;可是咱们不关心,只要出现 sessioin timeout,咱们就认为这个 broker 不工做了;
会出现以下的log,
controller.log:
“INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)”
“INFO [Controller 1]: Broker failure callback for 4 (kafka.controller.KafkaController)”
当一个 broker failure 会影响什么,其实对于多 replicas 场景,通常对最终客户没啥影响。
只会影响哪些 leader replica 在该 broker 的 partitions; 须要从新作 leader election,若是没法选出一个新的 leader,会致使 partition offline。
由于若是只是 follow replica failure,不会影响 partition 的状态,仍是能够服务的,只是可用 replica 少了一个;须要注意的是,kafka 是不会自动补齐失败的replica的,即坏一个少一个;
可是对于 leader replica failure,就须要从新再 elect leader,前面已经讨论过,新选取出的 leader 是能够保证 offset 一致性的;
Note: 其实这里的一致性是有前提的,即除了 fail 的 leader,在 ISR(in-sync replicas) 里面还存在其余的 replica;顾名思义,ISR,就是能 catch up with leader 的 replica。
虽然 partition 在建立的时候,会分配一个 AR(assigned replicas),可是在运行的过程当中,可能会有一些 replica 因为各类缘由没法跟上 leader,这样的 replica 会被从 ISR 中去除。
因此 ISR <= AR;
若是,ISR 中 没有其余的 replica,而且容许 unclean election,那么能够从 AR 中选取一个 leader,但这样必定是丢数据的,没法保证 offset 的一致性。
这里的 startup,就是指 failover 中的 startup,会出现以下的log,
controller.log:
“INFO [BrokerChangeListener on Controller 1]: Newly added brokers: 3, deleted brokers: 4, all live brokers: 3,2,1 (kafka.controller.ReplicaStateMachine$BrokerChangeListener)”
“INFO [Controller 1]: New broker startup callback for 3 (kafka.controller.KafkaController)”
过程也不复杂,先将该 broker 上的全部的 replica 设为 online,而后触发 offline partition 或 new partition 的 state 转变为 online;
因此 broker startup,只会影响 offline partition 或 new partition,让他们有可能成为 online。
那么对于普通的已经 online partition,影响只是多一个可用的 replica,那仍是在它完成catch up,被加入 ISR 后的事。
Note: Partition 的 leader 在 broker failover 后,不会立刻自动切换回来,这样会产生的问题是,broker间负载不均衡,由于全部的读写都须要经过 leader。
为了解决这个问题,在server的配置中有个配置,auto.leader.rebalance.enable,将其设为true;
这样 Controller 会启动一个 scheduler 线程,按期去为每一个 broker 作 rebalance,即发现若是该 broker 上的 imbalance ratio 达到必定比例,就会将其中的某些 partition 的 leader,进行从新 elect 到原先的 broker 上。
前面说明过,某个 broker server 会被选出做为 Controller,这个选举的过程就是依赖于 zookeeper 的 ephemeral node,谁能够先在"/controller"目录建立节点,谁就是 controller;
因此反之,咱们也是 watch 这个目录来判断 Controller 是否发生 failover 或 变化。Controller 发生 failover 时,会出现以下 log:
controller.log:
“INFO [SessionExpirationListener on 1], ZK expired; shut down all controller components and try to re-elect (kafka.controller.KafkaController$SessionExpirationListener)”
Controller 主要是做为 master 来仲裁 partition 的 leader 的,并维护 partition 和 replicas 的状态机,以及相应的 zk 的 watcher 注册;
Controller 的 failover 过程以下:
能够看到,单纯 Controller 发生 failover,是不会影响正常数据读写的,只是 partition 的 leader 没法被从新选举,若是此时有 partition 的 leader fail,会致使 partition offline;
可是 Controller 的 dead,每每是伴随着 broker 的 dead,因此在 Controller 发生 failover 的过程当中,每每会出现 partition offline, 致使数据暂时不可用。
Kafka 提供一些工具来方便的查看信息,参考:Kafka Tools
a, 验证topic 是否work?
最简单的方式,就是用 producer 和 consumer console 来测试
Producer console,以下能够往 localhost 的 topic test,插入两条 message,
> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test This is a message This is another message
Consumer console,以下就能够把刚写入的 message 读出,
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
若是整个过程没有报错,ok,说明你的topic是能够工做的
b, 再看看topic是否健康?
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
这样会打印出 topic test 的 detail 信息,如图,
从这个图能够说明几个问题:
首先,topic 有几个 partitions,而且 replicas factor 是多少,即有几个 replica?
图中分别有32个 partitions,而且每一个 partition 有两个 replica。
再者,每一个 partition 的 replicas 都被分配到哪些 brokers 上,而且该 partition 的 leader 是谁?
好比,图中的 partition0,replicas 被分配到 brokers 4和1上面,其中 leader replica 在 broker 1 上。
最后,是否健康?
从如下几个方面依次代表健康程度,
c,最后就是看kafka的日志,kafka/logs
主要是看 controller.log 和 server.log,分别记录 controller 和 broker server 的日志。
而后根据前面我给的每种异常的日志,你能够看出来究竟是出现什么问题。
1. https://kafka.apache.org/08/design.html
2. Neha Narkhede,Hands-free Kafka Replication: A lesson in operational simplicity
3. Kafka Tools