首先Kafka是一个分布式消息队列中间件,Apache顶级项目,https://kafka.apache.org/ 高性能、持久化、多副本备份、横向扩展。算法
生产者Producer往队列里发送消息,消费者Consumer从队列里消费消息,而后进行业务逻辑。应用场景主要有:解耦、削峰(缓冲)、异步处理、排队、分布式事务控制等等。apache
Kafka Data Flow 消息流转图缓存
上图中,消息生产者Producers往Brokers里面的指定Topic中写消息,消息消费者Consumers从Brokers里面消费指定Topic的消息,而后进行业务处理。架构
在实际的部署架构中,Broker、Topic、Partition这些元数据保存在ZooKeeper中,Kafka的监控、消息路由(分区)由ZooKeeper控制。0.8版本的OffSet也由ZooKeeper控制。异步
1、消息生产/发送过程分布式
Kafka建立Message、发送时要指定对应的Topic和Value(消息体),Key(分区键)和Partition(分区)是可选参数。 性能
调用Producer的Send()方法后,消息先进行序列化(消息序列化器可自定义实现:例如:Protobuf),而后按照Topic和Partition,临时放到内存中指定的发送队列中。达到阈值后,而后批量发送。fetch
发送时,当Partition没设置时,若是设置了Key-分区键(例如:单据类型),按照Key进行Hash取模,保证相同的Key发送到指定的分区Partition。若是未设置分区键Key,使用Round-Robin轮询随机选分区Partition。优化
2、分区Partition的高可用和选举机制ui
分区有副本的概念,保证消息不丢失。当存在多副本的状况下,会尽可能把多个副本,分配到不一样的broker上。
Kafka会为Partition选出一个Leader Broker(经过ZooKeeper),以后全部该Partition的请求,实际操做的都是Leader,而后再同步到其余的Follower。
当一个Kafka Broker宕机后,全部Leader在该Broker上的Partition都会从新选举,在剩余的Follower中选出一个Leader,继续提供服务。
正如上面所讲:Kafka使用ZooKeeper在多个Broker中选出一个Controller,用于Partition分配和Leader选举。如下是Partition的分配机制:
Controller会在ZooKeeper的/brokers/ids节点上注册Watch,一旦有broker宕机,它就能知道。
当Broker宕机后,Controller就会给受到影响的Partition选出新Leader。
Controller从ZooKeeper的/brokers/topics/[topic]/partitions/[partition]/state中,读取对应Partition的ISR(in-sync replica已同步的副本)列表,选一个出来作Leader。
选出Leader后,更新ZooKeeper的存储,而后发送LeaderAndISRRequest给受影响的Broker进行通知。
若是ISR列表是空,那么会根据配置,随便选一个replica作Leader,或者干脆这个partition就是宕机了。
若是ISR列表的有机器,可是也宕机了,那么还能够等ISR的机器活过来。
多副本同步:
服务端这边的处理是Follower从Leader批量拉取数据来同步。可是具体的可靠性,是由生产者来决定的。
生产者生产消息的时候,经过request.required.acks参数来设置数据的可靠性。
在acks=-1的时候,若是ISR少于min.insync.replicas指定的数目,那么就会返回不可用。
这里ISR列表中的机器是会变化的,根据配置replica.lag.time.max.ms,多久没同步,就会从ISR列表中剔除。之前还有根据落后多少条消息就踢出ISR,在1.0版本后就去掉了,由于这个值很难取,在高峰的时候很容易出现节点不断的进出ISR列表。
从ISA中选出leader后,follower会从把本身日志中上一个高水位后面的记录去掉,而后去和leader拿新的数据。由于新的leader选出来后,follower上面的数据,可能比新leader多,因此要截取。这里高水位的意思,对于partition和leader,就是全部ISR中都有的最新一条记录。消费者最多只能读到高水位;
从leader的角度来讲高水位的更新会延迟一轮,例如写入了一条新消息,ISR中的broker都fetch到了,可是ISR中的broker只有在下一轮的fetch中才能告诉leader。
也正是因为这个高水位延迟一轮,在一些状况下,kafka会出现丢数据和主备数据不一致的状况,0.11开始,使用leader epoch来代替高水位。
交互流程
4、消息投递语义
kafka支持3种消息投递语义,
At most once:最多一次,消息可能会丢失,但不会重复
At least once:最少一次,消息不会丢失,可能会重复
Exactly once:只且一次,消息不丢失不重复,只且消费一次(0.11中实现,仅限于下游也是kafka)
At least once:(业务中使用比较多)
先获取数据,再进行业务处理,业务处理成功后commit offset。
At most once:
先获取数据,再commit offset,最后进行业务处理。
Exactly once:
首先要保证消息不丢,再去保证不重复。因此盯着At least once的缘由来搞。
业务处理的幂等性很是重要。Kafka控制不了,须要业务来实现。好比所判断消息是否已经处理。
解决重复消费有两个方法:
生产的幂等性:
为每一个producer分配一个pid,做为该producer的惟一标识。producer会为每个<topic,partition>维护一个单调递增的seq。相似的,broker也会为每一个<pid,topic,partition>记录下最新的seq。当req_seq == broker_seq+1时,broker才会接受该消息。由于:
消息的seq不比broker的seq小,那么说明该消息已被保存。
场景是这样的:
其中第二、3点做为一个事务,要么全成功,要么全失败。这里得益与offset其实是用特殊的topic去保存,这两点都归一为写多个topic的事务性处理。
引入tid(transaction id),和pid不一样,这个id是应用程序提供的,用于标识事务,和producer是谁并不要紧。就是任何producer均可以使用这个tid去作事务,这样进行到一半就死掉的事务,能够由另外一个producer去恢复。
同时为了记录事务的状态,相似对offset的处理,引入transaction coordinator用于记录transaction log。在集群中会有多个transaction coordinator,每一个tid对应惟一一个transaction coordinator。
注:transaction log删除策略是compact,已完成的事务会标记成null,compact后不保留。
启动事务时,先标记开启事务,写入数据,所有成功就在transaction log中记录为prepare commit状态,不然写入prepare abort的状态。以后再去给每一个相关的partition写入一条marker(commit或者abort)消息,标记这个事务的message能够被读取或已经废弃。成功后 在transaction log记录下commit/abort状态,至此事务结束。
总体的数据流是这样的:
当partition中写入commit的marker后,相关的消息就可被读取。因此kafka事务在prepare commit到commit这个时间段内,消息是逐渐可见的,而不是同一时刻可见。
消息消费事务
在目录/${topicName}-{$partitionid}/下,存储着实际的log文件(即segment),还有对应的索引文件。
每一个segment文件大小相等,文件名以这个segment中最小的offset命名,文件扩展名是.log;segment对应的索引的文件名字同样,扩展名是.index。有两个index文件,一个是offset index用于按offset去查message,一个是time index用于按照时间去查,其实这里能够优化合到一块儿,下面只说offset index。整体的组织是这样的:
为了减小索引文件的大小,下降空间使用,方便直接加载进内存中,这里的索引使用稀疏矩阵,不会每个message都记录下具体位置,而是每隔必定的字节数,再创建一条索引。 索引包含两部分,分别是baseOffset,还有position。
baseOffset:意思是这条索引对应segment文件中的第几条message。这样作方便使用数值压缩算法来节省空间。例如kafka使用的是varint。
position:在segment中的绝对位置。
查找offset对应的记录时,会先用二分法,找出对应的offset在哪一个segment中,而后使用索引,在定位出offset在segment中的大概位置,再遍历查找message。