这两个是broker里面的两个概念名词。缓存
所谓_副本_,就是kafka为了提升可用性,将同一个parition数据分散到不一样broker的数据备份。同时kafka会选出一个leader副本用于对外读,follower则须要主动向leader副本请求同步kafka日志,以保证主从副本数据保持一致。服务器
所谓_ISR_,就是有资格能被评选为leader副本的follower副本集合。只有leader和ISR中全部副本都同步状态了,才会被kafka认为该消息已经提交。fetch
起始位移( base offset ):该副本第一条消息的位移spa
高水印值( high watermark, HW ):该副本最新一条_已经提交_的消息位移。_若是某个消息的offset小于该值,则全部的副本都已经同步这条消息了;若是某个消息的offset大于该值,则肯说明些副本还没同步到这条消息_。换一种说法,全部offset小于该值的消息对consumer是可见的;而大于该值的是不可见的。该值很是重要,他的值影响到副本主从同步的位置,影响到consumer消费数据的位置。应该注意的是,HW不止在leader里存在,在follower里也存在,其缘由就是为了防止leader崩溃,follower也能当即顶替leader进行正常工做(最终一致性)。日志
日志末端位移 (Clog end offset, LEO ):该副本最后一条信息的位移code
①broker1上的 leader 副本接收到消息,把本身的 LEO 值更新为 1 。kafka
②broker2 和 broker3 上的 follower 副本各自发送请求给 broker 1。(通常状况下,是follower定时给leader发送fetch请求的,就好像heartbeat心跳)同步
③broker1收到fetch请求后,主动分别把该消息推送给 follower 副本 。it
④follower 副本接收到消息后各自更新本身的 LEO 为 1,并返回response。io
⑤leader 副本接收到其余 follower 副本的数据请求响应( response )以后,更新 HW 值为 1 。 此时位移为 0 的这条消息能够被 consumer 消费。
_原则:HW是当前已知的全部LEO的最小值_。为何呢?正常状况下,各个broker的partition数据都是顺序写入的,最小的LEO意味着全部的副本都同步到了这个LEO之前的全部数据,就知足了“HW以前的消息都已经同步完成”的要求。
为了便于描述,咱们假设有两个副本在同步数据,一个leader一个follower
在某一时刻,leader收到了一条信息,写入了底层数据,接下来就是数据同步的过程了。
一、leader的LEO +1,好理解,有了一条信息,尾数须要加一。
二、leader尝试更新HW,取全部副本LEO最小值,本例是0。那么,哪里获取各个副本最小值呢?leader副本本地有个地方专门负责缓存这个数据,其余follower经过fetch请求告知leader
这个时候,follower发送了fetch(fetch请求里会带着本身如今的LEO,如今是0),leader收到了fetch
三、leader收到了fetch,尝试更新HW。全部副本LEO最小值。本身的是1,fetch里是0,那么是0。
四、获取offset >follower LEO的数据放到response里,同时将本身的HW(注意!!此时是全局LEO最小值)放到response里,本例里是0
五、follower接受到了response,将数据写入,同时更新LEO,本例里+1
六、follower尝试更新HW,是全局LEO最小值,比较response里的HW和本身的LEO取最小值便可(上面红字的特性,这里就用到了)。本例里更新后是0
此时第一轮fetch结束,应该注意到,第一轮fetch完成后,数据虽然同步过去了,可是还不可见,由于leader此时还不知道follower是否是同步成功了
一、follower发送了fetch请求,携带本身的LEO=1
二、leader尝试更新HW,全局LEO的最小值,所以是1
三、获取offset >follower LEO的数据放到response里,此次没有数据,同时将本身的HW=1放到response里
四、follower收到信息,没数据写入,而后尝试更新本身的HW,全局最小值,本例HW=1
此时第二轮fetch结束,此时此刻,数据同步才真正结束,这条新数据对外可见了
综合上面的论述,通过两轮fetch过程后才会对外可见。这个时间差就容易致使数据丢失或者不一致的问题
场景一:两轮fetch中间发生follwer和leader前后崩溃,前提:leader写入完成即认为已提交
如图,假如第二轮fetch发生,A已经更新了HW,可是尚未包装response返回给B,此时B发生了崩溃。重启后的B会将LEO调整成崩溃前的HW值,那么后面的数据就被删除了(看,此时出现了一次数据不一致)。这个时候B想要向A发起fetch,若是这个时候刚好A挂掉了,B被选为leader,A重启回来后就会fetch取leader的HW和本身的LEO比较取最小值,最后获得HW=1,这样原来HW=2的数据就永久丢失了。
场景二:两轮fetch中,follower和leader同时崩溃
仍是上面那种状况,第二轮fetch发生,A已经更新了HW,可是尚未包装response返回给B。这个时候leader和follower同时崩溃,而后B先重启成为leader了。这个时候,producer发送了一个消息记录到B,此时由于没有follower,所以直接更新HW到2。然后A回来成为follower,这时,A发现本身的HW和B的HW相等,所以不作变动。可是A的HW指向的消息和B的HW指向的消息并非一回事,这显然就不是咱们想要的了。
kafka 0.11版本以后引入了leader epoch(我理解其实就是带有版本信息的HW)来取代HW,同时重启后的follower增长了一种请求,解决了这个问题。(这也是为啥商业使用基本上都用0.11以后的版本)。epoch其实是一对值(epoch,offset)。 epoch 表示 leader 的版本号,offset表示本次写入的位置。好比(0,0)就表明这是第一次写入,写入位置是0 。(1,120)就表明,这是第二次写入,写入位置是120(前面已经写了119个数据)
场景一的解决:
场景二的解决:
**附录:leader 中HW的更新条件** 1. 副本成为leader副本的时候 2. 集群里有broker崩溃,退出ISR的时候 3. producer向leader推送数据的时候