本篇文章讨论的kafka版本是目前最新版 0.10.1.0。java
全部broker会经过ZooKeeper选举出一个做为KafkaController,来负责:apache
每一个分区能够有多个副本,分散在不一样的broker上。缓存
leader副本:被KafkaController选举出来的,做为该分区的leader服务器
其余follower副本:其余副本都做为follower副本微信
isr列表:简单描述就是,"跟得上"leader的副本列表(包含leader),最开始是全部副本。这里的跟得上是指多线程
每个producer发送消息给某个分区的leader副本,其余follower副本又会复制该消息。producer端有一个acks参数能够设置:异步
同时对于isr列表的数量要求也有一个配置fetch
咱们本篇文章就重点经过kafka的原理来揭示在acks=-1的状况下,哪些状况下会丢失数据,或许能够提一些改进措施来作到不丢失数据。线程
下面会先介绍下leader和follower副本复制的原理scala
leader副本的属性
leader副本拥有其余副本的记录,保存着他们的以下属性:
follower副本的属性
其中follower会不断的向leader发送fetch请求,若是没有数据fetch则被leader阻塞一段时间,等待新数据的来临,一旦来临则解除阻塞,复制数据给follower。
咱们来看下当producer的acks=-1时,一次消息写入的整个过程,上述是属性是怎么变化的
1.3.1 消息准备写入leader副本,leader副本首先判断当前isr列表是否小于min.insync.replicas,不小于才容许写入。
若是不小于,leader写入到本身的log中,获得该消息的offset,而后对其余follower的fetch请求解除阻塞,复制必定量的消息给follower
同时leader将本身最新的highWatermarkMetadata传给follower
同时会判断此次复制是否复制到leader副本的末尾了,即logEndOffsetMetadata位置,若是是的话,则更新上述的lastCaughtUpTimeMs
1.3.2 follower会将fetch来的数据写入到本身的log中,本身的logEndOffsetMetadata获得了更新,同时更新本身的highWatermarkMetadata,就是取leader传来的highWatermarkMetadata和本身的logEndOffsetMetadata中的最小值
而后follower再一次向leader发送fetch请求,fetch的初始offset就是本身的logEndOffsetMetadata+1。
1.3.3 leader副本收到该fetch后,会更新leader副本中该follower的logEndOffsetMetadata为上述fetch的offset,同时会对全部的isr列表的logEndOffsetMetadata排序获得最小的logEndOffsetMetadata做为最新的highWatermarkMetadata
若是highWatermarkMetadata已经大于了leader写入该消息的offset了,说明该消息已经被isr列表都复制过了,则leader开始回应producer
判断当前isr列表的size是否小于min.insync.replicas,若是小于返回NotEnoughReplicasAfterAppendException异常,不小于则表明正常写入了。
1.3.4 follower在下一次的fetch请求的响应中就会获得leader最新的highWatermarkMetadata,更新本身的highWatermarkMetadata
若是某个broker挂了,leader副本在该broker上的分区就要从新进行leader选举。来简要描述下leader选举的过程
1.4.1 KafkaController会监听ZooKeeper的/brokers/ids节点路径,一旦发现有broker挂了,执行下面的逻辑。这里暂时先不考虑KafkaController所在broker挂了的状况,KafkaController挂了,各个broker会从新leader选举出新的KafkaController
1.4.2 leader副本在该broker上的分区就要从新进行leader选举,目前的选举策略是
1.4.2.1 优先从isr列表中选出第一个做为leader副本
1.4.2.2 若是isr列表为空,则查看该topic的unclean.leader.election.enable配置。
unclean.leader.election.enable:为true则表明容许选用非isr列表的副本做为leader,那么此时就意味着数据可能丢失,为false的话,则表示不容许,直接抛出NoReplicaOnlineException异常,形成leader副本选举失败。
1.4.2.3 若是上述配置为true,则从其余副本中选出一个做为leader副本,而且isr列表只包含该leader副本。
一旦选举成功,则将选举后的leader和isr和其余副本信息写入到该分区的对应的zk路径上。
1.4.3 KafkaController向上述相关的broker上发送LeaderAndIsr请求,将新分配的leader、isr、所有副本等信息传给他们。同时将向全部的broker发送UpdateMetadata请求,更新每一个broker的缓存的metadata数据。
1.4.4 若是是leader副本,更新该分区的leader、isr、全部副本等信息。若是本身以前就是leader,则如今什么操做都不用作。若是以前不是leader,则需将本身保存的全部follower副本的logEndOffsetMetadata设置为UnknownOffsetMetadata,以后等待follower的fetch,就会进行更新
1.4.5 若是是follower副本,更新该分区的leader、isr、全部副本等信息
而后将日志截断到本身保存的highWatermarkMetadata位置,即日志的logEndOffsetMetadata等于了highWatermarkMetadata
最后建立新的fetch请求线程,向新leader不断发送fetch请求,初次fetch的offset是logEndOffsetMetadata。
上述重点就是leader副本的日志不作处理,而follower的日志则须要截断到highWatermarkMetadata位置。
至此,算是简单描述了分区的基本状况,下面就针对上述过程来讨论下kafka分区的高可用和一致性问题。
哪些场景下会丢失消息?
acks= 0、1,很明显都存在消息丢失的可能。
即便设置acks=-1,当isr列表为空,若是unclean.leader.election.enable为true,则会选择其余存活的副本做为新的leader,也会存在消息丢失的问题。
即便设置acks=-1,当isr列表为空,若是unclean.leader.election.enable为false,则不会选择其余存活的副本做为新的leader,即牺牲了可用性来防止上述消息丢失问题。
即便设置acks=-1,而且选出isr中的副本做为leader的时候,仍然是会存在丢数据的状况的:
s1 s2 s3是isr列表,还有其余副本为非isr列表,s1是leader,一旦某个日志写入到s1 s2 s3,则s1将highWatermarkMetadata提升,并回复了客户端ok,可是s2 s3的highWatermarkMetadata可能还没被更新,此时s1挂了,s2当选leader了,s2的日志不变,可是s3就要截断日志了,这时已经回复客户端的日志是没有丢的,由于s2已经复制了。
可是若是此时s2一旦挂了,s3当选,则s3上就不存在上述日志了(前面s2当选leader的时候s3已经将日志截断了),这时候就形成日志丢失了。
其实咱们是但愿上述最后一个场景可以作到不丢消息的,可是目前的作法仍是可能会丢消息的。
丢消息最主要的缘由是:
因为follower的highWatermarkMetadata相对于leader的highWatermarkMetadata是延迟更新的,当leader选举完成后,全部follower副本的截断到本身的highWatermarkMetadata位置,则可能截断了已被老leader提交了的日志,这样的话,这部分日志仅仅存在新的leader副本中,在其余副本中消失了,一旦leader副本挂了,这部分日志就完全丢失了
这个截断到highWatermarkMetadata的操做的确太狠了,可是它的用途有一个就是:避免了日志的不一致的问题。经过每次leader选举以后的日志截断,来达到和leader之间日志的一致性,避免出现日志错乱的状况。
ZooKeeper和Raft的实现也有相似的日志复制的问题,那ZooKeeper和Raft的实现有没有这种问题呢?他们是如何解决的呢?
Raft并不进行日志的截断操做,而是会经过每第二天志复制时的一致性检查来进行日志的纠正,达到和leader来保持一致的目的。不截断日志,那么对于已经提交的日志,则必然存在过半的机器上从而可以保证日志基本是不会丢失的。
ZooKeeper只有当某个follower的记录超出leader的部分才会截断,其余的不会截断的。选举出来的leader是通过过半pk的,必然是包含所有已经被提交的日志的,即便该leader挂了,再次从新选举,因为不进行日志截断,仍然是能够选出其余包含所有已提交的日志的(有过半的机器都包含所有已提交的日志)。ZooKeeper对于日志的纠正则是在leader选举完成后专门开启一个纠正过程。
kafka的截断到highWatermarkMetadata的确有点太粗暴了,若是不截断日志,则须要解决日志错乱的问题,即便不可以像ZooKeeper那样花大代价专门开启一个纠正过程,能够像Raft那样每次在fetch的时候能够进行不断的纠正。这一块还有待继续关注。
kafka目前是只能保证一个分区内的数据是有序的。
可是你可能常常据说,一旦某个broker挂了,就可能产生乱序问题(也没人指出乱序的缘由),是否正确呢?
首先来看看如何能保证单个分区内消息的有序性,有以下几个过程:
3.1 producer按照消息的顺序进行发送
不少时候为了发送效率,采用的办法是多线程、异步、批量发送。
若是为了保证顺序,则不能使用多线程来执行发送任务。
异步:通常是把消息先发到一个队列中,由后台线程不断的执行发送任务。这种方式对消息的顺序也是有影响的:
如先发送消息1,后发送消息2,此时服务器端在处理消息1的时候返回了异常,可能在处理消息2的时候成功了,此时若再重试消息1就会形成消息乱序的问题。因此producer端须要先确认消息1发送成功了才能执行消息2的发送。
对于kafka来讲,目前是异步、批量发送。解决异步的上述问题就是配置以下属性:
max.in.flight.requests.per.connection=1
即producer发现一旦还有未确认发送成功的消息,则后面的消息不容许发送。
3.2 相同key的消息可以hash到相同的分区
正常状况下是没问题的,可是一旦某个分区挂了,如本来总共4个分区,此时只有3个分区存活,在此分区恢复的这段时间内,是否会存在hash错乱到别的分区?
那就要看producer端获取的metadata信息是否会立马更新成3个分区。目前看来应该是不会的
producer见到的metadata数据是各个broker上的缓存数据,这些缓存数据是由KafkaController来统一进行更新的。一旦leader副本挂了,KafkaController并不会去立马更新成3个分区,而是去执行leader选举,选举完成后才会去更新metadata数据,此时选举完成后仍然是保证4个分区的,也就是说producer是不可能获取到只有3个分区的metadata数据的,因此producer端仍是能正常hash的,不会错乱分区的。
在整个leader选举恢复过程,producer最可能是没法写入数据(后期能够重试)。
3.3 系统对顺序消息的支持
leader副本按照消息到来的前后顺序写入本地日志的,一旦写入日志后,该顺序就肯定了,follower副本也是按照该顺序进行复制的。对于消息的提交也是按照消息的offset从低到高来确认提交的,因此说kafka对于消息的处理是顺序的。
3.4 consumer可以按照消息的顺序进行消费
为了接收的效率,可能会使用多线程进行消费。这里为了保证顺序就只能使用单线程来进行消费了。
目前kafka的Consumer有scala版本的和java版本的(这一块以后再详细探讨),最新的java版本,对用户提供一个poll方法,用户本身去决定是使用多线程仍是单线程。
如何看待kafka的isr列表设计?和过半怎么对比呢?
对于相同数量的2n个follower加一个leader,过半呢则容许n个follower挂掉,而isr呢则容许2n个follower挂掉(可是会存在丢失消息的问题),因此过半更多会牺牲可用性(挂掉一半以上就不可用了)来加强数据的一致性,而isr会牺牲一致性来加强可用性(挂掉一半以上扔可以使用,可是存在丢数据的问题)
可是在确认效率上:过半仅仅须要最快的n+1的写入成功便可断定为成功,而isr则须要2n+1的写入成功才算成功。同时isr是动态变化的过程,一旦跟不上或者跟上了都会离开或者加入isr列表。isr列表越小写入速度就会加快。
有哪些环节会形成消息的重复消费?若是避免不了,如何去减小重复?
producer端重复发送
producer端因发送超时等等缘由作重试操做,目前broker端作重复请求的判断仍是很难的,目前kafka也没有去作,而是存储完消息以后,若是开启了Log compaction,它会经过kafka消息中的key来断定是不是重复消息,是的话则会删除。
consumer消费后,未及时提交消费的offset便挂了,下次恢复后就会重复消费
这个目前来讲并无通用的解决办法,先消费后提交offset可能会重复,先提交offset后消费可能形成消息丢失,因此通常仍是优先保证消息不丢,在业务上去作去重判断。
本文章涉及到的话题不少,不免有疏漏的地方,还请批评指正。
欢迎继续来讨论,越辩越清晰。
欢迎关注微信公众号:乒乓狂魔