Kafka
体系架构
- Producers
- Brokers
- Consumers
- Zookeeper Cluster
- manage kafka cluster config
- select leader
- rebalance consumer group
存储机制
Topic
Partition
- 一个 Topic 有多个 partition
- 每一个 partition 为每一个 consumer group 维护了一个 逻辑offset
- 一个 partition 在存储中对应一个目录,分红多个 segments
- 能够对 partition 作备份
- partition 内的消息是有序的
Segment
- 每一个 segments 对应一个 .log 文件和一个 .index 文件,在磁盘中顺序存储消息(比随机写内存效率要高)
- .index 和 .log 文件的命名方式是以逻辑 offset 命名的,第一个 00000000.log(20位) 00000000.index,后面的多是 00123123.log 00123123.index
- 分割是由配置决定的,分割的时间 或者 分割的大小
- .index 至关于一个索引文件,文件每行存放了一个局部 offset 和它在 .log 文件的偏移量,offset 是稀疏的,并不是连贯的,用以减小 .index 存储
- .log 文件有本身的格式,会记录一些元素,以及各个元素的偏移量,因此只要找到偏移量,就能遍历下面的条数,找到对应的数据,二分查找(这个不太肯定)
- 总结起来,根据 offset 查找对应的 .index,而后对 .index 进行二分查找,肯定偏移量,而后从 .log 文件中根据偏移量查找每行数据,直到找到 offset 对应的那一条数据
高可靠 Tips:这种顺序存储机制保证了快速读写(顺序存储,索引),负载均衡(partition),快速过时删除(segment),以及容灾备份(replica)算法
Kafka 没有一个缓存机制,每次都要访问文件吗?
缓存
复制和同步
- HW HighWatermark
- 每一个 partition(包括 partition 副本) 都会有一个 HW
- 这个 HW 决定了能读取的最大偏移量
- LEO LogEndOffset
- 每一个 partition 也都会有一个 LEO
- 这个是真正的消息记录的截止位置
- ISR In-Sync-Replicas
- 每一个 partition leader 维护了一个 ISR 列表,即副本同步队列,保存了 partition follower
- 若是 follower 过慢,则可能会被从列表删除
- replica.lag.time.max.ms
- replica.lag.max.messages
- OSR Out-Sync-Replicas
- AR Assigned-Replicas
kafka 的复制机制不是彻底同步的,也不是单纯的异步复制安全
- 同步复制下降了吞吐率
- 异步复制可能会丢失数据
- ISR 能够很好的均衡上述两点
ISR 的信息都会反馈到 zookeeper 上,有两个地方会维护这个信息网络
- broker controller
- 负责管理 partition 和 replica 状态
- 从新分配 partition
- LeaderSelector 选举新的 partition leader,ISR,leader_epoch,controller_epoch
- 把相关消息推送给全部 replica
- partition leader
ISR 包括了 partition leader 自身架构
replication=3 表示 算主有3个负载均衡
数据可靠性和持久性
ack
- 1 leader 确认则可直接发送下一条数据
- 0 不用等 leader 确认
- -1 ISR 全部 follower 确认
- 当配置了 min.insync.replicas 这个参数,会发挥其功效,就是 至少这个数的 ISR 中的 follower 肯定后才算提交成功,不然返回异常
担忧切换 leader 时数据丢失,由于 leader 是否会选择最新的,而不是随机选的异步
在 -1 的状况下:async
- kafka 同步,replication.factor >= 2 && min.insync.replicas >= 2,这种状况不会丢失数据
- 若是 kafka broker 宕机, ISR 中的 follower 没有所有同步,而返回了异常,这时候若是选择了已经同步的 follower,会形成数据重复
恢复后同步
- 要确保一致性
- 不会从 LEO 开始
- 会从 HW 开始
- 由于可能 LEO 可能没有同步完就 down 掉了,因此从 LEO 开始会多数据,形成不一致
leader 选举
- 不是少数服从多数,raft 这种是,zk 是,这种方式须要大量的副本
- 大量的副本会在大数据量下致使性能的急剧降低
- 不多在须要大量数据的系统中使用
- 常见的选举算法
- Zab
- Raft
- Paxos
- PacificA
- Viewstamped Replication
容错处理
若是某一个partition的全部replica都挂了,就没法保证数据不丢失了。这种状况下有两种可行的方案:性能
- 等待ISR中任意一个replica“活”过来,而且选它做为leader
- 选择第一个“活”过来的replica(并不必定是在ISR中)做为leader
默认会采用第二种测试
对于 某个 broker down 掉,可能致使服务不可用(可读不可写,ack=-1,replicas>1),这时候须要调整 min.insync.replicas = 1
Procduer 发送方式
-
producer.type=sync
-
producer.type=async
若是 producer 网络出现问题,没有收到 ack,也会重试,因此会出现 at least once;若是 consumer 设置了自动提交,那么在 producer 没出问题的前提下,是 exactly once。若是手动提交,在消费结束后提交,就是 at least once,若是在以前,就是 at most once,由于可能消费失败。因此为了保证 exactly once,须要消费后手动提交,并加入去重机制。
总结
要保证数据写入到Kafka是安全的,高可靠的,须要以下的配置:
- topic的配置:replication.factor>=3,即副本数至少是3个;2<=min.insync.replicas<=replication.factor
- broker的配置:leader的选举条件unclean.leader.election.enable=false(ISR中选取Leader)
- producer的配置:request.required.acks=-1(all),producer.type=sync
测试表现:
- 当acks=-1时,Kafka发送端的TPS受限于topic的副本数量(ISR中),副本越多TPS越低;
- acks=0时,TPS最高,其次为1,最差为-1,即TPS:acks_0 > acks_1 > acks_-1;
- min.insync.replicas参数不影响TPS;
- partition的不一样会影响TPS,随着partition的个数的增加TPS会有所增加,但并非一直成正比关系,到达必定临界值时,partition数量的增长反而会使TPS略微下降;