Kafka的流行归功于它设计和操做简单、存储系统高效、充分利用磁盘顺序读写等特性、很是适合在线日志收集等高吞吐场景。html
Kafka特性之一是它的复制协议。复制协议是保障kafka高可靠性的关键。对于单个集群中每一个Broker不一样工做负载状况下,如何自动调优Kafka副本的工做方式是比较有挑战的。它的挑战之一是要知道如何避免follower进入和退出同步副本列表(即ISR)。从用户的角度来看,若是生产者发送一大批海量消息,可能会引发Kafka Broker不少警告。这些警报代表一些topics处于“under replicated”状态,这些副本处于同步失败或失效状态,更意味着数据没有被复制到足够数量Broker从而增长数据丢失的几率。所以Kafka集群中处于“under replicated”中Partition数要密切监控。这个警告应该来自于Broker失效,减慢或暂停等状态而不是生产者写不一样大小消息引发的。算法
Kafka中主题的每一个Partition有一个预写式日志文件,每一个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每一个消息都有一个连续的序列号叫作offset, 肯定它在分区日志中惟一的位置。网络
Kafka每一个topic的partition有N个副本,其中N是topic的复制因子。Kafka经过多副本机制实现故障自动转移,当Kafka集群中一个Broker失效状况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其余节点上。N个replicas中。其中一个replica为leader,其余都为follower,leader处理partition的全部读写请求,与此同时,follower会被动按期地去复制leader上的数据。异步
以下图所示,Kafka集群中有4个broker, 某topic有3个partition,且复制因子即副本个数也为3:post
Kafka提供了数据复制算法保证,若是leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader,或者说follower追赶leader数据。leader负责维护和跟踪ISR(In-Sync Replicas的缩写,表示副本同步队列,具体可参考下节)中全部follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到全部follower。消息提交以后才被成功复制到全部的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,若是follower“落后”太多或者失效,leader将会把它从ISR中删除。fetch
副本同步队列(ISR)
所谓同步,必须知足以下两个条件:ui
默认状况下Kafka对应的topic的replica数量为1,即每一个partition都有一个惟一的leader,为了确保消息的可靠性,一般应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,好比3。 全部的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。线程
上一节中的HW俗称高水位,是HighWatermark的缩写,取一个partition对应的ISR中最小的LEO做为HW,consumer最多只能消费到HW所在的位置。另外每一个replica都有HW,leader和follower各自负责更新本身的HW的状态。对于leader新写入的消息,consumer不能马上消费,leader会等待该消息被全部ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了若是leader所在的broker失效,该消息仍然能够重新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
下图详细的说明了当producer生产消息至broker后,ISR以及HW和LEO的流转过程:设计
因而可知,Kafka的复制机制既不是彻底的同步复制,也不是单纯的异步复制。事实上,同步复制要求全部能工做的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种状况下若是follower都尚未复制完,落后于leader时,忽然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。3d
副本不一样步的异常状况
broker 分配的任何一个 partition 都是以 Replica 对象实例的形式存在,而 Replica 在 Kafka 上是有两个角色: leader 和 follower,只要这个 Replica 是 follower,它便会向 leader 进行数据同步。
反映在 ReplicaManager 上就是若是 Broker 的本地副本被选举为 follower,那么它将会启动副本同步线程,其具体实现以下所示:
简单来讲,makeFollowers() 的处理过程以下:
关于第6步,并不必定会为每个 partition 都启动一个 fetcher 线程,对于一个目的 broker,只会启动 num.replica.fetchers
个线程,具体这个 topic-partition 会分配到哪一个 fetcher 线程上,是根据 topic 名和 partition id 进行计算获得,实现所示:
如上所示,在 ReplicaManager 调用 makeFollowers() 启动 replica fetcher 线程后,它其实是经过 ReplicaFetcherManager 实例进行相关 topic-partition 同步线程的启动和关闭,其启动过程分为下面两步:
addFetcherForPartitions()
的具体实现以下所示:
这个方法实际上是作了下面这几件事:
ReplicaFetcherManager 建立 replica Fetcher 线程的实现以下:
replica fetcher 线程在启动以后就开始进行正常数据同步流程了,这个过程都是在 ReplicaFetcherThread 线程中实现的。
ReplicaFetcherThread 的 doWork()
方法是一直在这个线程中的 run()
中调用的,实现方法以下:
在 doWork() 方法中主要作了两件事:
processFetchRequest()
这个方法的做用是发送 Fetch 请求,并对返回的结果进行处理,最终写入到本地副本的 Log 实例中,其具体实现:
其处理过程简单总结一下:
fetch()
方法做用是发送 Fetch 请求,并返回相应的结果,其具体的实现,以下:
processPartitionData
这个方法的做用是,处理 Fetch 请求的具体数据内容,简单来讲就是:检查一下数据大小是否超过限制、将数据追加到本地副本的日志文件中、更新本地副本的 hw 值。
在副本同步的过程当中,会遇到哪些异常状况呢?
你们必定会想到关于 offset 的问题,在 Kafka 中,关于 offset 的处理,不管是 producer 端、consumer 端仍是其余地方,offset 彷佛都是一个如影随行的问题。在副本同步时,关于 offset,会遇到什么问题呢?下面举两个异常的场景:
以上两种状况都是 offset OutOfRange 的状况,只不过:一是 Fetch Offset 超过了 leader 的 LEO,二是 Fetch Offset 小于 leader 最小的 offset
在介绍 Kafka 解决方案以前,咱们先来本身思考一下这两种状况应该怎么处理?
上面是咱们比较容易想出的解决方案,而在 Kafka 中,其解决方案也很相似,不过遇到状况比上面咱们列出的两种状况多了一些复杂,其解决方案以下:
针对第一种状况,在 Kafka 中,实际上还会发生这样一种状况,1 在收到 OutOfRange 错误时,这时去 leader 上获取的 LEO 值与最小的 offset 值,这时候却发现 leader 的 LEO 已经从 800 变成了 1100(这个 topic-partition 的数据量增加得比较快),再按照上面的解决方案就不太合理,Kafka 这边的解决方案是:遇到这种状况,进行重试就能够了,下次同步时就会正常了,可是依然会有上面说的那个问题。
replica fetcher 线程关闭的条件,在三种状况下会关闭对这个 topic-partition 的拉取操做:
这里直接说线程关闭,其实不是很准确,由于每一个 replica fetcher 线程操做的是多个 topic-partition,而在关闭的粒度是 partition 级别,只有这个线程分配的 partition 所有关闭后,这个线程才会真正被关闭。
stopReplica
StopReplica 的请求其实是 Controller 发送过来的,这个在 controller 部分会讲述,它触发的条件有多种,好比:broker 下线、partition replica 迁移等等。
makeLeaders
makeLeaders()
方法的调用是在 broker 上这个 partition 的副本被设置为 leader 时触发的,其实现以下:
调用 ReplicaFetcherManager 的 removeFetcherForPartitions()
删除对这些 topic-partition 的副本同步设置,这里在实现时,会遍历全部的 replica fetcher 线程,都执行 removePartitions()
方法来移除对应的 topic-partition 集合。
removePartitions
这个方法的做用是:ReplicaFetcherThread 将这些 topic-partition 从本身要拉取的 partition 列表中移除。
ReplicaFetcherThread 的关闭
前面介绍那么多,彷佛仍是没有真正去关闭,那么 ReplicaFetcherThread 真正关闭是哪里操做的呢?
实际上 ReplicaManager 每次处理完 LeaderAndIsr 请求后,都会调用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads()
方法,若是 fetcher 线程要拉取的 topic-partition 集合为空,那么就会关闭掉对应的 fetcher 线程。
http://www.javashuo.com/article/p-szajsrgw-kn.html
http://www.javashuo.com/article/p-urlfflqd-hk.html
https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability