kafka是一个分布式消息队列。具备高性能、持久化、多副本备份、横向扩展能力。生产者往队列里写消息,消费者从队列里取消息进行业务逻辑。通常在架构设计中起到解耦、削峰、异步处理的做用。正则表达式
kafka对外使用topic的概念,生产者往topic里写消息,消费者从读消息。为了作到水平扩展,一个topic实际是由多个partition组成的,遇到瓶颈时,能够经过增长partition的数量来进行横向扩容。单个parition内是保证消息有序。算法
每新写一条消息,kafka就是在对应的文件append写,因此性能很是高。apache
kafka的整体数据流是这样的:api
大概用法就是,Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,而后进行业务处理。
图中有两个topic,topic 0有两个partition,topic 1有一个partition,三副本备份。能够看到consumer gourp 1中的consumer 2没有分到partition处理,这是有可能出现的,下面会讲到。缓存
关于broker、topics、partitions的一些元信息用zk来存,监控和路由啥的也都会用到zk。网络
基本流程是这样的:架构
建立一条记录,记录中一个要指定对应的topic和value,key和partition可选。 先序列化,而后按照topic和partition,放进对应的发送队列中。kafka produce都是批量请求,会积攒一批,而后一块儿发送,不是调send()就进行马上进行网络发包。
若是partition没填,那么状况会是这样的:app
这些要发往同一个partition的请求按照配置,攒一波,而后由一个单独的线程一次性发过去。异步
有high level api,替咱们把不少事情都干了,offset,路由啥都替咱们干了,用以来很简单。
还有simple api,offset啥的都是要咱们本身记录。分布式
当存在多副本的状况下,会尽可能把多个副本,分配到不一样的broker上。kafka会为partition选出一个leader,以后全部该partition的请求,实际操做的都是leader,而后再同步到其余的follower。当一个broker歇菜后,全部leader在该broker上的partition都会从新选举,选出一个leader。(这里不像分布式文件存储系统那样会自动进行复制保持副本数)
而后这里就涉及两个细节:怎么分配partition,怎么选leader。
关于partition的分配,还有leader的选举,总得有个执行者。在kafka中,这个执行者就叫controller。kafka使用zk在broker中选出一个controller,用于partition分配和leader选举。
controller会在Zookeeper的/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。当broker宕机后,controller就会给受到影响的partition选出新leader。controller从zk的/brokers/topics/[topic]/partitions/[partition]/state中,读取对应partition的ISR(in-sync replica已同步的副本)列表,选一个出来作leader。
选出leader后,更新zk,而后发送LeaderAndISRRequest给受影响的broker,让它们改变知道这事。为何这里不是使用zk通知,而是直接给broker发送rpc请求,个人理解多是这样作zk有性能问题吧。
若是ISR列表是空,那么会根据配置,随便选一个replica作leader,或者干脆这个partition就是歇菜。若是ISR列表的有机器,可是也歇菜了,那么还能够等ISR的机器活过来。
这里的策略,服务端这边的处理是follower从leader批量拉取数据来同步。可是具体的可靠性,是由生产者来决定的。
生产者生产消息的时候,经过request.required.acks参数来设置数据的可靠性。
acks | what happen |
---|---|
0 | which means that the producer never waits for an acknowledgement from the broker.发过去就完事了,不关心broker是否处理成功,可能丢数据。 |
1 | which means that the producer gets an acknowledgement after the leader replica has received the data. 当写Leader成功后就返回,其余的replica都是经过fetcher去同步的,因此kafka是异步写,主备切换可能丢数据。 |
-1 | which means that the producer gets an acknowledgement after all in-sync replicas have received the data. 要等到isr里全部机器同步成功,才能返回成功,延时取决于最慢的机器。强一致,不会丢数据。 |
在acks=-1的时候,若是ISR少于min.insync.replicas指定的数目,那么就会返回不可用。
这里ISR列表中的机器是会变化的,根据配置replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。之前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,由于这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。
从ISA中选出leader后,follower会从把本身日志中上一个高水位后面的记录去掉,而后去和leader拿新的数据。由于新的leader选出来后,follower上面的数据,可能比新leader多,因此要截取。这里高水位的意思,对于partition和leader,就是全部ISR中都有的最新一条记录。消费者最多只能读到高水位;
从leader的角度来讲高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的broker都fetch到了,可是ISR中的broker只有在下一轮的fetch中才能告诉leader。
也正是因为这个高水位延迟一轮,在一些状况下,kafka会出现丢数据和主备数据不一致的状况,0.11开始,使用leader epoch来代替高水位。(https://cwiki.apache.org/confluence/display/KAFKA/KIP-101+-+Alter+Replication+Protocol+to+use+Leader+Epoch+rather+than+High+Watermark+for+Truncation#KIP-101-AlterReplicationProtocoltouseLeaderEpochratherthanHighWatermarkforTruncation-Scenario1:HighWatermarkTruncationfollowedbyImmediateLeaderElection)
思考:
当acks=-1时
订阅topic是以一个消费组来订阅的,一个消费组里面能够有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来讲,就是一个partition,只能被消费组里的一个消费者消费,可是能够同时被多个消费组消费。所以,若是消费组内的消费者若是比partition多的话,那么就会有个别消费者一直空闲。
订阅topic时,能够用正则表达式,若是有新topic匹配上,那能自动订阅上。
一个消费组消费partition,须要保存offset记录消费到哪,之前保存在zk中,因为zk的写性能很差,之前的解决方法都是consumer每隔一分钟上报一次。这里zk的性能严重影响了消费的速度,并且很容易出现重复消费。
在0.10版本后,kafka把这个offset的保存,从zk总剥离,保存在一个名叫__consumeroffsets topic的topic中。写进消息的key由groupid、topic、partition组成,value是偏移量offset。topic配置的清理策略是compact。老是保留最新的key,其他删掉。通常状况下,每一个key的offset都是缓存在内存中,查询的时候不用遍历partition,若是没有缓存,第一次就会遍历partition创建缓存,而后查询返回。
肯定consumer group位移信息写入__consumers_offsets的哪一个partition,具体计算公式:
__consumers_offsets partition = Math.abs(groupId.hashCode() % groupMetadataTopicPartitionCount) //groupMetadataTopicPartitionCount由offsets.topic.num.partitions指定,默认是50个分区。
思考:
若是正在跑的服务,修改了offsets.topic.num.partitions,那么offset的保存是否是就乱套了?
生产过程当中broker要分配partition,消费过程这里,也要分配partition给消费者。相似broker中选了一个controller出来,消费也要从broker中选一个coordinator,用于分配partition。
下面从顶向下,分别阐述一下
这里咱们能够看到,consumer group的coordinator,和保存consumer group offset的partition leader是同一台机器。
把coordinator选出来以后,就是要分配了
整个流程是这样的:
五、consumer向coordinator发送SyncGroupRequest,其中leader的SyncGroupRequest会包含分配的状况。
六、coordinator回包,把分配的状况告诉consumer,包括leader。
当partition或者消费者的数量发生变化时,都得进行reblance。
列举一下会reblance的状况:
kafka支持3种消息投递语义
At most once:最多一次,消息可能会丢失,但不会重复
At least once:最少一次,消息不会丢失,可能会重复
Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)
在业务中,经常都是使用At least once的模型,若是须要可重入的话,每每是业务本身实现。
先获取数据,再进行业务处理,业务处理成功后commit offset。
一、生产者生产消息异常,消息是否成功写入不肯定,重作,可能写入重复的消息
二、消费者处理消息,业务处理成功后,更新offset失败,消费者重启的话,会重复消费
先获取数据,再commit offset,最后进行业务处理。
一、生产者生产消息异常,无论,生产下一个消息,消息就丢了
二、消费者处理消息,先更新offset,再作业务处理,作业务处理失败,消费者重启,消息就丢了
思路是这样的,首先要保证消息不丢,再去保证不重复。因此盯着At least once的缘由来搞。 首先想出来的:
因为业务接口是否幂等,不是kafka能保证的,因此kafka这里提供的exactly once是有限制的,消费者的下游也必须是kafka。因此一下讨论的,没特殊说明,消费者的下游系统都是kafka(注:使用kafka conector,它对部分系统作了适配,实现了exactly once)。
生产者幂等性好作,没啥问题。
解决重复消费有两个方法:
原本exactly once实现第1点就ok了。
可是在一些使用场景下,咱们的数据源多是多个topic,处理后输出到多个topic,这时咱们会但愿输出时要么所有成功,要么所有失败。这就须要实现事务性。既然要作事务,那么干脆把重复消费的问题从根源上解决,把commit offset和输出到其余topic绑定成一个事务。
思路是这样的,为每一个producer分配一个pid,做为该producer的惟一标识。producer会为每个<topic,partition>维护一个单调递增的seq。相似的,broker也会为每一个<pid,topic,partition>记录下最新的seq。当req_seq == broker_seq+1时,broker才会接受该消息。由于:
消息的seq不比broker的seq小,那么说明该消息已被保存。
场景是这样的:
其中第二、3点做为一个事务,要么全成功,要么全失败。这里得益与offset其实是用特殊的topic去保存,这两点都归一为写多个topic的事务性处理。
基本思路是这样的:
引入tid(transaction id),和pid不一样,这个id是应用程序提供的,用于标识事务,和producer是谁并不要紧。就是任何producer均可以使用这个tid去作事务,这样进行到一半就死掉的事务,能够由另外一个producer去恢复。
同时为了记录事务的状态,相似对offset的处理,引入transaction coordinator用于记录transaction log。在集群中会有多个transaction coordinator,每一个tid对应惟一一个transaction coordinator。
注:transaction log删除策略是compact,已完成的事务会标记成null,compact后不保留。
作事务时,先标记开启事务,写入数据,所有成功就在transaction log中记录为prepare commit状态,不然写入prepare abort的状态。以后再去给每一个相关的partition写入一条marker(commit或者abort)消息,标记这个事务的message能够被读取或已经废弃。成功后在transaction log记录下commit/abort状态,至此事务结束。
数据流:
首先使用tid请求任意一个broker(代码中写的是负载最小的broker),找到对应的transaction coordinator。
请求transaction coordinator获取到对应的pid,和pid对应的epoch,这个epoch用于防止僵死进程复活致使消息错乱,当消息的epoch比当前维护的epoch小时,拒绝掉。tid和pid有一一对应的关系,这样对于同一个tid会返回相同的pid。
这里prepare的状态主要是用于事务恢复,例如给相关的partition发送控制消息,没发完就宕机了,备机起来后,producer发送请求获取pid时,会把未完成的事务接着完成。
当partition中写入commit的marker后,相关的消息就可被读取。因此kafka事务在prepare commit到commit这个时间段内,消息是逐渐可见的,而不是同一时刻可见。
前面都是从生产的角度看待事务。还须要从消费的角度去考虑一些问题。
消费时,partition中会存在一些消息处于未commit状态,即业务方应该看不到的消息,须要过滤这些消息不让业务看到,kafka选择在消费者进程中进行过来,而不是在broker中过滤,主要考虑的仍是性能。kafka高性能的一个关键点是zero copy,若是须要在broker中过滤,那么势必须要读取消息内容到内存,就会失去zero copy的特性。
kafka的数据,其实是以文件的形式存储在文件系统的。topic下有partition,partition下有segment,segment是实际的一个个文件,topic和partition都是抽象概念。
在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。
每一个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字同样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里能够优化合到一块儿,下面只说offset index。整体的组织是这样的:
为了减小索引文件的大小,下降空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每个message都记录下具体位置,而是每隔必定的字节数,再创建一条索引。 索引包含两部分,分别是baseOffset,还有position。
baseOffset:意思是这条索引对应segment文件中的第几条message。这样作方便使用数值压缩算法来节省空间。例如kafka使用的是varint。
position:在segment中的绝对位置。
查找offset对应的记录时,会先用二分法,找出对应的offset在哪一个segment中,而后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。
配置项 | 做用 |
---|---|
broker.id | broker的惟一标识 |
auto.create.topics.auto | 设置成true,就是遇到没有的topic自动建立topic。 |
log.dirs | log的目录数,目录里面放partition,当生成新的partition时,会挑目录里partition数最少的目录放。 |
配置项 | 做用 |
---|---|
num.partitions | 新建一个topic,会有几个partition。 |
log.retention.ms | 对应的还有minutes,hours的单位。日志保留时间,由于删除是文件维度而不是消息维度,看的是日志文件的mtime。 |
log.retention.bytes | partion最大的容量,超过就清理老的。注意这个是partion维度,就是说若是你的topic有8个partition,配置1G,那么平均分配下,topic理论最大值8G。 |
log.segment.bytes | 一个segment的大小。超过了就滚动。 |
log.segment.ms | 一个segment的打开时间,超过了就滚动。 |
message.max.bytes | message最大多大 |
关于日志清理,默认当前正在写的日志,是怎么也不会清理掉的。 还有0.10以前的版本,时间看的是日志文件的mtime,但这个指是不许确的,有可能文件被touch一下,mtime就变了。所以在0.10版本开始,改成使用该文件最新一条消息的时间来判断。 按大小清理这里也要注意,Kafka在定时任务中尝试比较当前日志量总大小是否超过阈值至少一个日志段的大小。若是超过可是没超过一个日志段,那么就不会删除。