~~~这是一篇有点长的文章,但愿不会令你昏昏欲睡~~~算法
本文主要讨论0.11版本以前Kafka的副本备份机制的设计问题以及0.11是如何解决的。简单来讲,0.11以前副本备份机制主要依赖水位(或水印)的概念,而0.11采用了leader epoch来标识备份进度。后面咱们会详细讨论两种机制的差别。不过首先先作一些基本的名词含义解析。缓存
水位或水印(watermark)一词,也可称为高水位(high watermark),一般被用在流式处理领域(好比Apache Flink、Apache Spark等),以表征元素或事件在基于时间层面上的进度。一个比较经典的表述为:流式系统保证在水位t时刻,建立时间(event time) = t'且t' ≤ t的全部事件都已经到达或被观测到。在Kafka中,水位的概念反而与时间无关,而是与位置信息相关。严格来讲,它表示的就是位置信息,即位移(offset)。网上有一些关于Kafka watermark的介绍,本不该再赘述,但鉴于本文想要重点强调的leader epoch与watermark息息相关,故这里再费些篇幅阐述一下watermark。注意:因为Kafka源码中使用的名字是高水位,故本文将始终使用high watermaker或干脆简称为HW。数据结构
Kafka分区下有可能有不少个副本(replica)用于实现冗余,从而进一步实现高可用。副本根据角色的不一样可分为3类:异步
每一个Kafka副本对象都有两个重要的属性:LEO和HW。注意是全部的副本,而不仅是leader副本。fetch
咱们使用下图来形象化地说明二者的关系:spa
上图中,HW值是7,表示位移是0~7的全部消息都已经处于“已备份状态”(committed),而LEO值是15,那么8~14的消息就是还没有彻底备份(fully replicated)——为何没有15?由于刚才说过了,LEO指向的是下一条消息到来时的位移,故上图使用虚线框表示。咱们总说consumer没法消费未提交消息。这句话若是用以上名词来解读的话,应该表述为:consumer没法消费分区下leader副本中位移值大于分区HW的任何消息。这里须要特别注意分区HW就是leader副本的HW值。scala
既然副本分为leader副本和follower副本,而每一个副本又都有HW和LEO,那么它们是怎么被更新的呢?它们更新的机制又有什么区别呢?咱们一一来分析下:设计
1、follower副本什么时候更新LEO?3d
如前所述,follower副本只是被动地向leader副本请求数据,具体表现为follower副本不停地向leader副本所在的broker发送FETCH请求,一旦获取消息后写入本身的日志中进行备份。那么follower副本的LEO是什么时候更新的呢?首先我必须言明,Kafka有两套follower副本LEO(明白这个是搞懂后面内容的关键,所以请多花一点时间来思考):1. 一套LEO保存在follower副本所在broker的副本管理机中;2. 另外一套LEO保存在leader副本所在broker的副本管理机中——换句话说,leader副本机器上保存了全部的follower副本的LEO。日志
为何要保存两套?这是由于Kafka使用前者帮助follower副本更新其HW值;而利用后者帮助leader副本更新其HW使用。下面咱们分别看下它们被更新的时机。
1 follower副本端的follower副本LEO什么时候更新?(原谅我有点拗口~~~~~)
follower副本端的LEO值就是其底层日志的LEO值,也就是说每当新写入一条消息,其LEO值就会被更新(相似于LEO += 1)。当follower发送FETCH请求后,leader将数据返回给follower,此时follower开始向底层log写数据,从而自动地更新LEO值
2 leader副本端的follower副本LEO什么时候更新?
leader副本端的follower副本LEO的更新发生在leader在处理follower FETCH请求时。一旦leader接收到follower发送的FETCH请求,它首先会从本身的log中读取相应的数据,可是在给follower返回数据以前它先去更新follower的LEO(即上面所说的第二套LEO)
2、follower副本什么时候更新HW?
follower更新HW发生在其更新LEO以后,一旦follower向log写完数据,它会尝试更新它本身的HW值。具体算法就是比较当前LEO值与FETCH响应中leader的HW值,取二者的小者做为新的HW值。这告诉咱们一个事实:若是follower的LEO值超过了leader的HW值,那么follower HW值是不会越过leader HW值的。
3、leader副本什么时候更新LEO?
和follower更新LEO道理相同,leader写log时就会自动地更新它本身的LEO值。
4、leader副本什么时候更新HW值?
前面说过了,leader的HW值就是分区HW值,所以什么时候更新这个值是咱们最关心的,由于它直接影响了分区数据对于consumer的可见性 。如下4种状况下leader会尝试去更新分区HW——切记是尝试,有可能由于不知足条件而不作任何更新:
特别注意上面4个条件中的最后两个。它揭示了一个事实——当Kafka broker都正常工做时,分区HW值的更新时机有两个:leader处理PRODUCE请求时和leader处理FETCH请求时。另外,leader是如何更新它的HW值的呢?前面说过了,leader broker上保存了一套follower副本的LEO以及它本身的LEO。当尝试肯定分区HW时,它会选出全部知足条件的副本,比较它们的LEO(固然也包括leader本身的LEO),并选择最小的LEO值做为HW值。这里的知足条件主要是指副本要知足如下两个条件之一:
乍看上去好像这两个条件说得是一回事,毕竟ISR的定义就是第二个条件描述的那样。但某些状况下Kafka的确可能出现副本已经“追上”了leader的进度,但却不在ISR中——好比某个从failure中恢复的副本。若是Kafka只判断第一个条件的话,肯定分区HW值时就不会考虑这些未在ISR中的副本,但这些副本已经具有了“马上进入ISR”的资格,所以就可能出现分区HW值越过ISR中副本LEO的状况——这确定是不容许的,由于分区HW实际上就是ISR中全部副本LEO的最小值。
好了,理论部分我以为说的差很少了,下面举个实际的例子。咱们假设有一个topic,单分区,副本因子是2,即一个leader副本和一个follower副本。咱们看下当producer发送一条消息时,broker端的副本到底会发生什么事情以及分区HW是如何被更新的。
下图是初始状态,咱们稍微解释一下:初始时leader和follower的HW和LEO都是0(严格来讲源代码会初始化LEO为-1,不过这不影响以后的讨论)。leader中的remote LEO指的就是leader端保存的follower LEO,也被初始化成0。此时,producer没有发送任何消息给leader,而follower已经开始不断地给leader发送FETCH请求了,但由于没有数据所以什么都不会发生。值得一提的是,follower发送过来的FETCH请求由于无数据而暂时会被寄存到leader端的purgatory中,待500ms(replica.fetch.wait.max.ms参数)超时后会强制完成。假若在寄存期间producer端发送过来数据,那么会Kafka会自动唤醒该FETCH请求,让leader继续处理之。
虽然purgatory不是本文的重点,但FETCH请求发送和PRODUCE请求处理的时机会影响咱们的讨论。所以后续咱们也将分两种状况来讨论分区HW的更新。
第一种状况:follower发送FETCH请求在leader处理完PRODUCE请求以后
producer给该topic分区发送了一条消息。此时的状态以下图所示:
如图所示,leader接收到PRODUCE请求主要作两件事情:
因此,PRODUCE请求处理完成后leader端的HW值依然是0,而LEO是1,remote LEO是1。假设此时follower发送了FETCH请求(或者说follower早已发送了FETCH请求,只不过在broker的请求队列中排队),那么状态变动以下图所示:
本例中当follower发送FETCH请求时,leader端的处理依次是:
而follower副本接收到FETCH response后依次执行下列操做:
此时,第一轮FETCH RPC结束,咱们会发现虽然leader和follower都已经在log中保存了这条消息,但分区HW值还没有被更新。实际上,它是在第二轮FETCH RPC中被更新的,以下图所示:
上图中,follower发来了第二轮FETCH请求,leader端接收到后仍然会依次执行下列操做:
一样地,follower副本接收到FETCH response后依次执行下列操做:
Okay,producer端发送消息后broker端完整的处理流程就讲完了。此时消息已经成功地被复制到leader和follower的log中且分区HW是1,代表consumer可以消费offset = 0的这条消息。下面咱们来分析下PRODUCE和FETCH请求交互的第二种状况。
第二种状况:FETCH请求保存在purgatory中PRODUCE请求到来
这种状况实际上和第一种状况差很少。前面说过了,当leader没法当即知足FECTH返回要求的时候(好比没有数据),那么该FETCH请求会被暂存到leader端的purgatory中,待时机成熟时会尝试再次处理它。不过Kafka不会无限期地将其缓存着,默认有个超时时间(500ms),一旦超时时间已过,则这个请求会被强制完成。不过咱们要讨论的场景是在寄存期间,producer发送PRODUCE请求从而使之知足了条件从而被唤醒。此时,leader端处理流程以下:
至于唤醒后的FETCH请求的处理与第一种状况彻底一致,故这里不作详细展开了。
以上全部的东西其实就想说明一件事情:Kafka使用HW值来决定副本备份的进度,而HW值的更新一般须要额外一轮FETCH RPC才能完成,故而这种设计是有问题的。它们可能引发的问题包括:
咱们一一分析下:
1、数据丢失
如前所述,使用HW值来肯定备份进度时其值的更新是在下一轮RPC中完成的。如今翻到上面使用两种不一样颜色标记的步骤处思考下, 若是follower副本在蓝色标记的第一步与紫色标记的第二步之间发生崩溃,那么就有可能形成数据的丢失。咱们举个例子来看下。
上图中有两个副本:A和B。开始状态是A是leader。咱们假设producer端min.insync.replicas设置为1,那么当producer发送两条消息给A后,A写入到底层log,此时Kafka会通知producer说这两条消息写入成功。
可是在broker端,leader和follower底层的log虽都写入了2条消息且分区HW已经被更新到2,但follower HW还没有被更新(也就是上面紫色颜色标记的第二步还没有执行)。假若此时副本B所在的broker宕机,那么重启回来后B会自动把LEO调整到以前的HW值,故副本B会作日志截断(log truncation),将offset = 1的那条消息从log中删除,并调整LEO = 1,此时follower副本底层log中就只有一条消息,即offset = 0的消息。
B重启以后须要给A发FETCH请求,但若A所在broker机器在此时宕机,那么Kafka会令B成为新的leader,而当A重启回来后也会执行日志截断,将HW调整回1。这样,位移=1的消息就从两个副本的log中被删除,即永远地丢失了。
这个场景丢失数据的前提是在min.insync.replicas=1时,一旦消息被写入leader端log即被认为是“已提交”,而延迟一轮FETCH RPC更新HW值的设计使得follower HW值是异步延迟更新的,假若在这个过程当中leader发生变动,那么成为新leader的follower的HW值就有多是过时的,使得clients端认为是成功提交的消息被删除。
2、leader/follower数据离散
除了可能形成的数据丢失之外,这种设计还有一个潜在的问题,即形成leader端log和follower端log的数据不一致。好比leader端保存的记录序列是r1,r2,r3,r4,r5,....;而follower端保存的序列多是r1,r3,r4,r5,r6...。这也是非法的场景,由于顾名思义,follower必须追随leader,完整地备份leader端的数据。
咱们依然使用一张图来讲明这种场景是如何发生的:
这种状况的初始状态与状况1有一些不一样的:A依然是leader,A的log写入了2条消息,但B的log只写入了1条消息。分区HW更新到2,但B的HW仍是1,同时producer端的min.insync.replicas = 1。
此次咱们让A和B所在机器同时挂掉,而后假设B先重启回来,所以成为leader,分区HW = 1。假设此时producer发送了第3条消息(绿色框表示)给B,因而B的log中offset = 1的消息变成了绿色框表示的消息,同时分区HW更新到2(A尚未回来,就B一个副本,故能够直接更新HW而不用理会A)以后A重启回来,须要执行日志截断,但发现此时分区HW=2而A以前的HW值也是2,故不作任何调整。此后A和B将以这种状态继续正常工做。
显然,这种场景下,A和B底层log中保存在offset = 1的消息是不一样的记录,从而引起不一致的情形出现。
Kafka 0.11.0.0.版本解决方案
形成上述两个问题的根本缘由在于HW值被用于衡量副本备份的成功与否以及在出现failture时做为日志截断的依据,但HW值的更新是异步延迟的,特别是须要额外的FETCH请求处理流程才能更新,故这中间发生的任何崩溃均可能致使HW值的过时。鉴于这些缘由,Kafka 0.11引入了leader epoch来取代HW值。Leader端多开辟一段内存区域专门保存leader的epoch信息,这样即便出现上面的两个场景也能很好地规避这些问题。
所谓leader epoch其实是一对值:(epoch,offset)。epoch表示leader的版本号,从0开始,当leader变动过1次时epoch就会+1,而offset则对应于该epoch版本的leader写入第一条消息的位移。所以假设有两对值:
(0, 0)
(1, 120)
则表示第一个leader从位移0开始写入消息;共写了120条[0, 119];而第二个leader版本号是1,从位移120处开始写入消息。
leader broker中会保存这样的一个缓存,并按期地写入到一个checkpoint文件中。
当leader写底层log时它会尝试更新整个缓存——若是这个leader首次写消息,则会在缓存中增长一个条目;不然就不作更新。而每次副本从新成为leader时会查询这部分缓存,获取出对应leader版本的位移,这就不会发生数据不一致和丢失的状况。
下面咱们依然使用图的方式来讲明下利用leader epoch如何规避上述两种状况
1、规避数据丢失
上图左半边已经给出了简要的流程描述,这里不详细展开具体的leader epoch实现细节(好比OffsetsForLeaderEpochRequest的实现),咱们只须要知道每一个副本都引入了新的状态来保存本身当leader时开始写入的第一条消息的offset以及leader版本。这样在恢复的时候彻底使用这些信息而非水位来判断是否须要截断日志。
2、规避数据不一致
一样的道理,依靠leader epoch的信息能够有效地规避数据不一致的问题。
总结
0.11.0.0版本的Kafka经过引入leader epoch解决了原先依赖水位表示副本进度可能形成的数据丢失/数据不一致问题。有兴趣的读者能够阅读源代码进一步地了解其中的工做原理。
源代码位置:kafka.server.epoch.LeaderEpochCache.scala (leader epoch数据结构)、kafka.server.checkpoints.LeaderEpochCheckpointFile(checkpoint检查点文件操做类)还有分布在Log中的CRUD操做。