replica副本同步机制

replica副本同步机制

1 前言

 Kafka的流行归功于它设计和操做简单、存储系统高效、充分利用磁盘顺序读写等特性、很是适合在线日志收集等高吞吐场景。html

 Kafka特性之一是它的复制协议。复制协议是保障kafka高可靠性的关键。对于单个集群中每一个Broker不一样工做负载状况下,如何自动调优Kafka副本的工做方式是比较有挑战的。它的挑战之一是要知道如何避免follower进入和退出同步副本列表(即ISR)。从用户的角度来看,若是生产者发送一大批海量消息,可能会引发Kafka Broker不少警告。这些警报代表一些topics处于“under replicated”状态,这些副本处于同步失败或失效状态,更意味着数据没有被复制到足够数量Broker从而增长数据丢失的几率。所以Kafka集群中处于“under replicated”中Partition数要密切监控。这个警告应该来自于Broker失效,减慢或暂停等状态而不是生产者写不一样大小消息引发的。算法

2 Kafka的副本机制

Kafka中主题的每一个Partition有一个预写式日志文件,每一个Partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到Partition中,Partition中的每一个消息都有一个连续的序列号叫作offset, 肯定它在分区日志中惟一的位置。网络

 

 

Kafka每一个topic的partition有N个副本,其中N是topic的复制因子。Kafka经过多副本机制实现故障自动转移,当Kafka集群中一个Broker失效状况下仍然保证服务可用。在Kafka中发生复制时确保partition的预写式日志有序地写到其余节点上。N个replicas中。其中一个replica为leader,其余都为follower,leader处理partition的全部读写请求,与此同时,follower会被动按期地去复制leader上的数据。异步

以下图所示,Kafka集群中有4个broker, 某topic有3个partition,且复制因子即副本个数也为3:post

 

 

Kafka提供了数据复制算法保证,若是leader发生故障或挂掉,一个新leader被选举并被接受客户端的消息成功写入。Kafka确保从同步副本列表中选举一个副本为leader,或者说follower追赶leader数据。leader负责维护和跟踪ISR(In-Sync Replicas的缩写,表示副本同步队列,具体可参考下节)中全部follower滞后的状态。当producer发送一条消息到broker后,leader写入消息并复制到全部follower。消息提交以后才被成功复制到全部的同步副本。消息复制延迟受最慢的follower限制,重要的是快速检测慢副本,若是follower“落后”太多或者失效,leader将会把它从ISR中删除。fetch

副本同步队列(ISR)
所谓同步,必须知足以下两个条件:ui

  • 副本节点必须能与zookeeper保持会话(心跳机制)
  • 副本能复制leader上的全部写操做,而且不能落后太多。(卡住或滞后的副本控制是由 replica.lag.time.max.ms 配置)

默认状况下Kafka对应的topic的replica数量为1,即每一个partition都有一个惟一的leader,为了确保消息的可靠性,一般应用中将其值(由broker的参数offsets.topic.replication.factor指定)大小设置为大于1,好比3。 全部的副本(replicas)统称为Assigned Replicas,即AR。ISR是AR中的一个子集,由leader维护ISR列表,follower从leader同步数据有一些延迟。任意一个超过阈值都会把follower剔除出ISR, 存入OSR(Outof-Sync Replicas)列表,新加入的follower也会先存放在OSR中。AR=ISR+OSR。线程

上一节中的HW俗称高水位,是HighWatermark的缩写,取一个partition对应的ISR中最小的LEO做为HW,consumer最多只能消费到HW所在的位置。另外每一个replica都有HW,leader和follower各自负责更新本身的HW的状态。对于leader新写入的消息,consumer不能马上消费,leader会等待该消息被全部ISR中的replicas同步后更新HW,此时消息才能被consumer消费。这样就保证了若是leader所在的broker失效,该消息仍然能够重新选举的leader中获取。对于来自内部broKer的读取请求,没有HW的限制。
下图详细的说明了当producer生产消息至broker后,ISR以及HW和LEO的流转过程:设计

 

 

因而可知,Kafka的复制机制既不是彻底的同步复制,也不是单纯的异步复制。事实上,同步复制要求全部能工做的follower都复制完,这条消息才会被commit,这种复制方式极大的影响了吞吐率。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种状况下若是follower都尚未复制完,落后于leader时,忽然leader宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。3d

  • Controller来维护:Kafka集群中的其中一个Broker会被选举为Controller,主要负责Partition管理和副本状态管理,也会执行相似于重分配partition之类的管理任务。在符合某些特定条件下,Controller下的LeaderSelector会选举新的leader,ISR和新的leader_epoch及controller_epoch写入Zookeeper的相关节点中。同时发起LeaderAndIsrRequest通知全部的replicas。
  • leader来维护:leader有单独的线程按期检测ISR中follower是否脱离ISR, 若是发现ISR变化,则会将新的ISR的信息返回到Zookeeper的相关节点中。

副本不一样步的异常状况

  • 慢副本:在必定周期时间内follower不能追遇上leader。最多见的缘由之一是I / O瓶颈致使follower追加复制消息速度慢于从leader拉取速度。
  • 卡住副本:在必定周期时间内follower中止从leader拉取请求。follower replica卡住了是因为GC暂停或follower失效或死亡。
  • 新启动副本:当用户给主题增长副本因子时,新的follower不在同步副本列表中,直到他们彻底遇上了leader日志。

 

3 Follower向leader拉取数据的过程

3.1 replica fetcher 线程什么时候启动

broker 分配的任何一个 partition 都是以 Replica 对象实例的形式存在,而 Replica 在 Kafka 上是有两个角色: leader 和 follower,只要这个 Replica 是 follower,它便会向 leader 进行数据同步。

反映在 ReplicaManager 上就是若是 Broker 的本地副本被选举为 follower,那么它将会启动副本同步线程,其具体实现以下所示:

 

 

简单来讲,makeFollowers() 的处理过程以下:

  1. 先从本地记录 leader partition 的集合中将这些 partition 移除,由于这些 partition 已经被选举为了 follower;
  2. 将这些 partition 的本地副本设置为 follower,后面就不会接收关于这个 partition 的 Produce 请求了,若是依然有 client 在向这台 broker 发送数据,那么它将会返回相应的错误;
  3. 先中止关于这些 partition 的副本同步线程(若是本地副本以前是 follower 如今仍是 follower,先关闭的缘由是:这个 partition 的 leader 发生了变化,若是 leader 没有发生变化,那么 makeFollower方法返回的是 False,这个 Partition 就不会被添加到 partitionsToMakeFollower 集合中),这样的话能够保证这些 partition 的本地副本将不会再有新的数据追加;
  4. 对这些 partition 本地副本日志文件进行截断操做并进行 checkpoint 操做;
  5. 完成那些延迟处理的 Produce 和 Fetch 请求;
  6. 若是本地的 broker 没有掉线,那么向这些 partition 新选举出来的 leader 启动副本同步线程。

关于第6步,并不必定会为每个 partition 都启动一个 fetcher 线程,对于一个目的 broker,只会启动 num.replica.fetchers 个线程,具体这个 topic-partition 会分配到哪一个 fetcher 线程上,是根据 topic 名和 partition id 进行计算获得,实现所示:

 

 

3.2 replica fetcher 线程启动

如上所示,在 ReplicaManager 调用 makeFollowers() 启动 replica fetcher 线程后,它其实是经过 ReplicaFetcherManager 实例进行相关 topic-partition 同步线程的启动和关闭,其启动过程分为下面两步:

  1. ReplicaFetcherManager 调用 addFetcherForPartitions() 添加对这些 topic-partition 的数据同步流程;
  2. ReplicaFetcherManager 调用 createFetcherThread() 初始化相应的 ReplicaFetcherThread 线程。

 

addFetcherForPartitions() 的具体实现以下所示:

 

 

这个方法实际上是作了下面这几件事:

  1. 先计算这个 topic-partition 对应的 fetcher id;
  2. 根据 leader 和 fetcher id 获取对应的 replica fetcher 线程,若是没有找到,就调用 createFetcherThread() 建立一个新的 fetcher 线程;
  3. 若是是新启动的 replica fetcher 线程,那么就启动这个线程;
  4. 将 topic-partition 记录到 fetcherThreadMap 中,这个变量记录每一个 replica fetcher 线程要同步的 topic-partition 列表。

ReplicaFetcherManager 建立 replica Fetcher 线程的实现以下:

 

3.3 replica fetcher 线程处理过程

replica fetcher 线程在启动以后就开始进行正常数据同步流程了,这个过程都是在 ReplicaFetcherThread 线程中实现的。

ReplicaFetcherThread 的 doWork() 方法是一直在这个线程中的 run() 中调用的,实现方法以下:

 

 

在 doWork() 方法中主要作了两件事:

  1. 构造相应的 Fetch 请求(buildFetchRequest());
  2. 经过 processFetchRequest() 方法发送 Fetch 请求,并对其结果进行相应的处理。

 

processFetchRequest() 这个方法的做用是发送 Fetch 请求,并对返回的结果进行处理,最终写入到本地副本的 Log 实例中,其具体实现:

 

 

其处理过程简单总结一下:

  1. 经过 fetch() 方法,发送 Fetch 请求,获取相应的 response(若是遇到异常,那么在下次发送 Fetch 请求以前,会 sleep 一段时间再发);
  2. 若是返回的结果 不为空,而且 Fetch 请求的 offset 信息与返回结果的 offset 信息对得上,那么就会调用 processPartitionData() 方法将拉取到的数据追加本地副本的日志文件中,若是返回结果有错误信息,那么就对相应错误进行相应的处理;
  3. 对在 Fetch 过程当中遇到异常或返回错误的 topic-partition,会进行 delay 操做,下次 Fetch 请求的发生至少要间隔 replica.fetch.backoff.ms 时间。

 

fetch() 方法做用是发送 Fetch 请求,并返回相应的结果,其具体的实现,以下:

 

 

processPartitionData

这个方法的做用是,处理 Fetch 请求的具体数据内容,简单来讲就是:检查一下数据大小是否超过限制、将数据追加到本地副本的日志文件中、更新本地副本的 hw 值。

 

 

3.3 副本同步异常状况的处理

在副本同步的过程当中,会遇到哪些异常状况呢?

你们必定会想到关于 offset 的问题,在 Kafka 中,关于 offset 的处理,不管是 producer 端、consumer 端仍是其余地方,offset 彷佛都是一个如影随行的问题。在副本同步时,关于 offset,会遇到什么问题呢?下面举两个异常的场景:

  1. 假如当前本地(id:1)的副本如今是 leader,其 LEO 假设为1000,而另外一个在 isr 中的副本(id:2)其 LEO 为800,此时出现网络抖动,id 为1 的机器掉线后又上线了,可是此时副本的 leader 实际上已经变成了 2,而2的 LEO 为800,这时候1启动副本同步线程去2上拉取数据,但愿从 offset=1000 的地方开始拉取,可是2上最大的 offset 才是800,这种状况该如何处理呢?
  2. 假设一个 replica (id:1)其 LEO 是10,它已经掉线好几天,这个 partition leader 的 offset 范围是 [100, 800],那么 1 重启启动时,它但愿从 offset=10 的地方开始拉取数据时,这时候发生了 OutOfRange,不过跟上面不一样的是这里是小于了 leader offset 的范围,这种状况又该怎么处理?

以上两种状况都是 offset OutOfRange 的状况,只不过:一是 Fetch Offset 超过了 leader 的 LEO,二是 Fetch Offset 小于 leader 最小的 offset

在介绍 Kafka 解决方案以前,咱们先来本身思考一下这两种状况应该怎么处理?

  1. 若是 fetch offset 超过 leader 的 offset,这时候副本应该是回溯到 leader 的 LEO 位置(超过这个值的数据删除),而后再去进行副本同步,固然这种解决方案实际上是没法保证 leader 与 follower 数据的彻底一致,再次发生 leader 切换时,可能会致使数据的可见性不一致,但既然用户容许了脏选举的发生,其实咱们是能够认为用户是能够接收这种状况发生的;
  2. 这种就比较容易处理,首先清空本地的数据,由于本地的数据都已通过期了,而后从 leader 的最小 offset 位置开始拉取数据。

上面是咱们比较容易想出的解决方案,而在 Kafka 中,其解决方案也很相似,不过遇到状况比上面咱们列出的两种状况多了一些复杂,其解决方案以下:

 

 

针对第一种状况,在 Kafka 中,实际上还会发生这样一种状况,1 在收到 OutOfRange 错误时,这时去 leader 上获取的 LEO 值与最小的 offset 值,这时候却发现 leader 的 LEO 已经从 800 变成了 1100(这个 topic-partition 的数据量增加得比较快),再按照上面的解决方案就不太合理,Kafka 这边的解决方案是:遇到这种状况,进行重试就能够了,下次同步时就会正常了,可是依然会有上面说的那个问题。

3.4 replica fetcher 线程的关闭

 replica fetcher 线程关闭的条件,在三种状况下会关闭对这个 topic-partition 的拉取操做:

  1. stopReplica():broker 收到了 controller 发来的 StopReplica 请求,这时会开始关闭对指定 topic-partition 的同步线程;
  2. makeLeaders:这些 partition 的本地副本被选举成了 leader,这时候就会先中止对这些 topic-partition 副本同步线程;
  3. makeFollowers():前面已经介绍过,这里实际上中止副本同步,而后再开启副本同步线程,由于这些 topic-partition 的 leader 可能发生了切换。

这里直接说线程关闭,其实不是很准确,由于每一个 replica fetcher 线程操做的是多个 topic-partition,而在关闭的粒度是 partition 级别,只有这个线程分配的 partition 所有关闭后,这个线程才会真正被关闭。

stopReplica

StopReplica 的请求其实是 Controller 发送过来的,这个在 controller 部分会讲述,它触发的条件有多种,好比:broker 下线、partition replica 迁移等等。

makeLeaders

makeLeaders() 方法的调用是在 broker 上这个 partition 的副本被设置为 leader 时触发的,其实现以下:

 

 

调用 ReplicaFetcherManager 的 removeFetcherForPartitions() 删除对这些 topic-partition 的副本同步设置,这里在实现时,会遍历全部的 replica fetcher 线程,都执行 removePartitions() 方法来移除对应的 topic-partition 集合。

 

 

removePartitions

这个方法的做用是:ReplicaFetcherThread 将这些 topic-partition 从本身要拉取的 partition 列表中移除。

 

 

 

ReplicaFetcherThread 的关闭

前面介绍那么多,彷佛仍是没有真正去关闭,那么 ReplicaFetcherThread 真正关闭是哪里操做的呢?

实际上 ReplicaManager 每次处理完 LeaderAndIsr 请求后,都会调用 ReplicaFetcherManager 的 shutdownIdleFetcherThreads() 方法,若是 fetcher 线程要拉取的 topic-partition 集合为空,那么就会关闭掉对应的 fetcher 线程。

 

 

参考资料:

http://www.javashuo.com/article/p-szajsrgw-kn.html

http://www.javashuo.com/article/p-urlfflqd-hk.html

https://www.infoq.cn/article/depth-interpretation-of-kafka-data-reliability

相关文章
相关标签/搜索