在进行详解以前,我想先声明一下,本次咱们进行讲解说明的是 Kafka 消息存储的信息文件内容,不是所谓的 Kafka 服务器运行产生的日志文件,这一点但愿你们清楚。服务器
Kafka 消息是以主题为单位进行归类,各个主题之间是彼此独立的,互不影响。每一个主题又能够分为一个或多个分区。每一个分区各自存在一个记录消息数据的日志文件。也就是该文要着重关注的内容。咱们根据以下的图进行进一步说明:负载均衡
图中,建立了一个 demo-topic 主题,其存在 7 个 Parition,对应的每一个 Parition 下存在一个 [Topic-Parition] 命名的消息日志文件。在理想状况下,数据流量分摊到各个 Parition 中,实现了负载均衡的效果。在分区日志文件中,你会发现不少类型的文件,好比:.index、.timestamp、.log、.snapshot 等,其中,文件名一致的文件集合就称为 LogSement。咱们先留有这样的一个总体的日志结构概念,接下来咱们一一的进行详细的说明其中的设计。分布式
LogSegment微服务
咱们已经知道分区日志文件中包含不少的 LogSegment ,Kafka 日志追加是顺序写入的,LogSegment 能够减少日志文件的大小,进行日志删除的时候和数据查找的时候能够快速定位。同时,ActiveLogSegment 也就是活跃的日志分段拥有文件拥有写入权限,其他的 LogSegment 只有只读的权限。源码分析
日志文件存在多种后缀文件,重点须要关注 .index、.timestamp、.log 三种类型。其余的日志类型功能做用,请查询下面图表:性能
每一个 LogSegment 都有一个基准偏移量,用来表示当前 LogSegment 中第一条消息的 offset。偏移量是一个 64 位的长整形数,固定是20位数字,长度未达到,用 0 进行填补,索引文件和日志文件都由该做为文件名命名规则(00000000000000000000.index、00000000000000000000.timestamp、00000000000000000000.log)。特别说明一下,若是日志文件名为 00000000000000000121.log ,则当前日志文件的一条数据偏移量就是 121,偏移量是从 0 开始的。学习
若是想要查看相应文件内容能够经过 kafka-run-class.sh 脚本查看 .log :线程
/data/kafka/bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
2.0 中可使用 kafka-dump-log.sh 查 看.index 文件设计
/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index
日志与索引文件3d
偏移量索引文件用于记录消息偏移量与物理地址之间的映射关系。时间戳索引文件则根据时间戳查找对应的偏移量。
Kafka 中的索引文件是以稀疏索引的方式构造消息的索引,他并不保证每个消息在索引文件中都有对应的索引项。每当写入必定量的消息时,偏移量索引文件和时间戳索引文件分别增长一个偏移量索引项和时间戳索引项,经过修改 log.index.interval.bytes 的值,改变索引项的密度。
切分文件
从上文中可知,日志文件和索引文件都会存在多个文件,组成多个 SegmentLog,那么其切分的规则是怎样的呢?
当知足以下几个条件中的其中之一,就会触发文件的切分:
为何是 Integer.MAX_VALUE ?
在偏移量索引文件中,每一个索引项共占用 8 个字节,并分为两部分。相对偏移量和物理地址。
相对偏移量:表示消息相对与基准偏移量的偏移量,占 4 个字节
物理地址:消息在日志分段文件中对应的物理位置,也占 4 个字节
4 个字节恰好对应 Integer.MAX_VALUE ,若是大于 Integer.MAX_VALUE ,则不能用 4 个字节进行表示了。
索引文件切分过程
索引文件会根据 log.index.size.max.bytes 值进行预先分配空间,即文件建立的时候就是最大值,当真正的进行索引文件切分的时候,才会将其裁剪到实际数据大小的文件。这一点是跟日志文件有所区别的地方。其意义下降了代码逻辑的复杂性。
查找消息
offset 查询
偏移量索引由相对偏移量和物理地址组成。
能够经过以下命令解析.index 文件
/data/kafka/bin/kafka-dump-log.sh --files ./00000000000000000000.index offset:0 position:0 offset:20 position:320 offset:43 position:1220
注意:offset 与 position 没有直接关系哦,因为存在数据删除和日志清理。
e.g. 如何查看 偏移量为 23 的消息?
Kafka 中存在一个 ConcurrentSkipListMap 来保存在每一个日志分段,经过跳跃表方式,定位到在 00000000000000000000.index ,经过二分法在偏移量索引文件中找到不大于 23 的最大索引项,即 offset 20 那栏,而后从日志分段文件中的物理位置为320 开始顺序查找偏移量为 23 的消息。
时间戳方式查询
在上文已经有所说起,经过时间戳方式进行查找消息,须要经过查找时间戳索引和偏移量索引两个文件。
时间戳索引索引格式
e.g. 查找时间戳为 1557554753430 开始的消息?
注意:timestamp文件中的 offset 与 index 文件中的 relativeOffset 不是一一对应的哦。由于数据的写入是各自追加。
在偏移量索引文件中,索引数据都是顺序记录 offset ,但时间戳索引文件中每一个追加的索引时间戳必须大于以前追加的索引项,不然不予追加。在 Kafka 0.11.0.0 之后,消息信息中存在若干的时间戳信息。若是 broker 端参数 log.message.timestamp.type 设置为 LogAppendTIme ,那么时间戳一定能保持单调增加。反之若是是 CreateTime 则没法保证顺序。
日志清理
日志清理,不是日志删除哦,这仍是有所区别的,日志删除会在下文进行说明。
Kafka 提供两种日志清理策略:
日志删除:按照必定的删除策略,将不知足条件的数据进行数据删除
日志压缩:针对每一个消息的 Key 进行整合,对于有相同 Key 的不一样 Value 值,只保留最后一个版本。
Kafka 提供 log.cleanup.policy 参数进行相应配置,默认值:delete,还能够选择 compact。
是否支持针对具体的 Topic 进行配置?
答案是确定的,主题级别的配置项是 cleanup.policy 。
日志删除
Kafka 会周期性根据相应规则进行日志数据删除,保留策略有 3 种:基于时间的保留策略、基于日志大小的保留策略和基于日志其实偏移量的保留策略。
基于时间
日志删除任务会根据 log.retention.hours/log.retention.minutes/log.retention.ms 设定日志保留的时间节点。若是超过该设定值,就须要进行删除。默认是 7 天,log.retention.ms 优先级最高。
如何查找日志分段文件中已通过去的数据呢?
Kafka 依据日志分段中最大的时间戳进行定位,首先要查询该日志分段所对应的时间戳索引文件,查找时间戳索引文件中最后一条索引项,若最后一条索引项的时间戳字段值大于 0,则取该值,不然取最近修改时间。
为何不直接选最近修改时间呢?
由于日志文件能够有意无心的被修改,并不能真实的反应日志分段的最大时间信息。
删除过程
若是活跃的日志分段中也存在须要删除的数据时?
Kafka 会先切分出一个新的日志分段做为活跃日志分段,而后执行删除操做。
基于日志大小
日志删除任务会检查当前日志的大小是否超过设定值。设定项为 log.retention.bytes ,单个日志分段的大小由 log.regment.bytes 进行设定。
删除过程
基于日志起始偏移量
基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量是否大于等于日志文件的起始偏移量,如果,则能够删除此日志分段。
注意:日志文件的起始偏移量并不必定等于第一个日志分段的基准偏移量,存在数据删除,可能与之相等的那条数据已经被删除了。
删除过程