消息生产与消息存储——kafka源码探究之三

消息存储结构

kafka每一个topic有多个partition,单个partition内消息有序。Partition在物理存储上由多个segment组成,每一个segment内包含两个文件,index文件和log文件。
物理实体 index文件和log文件
逻辑实体 topic > partition > segment算法

存储结构

1.partition存储
在kafka文件存储中,同一个Topic下有多个不一样的partition,每一个partition为一个目录,partition命名规则为topic名称+有序序号,第一个partition序号从0开始,序号最大值为partition数量减1。
每一个partition(目录)至关于一个大型文件被平均分配到大小(可配置log.segment.bytes)相等的segment数据文件中,但每一个segment file消息数量不必定相等,取决于消息大小,方便快速删除。
2.segment存储
Segment file由两个部分组成,分别是index file和data file,一一对应,成对出现,后缀为.index和.log,分别对应索引文件和数据文件。Segment的文件命名第一个从0开始,后续每一个segment文件名为上一个segment文件最后一条消息的offset值。segmentfault

clipboard.png

.index文件是索引文件,每行数据包括两个值,第几条消息+该消息在log文件的物理偏移量。.log文件存储消息的实际数据,每行由offset+message组成。具体以下图所示:缓存

clipboard.png

message参数说明:负载均衡

关键字 解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它能够惟一肯定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic" 表示本次发布Kafka服务程序协议版本号
1 byte “attributes" 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
Kbyte key 可选
value bytes payload 表示实际消息数据。异步

经过offset查找message

以上图中查找offset=36876的message为例,须要经过如下两个步骤:
第一步查找segment file,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0。第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1。一样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其余后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset 二分查找文件列表,就能够快速定位到具体文件。当offset=368776时定位到00000000000000368769.index|logasync

第二步经过segment file查找message,经过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,而后再经过00000000000000368769.log顺序查找直到offset=368776为止。编码

消息生产

基本流程

生产者可与任一broker链接(生产者不会与zookeeper通讯),得到topic的partition信息(每一个broker都有全部topic信息),找到每一个partition的leader所在broker,再与该broker创建链接。发送消息时,经过轮询或者随机选取partition的方式,决定消息被发送到哪个partition。
kafka的消息发送包括同步和异步两种方式。同步发送可配置acks参数,该参数可配置消息的确认级别。当acks=-1,则要求全部ISR中的replica都肯定拿到消息后再返回给生产者成功(leader会先将消息落盘,ISR中的replica拿到后不必定落盘,到内存就算成功);acks=0则直接返回成功(不用leader确认);acks=1,则leader把消息落盘后再返回。异步发送则直接返回发送成功,由后台线程扫描队列长度,达到必定长度或者配置时间再批量发送消息到leader。spa

基本流程以下:

a. 建立topic时会往zk注册topic的分区信息
b. 生产者从broker获取topic的全部分区
c. 根据必定的负载均衡算法决定将消息发往哪一个分区
d. 最终根据分区所在的leader broker将消息发送到broker
e. 当topic分区变化时,生产者会从新从broker获取新的分区信息
Kafka的消息生产者使用Producer.scala,客户端经过producer.type配置可以使用sync和async两种模式。客户端调用Producer.send发送消息。
在同步模式下,首先调用DefaultEventHandler.handle方法,序列化消息,序列化方式是默认的Encoder,可自定义实现(producer配置serializer.class),以后在最大重试次数(默认三次)内尝试发送消息,调用dispatchSerializedData方法,在该方法内选择消息的partition。线程

partition选择

若是消息没有key,且是该客户端对应topic下首条消息,则随机选择一个partition,并缓存对应的partition和topic的关系到sendPartitionPerTopicCache,以后该topic下没有key的消息都将发往该分区。sendPartitionPerTopicCache将在对应的配置时间(topic.metadata.refresh.interval.ms,默认为600000)内clear,防止全部消息都发往同一个partition。
若是消息key不为空,则调用默认的分区方法DefaultPartitioner.partition。key hash以后的值再对分区值取模,获得消息对应的分区。可自行实现Partitioner接口,实现自定义的分区策略(producer新增配置partitioner.class)。scala

消息如何不丢

消息到达broker后,leader先将该消息落盘。再根据acks参数决定是否返回消息写入成功,若是acks=-1,则需等待ISR中的replica复制消息,所有复制完成后再返回成功,若是等待时间超时,则返回消息发送失败。
若是要严格保证消息不丢失,可给该topic配置两个以上replica,同时生产者的acks设置为-1,每条消息都要求副本确认复制后再返回成功。

消息发送流程图以下:

图片描述

建立topic——kafka源码探究之一
https://segmentfault.com/a/11...

broker的高可用及高伸缩——kafka源码探究之二
https://segmentfault.com/a/11...

相关文章
相关标签/搜索