Kafka 消息以 Partition 做为存储单元,那么在 Partition 内消息是以什么样的格式存储的呢,如何处理 Partition 中的消息,又有哪些安全策略来保证消息不会丢失呢,这一篇咱们一块儿看看这些问题。node
每一个 Topic 的消息被一个或者多个 Partition 进行管理,Partition 是一个有序的,不变的消息队列,消息老是被追加到尾部。一个 Partition 不能被切分红多个散落在多个 broker 上或者多个磁盘上。算法
它做为消息管理名义上最大的管家内里实际上是由不少的 Segment 文件组成。若是一个 Partition 是一个单个很是长的文件的话,那么这个查找操做会很是慢而且容易出错。为解决这个问题,Partition 又被划分红多个 Segment 来组织数据。Segment 并非终极存储,在它的下面还有两个组成部分:缓存
Segment 文件的命名规则是: 某个 Partition 全局的第一个 Segment 从 0 开始,后续每一个 Segment 文件名以当前 Partition 的最大 offset(消息偏移量)为基准,文件名长度为 64 位 long 类型,19 位数字字符长度,不足部分用 0 填充。安全
每一条消息的组成内容有以下字段:异步
offset: 4964(逻辑偏移量) position: 75088(物理偏移量) CreateTime: 1545203239308(建立时间) isvalid: true(是否有效) keysize: -1(键大小) valuesize: 9(值大小) magic: 2 compresscodec: NONE(压缩编码) producerId: -1 producerEpoch: -1(epoch号) sequence: -1(序号) isTransactional: false(是否事务) headerKeys: [] payload: message_0(消息的具体内容)
为何要设计 Partition 和 Segment 的存储机制性能
Partition 是对外名义上的数据存储,用户理解数据都是顺序存储到 Partition 中。那么实际在 Partition 内又多了一套不对用户可见的 Segment 机制是为何呢?缘由有两个:fetch
提起高可用咱们大概猜到要作副本机制,多弄几个备份确定好。Kafka 也不例外提供了副本的概念(Replica),经过副本机制来实现冗余备份。每一个 Partition 能够设置多个副本,在副本集合中会存在一个 leader 的概念,全部的读写请求都是由 leader 来进行处理。剩余副本都作为 follower,follower 会从 leader 同步消息日志 。ui
经常使用的节点选举算法有 Raft 、Paxos、 Bully 等,根据业务的特色 Kafka 并无彻底套用这些算法,首先有以下概念:编码
每一个 Partition 都有惟一一个预写日志(write-ahead log),Producer 写入的消息会先存入这里。每一条消息都有惟一一个偏移量 offset,若是这条消息带有 key, 就会根据 key hash 值进行路由到对应的 Partition,若是没有指定 key 则根据随机算法路由到一个 Partition。设计
一个 Topic 的某个 Partition 若是有多副本机制存在,正常状况下只能有一个 副本是对外提供读写服务的,其他副本从它这里同步数据。那么这个对外提供服务的 leader 是如何选举出来的呢?这个问题要分为两种状况,一种是 Kafka 首次启动的选举,另外一种是启动后遇到故障或者增删副本以后的选举。
当 broker 启动后全部的 broker 都会去 zk 注册,这时候第一个在 zk 注册成功的 broker 会成为 leader,其他的都是 follower,这个 broker leader 后续去执行 Partition leader 的选举。
首先会从 zk 中读取 Topic 每一个分区的 ISR;
而后调用配置的分区选择算法来选择分区 leader,这些算法有不一样的使用场景,broker 启动,znode 发生变化,新产生节点,发生 rebalance 的时候等等。经过算法选定一个分区做为 leader就肯定了首次启动选举。
好比分区发生重分配的时候也会执行 leader 的选举操做。这种状况会从重分配的 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中。
若是某个节点被优雅地关闭(也就是执行 ControlledShutdown )时,位于这个节点上的 leader 副本都会下线,因此与此对应的分区须要执行 leader 的选举。这里的具体操做为:从 AR 列表中找到第一个存活的副本,且这个副本在目前的 ISR 列表中,与此同时还要确保这个副本不处于正在被关闭的节点上。
一旦 Partition 的 leader 肯定后续的写消息都会向这个副本请求操做,其他副本都会同步它的数据。上面咱们提到过几个概念:AR 、ISR、 OSR,在副本同步的过程当中会应用到这几个队列。
首先 ISR 队列确定包含当前的 leader 副本,也可能只有 leader 副本。什么状况下其他副本可以进入到 ISR 队列呢?
Kafka 提供了一个参数设置:rerplica.lag.time.max.ms=10000,这个参数表示 leader 副本可以落后 flower 副本的最长时间间隔,当前默认值是 10 秒。就是说若是 leader 发现 flower 超过 10 秒没有向它发起 fetch 请求,那么 leader 就认为这个 flower 出了问题。若是 fetch 正常 leader 就认为该 Follower 副本与 Leader 是同步的,即便此时 Follower 副本中保存的消息明显少于 Leader 副本中的消息。
例如上图中的两个 follower 明显慢于 leader,可是若是落后的时间在10 秒内,那么这三个副本都会在 ISR 中存在,不然,落后的副本会被剔除并加入到 OSR。
固然若是后面 follower 逐渐追上了 leader 的进度,那么该 follower 仍是会被加入到 ISR,因此 ISR 并非一个固定不变的集合,它是会动态调整的。
leader 和 follower 之间的数据同步过程大概以下:
初始状态下 leader 和 follower 的 HW 和 LEO 都是 0,follower 会不断地向 leader 发送请求 fetch 数据。可是由于没有数据,这个请求会被 leader 强制拖住,直到到达咱们配置的 replica.fetch.wait.max.ms 时间以后才会被释放。同时若是在这段时间内有数据产生则直接返回数据。
Producer 向某个 Topic 推过来一条消息,当前 Topic 的 leader Partition 进行相应,那么若是其他 follower 没有同步成功消息会怎么样呢?这个问题 Kafka 交给用户来决定。
producer 提供了以下配置:
request.required.asks=0
能够看到以上确认机制配置逐级严格,生产环境综合考虑通常选择配置 = 1,若是你的业务对数据完整性要求比较高且能够接收数据处理速度稍慢那么选择 = 2。
某个消费组消费 partition 须要保存 offset 记录当前消费位置,0.10 以前的版本是把 offset 保存到 zk 中,可是 zk 的写性能不是很好,Kafka 采用的方案是 consumer 每分钟上报一次,这样就形成了重复消费的可能。
0.10 版本以后 Kafka 就 offset 的保存从 zk 剥离,保存到一个名为 consumer_offsets 的 Topic 中。消息的 key 由 [groupid、topic、partition] 组成,value 是偏移量 offset。Topic 配置的清理策略是compact。老是保留最新的 key,其他删掉。通常状况下,每一个 key 的 offset 都是缓存在内存中,查询的时候不用遍历 Partition,若是没有缓存第一次就会遍历 Partition 创建缓存而后查询返回。