阿里太注重原理了:阿里问kafka如何实现高并发存储-如何找到一条须要消费的数据,kafka用了稀疏索引的方式,使用了二分查找法,其实不少索引都是二分查找法html
二分查找法的时间复杂度:O(logn) redis,kafka,B+树的底层都采用了二分查找法 java
参考:二分查找法 redis的索引底层的 跳表原理 实现 聊聊Mysql索引和redis跳表 ---redis的跳表原理 时间复杂度O(logn)(阿里) mysql
参考:二分查找法 mysql索引原理:一步步分析为何B+树适合做为索引的结构 以及索引原理 (阿里面试)linux
参考:二分查找法:各类排序算法的时间复杂度和空间复杂度(阿里)git
这是答案:github
例如读取offset=368776的message,须要经过下面2个步骤查找。面试
第一步查找segment fileredis
上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.一样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其余后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就能够快速定位到具体文件。算法
当offset=368776时定位到00000000000000368769.index|logsql
第二步经过segment file查找message
经过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,而后再经过00000000000000368769.log顺序查找直到 offset=368776为止。
从上述图3可知这样作的优势,segment index file采起稀疏索引存储方式,它减小索引文件大小,经过mmap能够直接内存操做,稀疏索引为数据文件的每一个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来须要消耗更多的时间。
具体参考:
Kafka 社区很是活跃,从 0.9 版本开始,Kafka 的标语已经从“一个高吞吐量,分布式的消息系统”改成"一个分布式流平台"。
Kafka 和传统的消息系统不一样在于:
Kafka 和其余消息队列的对比:
Kafka 架构原理
对于 Kafka 的架构原理,咱们先提出以下几个问题:
Kafka 架构图
Kafka 名词解释
在一套 Kafka 架构中有多个 Producer,多个 Broker,多个 Consumer,每一个 Producer 能够对应多个 Topic,每一个 Consumer 只能对应一个 Consumer Group。
整个 Kafka 架构对应一个 ZK 集群,经过 ZK 管理集群配置,选举 Leader,以及在 Consumer Group 发生变化时进行 Rebalance。
Topic 和 Partition
在 Kafka 中的每一条消息都有一个 Topic。通常来讲在咱们应用中产生不一样类型的数据,均可以设置不一样的主题。
一个主题通常会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者均可以接收到生产者写入的新消息。
Kafka 为每一个主题维护了分布式的分区(Partition)日志文件,每一个 Partition 在 Kafka 存储层面是 Append Log。
任何发布到此 Partition 的消息都会被追加到 Log 文件的尾部,在分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,也就是咱们的 Offset。Offset 是一个 Long 型的数字。
咱们经过这个 Offset 能够肯定一条在该 Partition 下的惟一消息。在 Partition 下面是保证了有序性,可是在 Topic 下面没有保证有序性。
在上图中咱们的生产者会决定发送到哪一个 Partition:
若是没有 Key 值则进行轮询发送。
若是有 Key 值,对 Key 值进行 Hash,而后对分区数量取余,保证了同一个 Key 值的会被路由到同一个分区;若是想队列的强顺序一致性,可让全部的消息都设置为同一个 Key。
消费模型
消息由生产者发送到 Kafka 集群后,会被消费者消费。通常来讲咱们的消费模型有两种:
基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,可是这种方式没法很好地保证消费的处理语义。
好比当咱们已经把消息发送给消费者以后,因为消费进程挂掉或者因为网络缘由没有收到这条消息,若是咱们在消费代理将其标记为已消费,这个消息就***丢失了。
若是咱们利用生产者收到消息后回复这种方法,消息代理须要记录消费状态,这种不可取。
若是采用 Push,消息消费的速率就彻底由消费代理控制,一旦消费者发生阻塞,就会出现问题。
Kafka 采起拉取模型(Poll),由本身控制消费速度,以及消费的进度,消费者能够按照任意的偏移量进行消费。
好比消费者能够消费已经消费过的消息进行从新处理,或者消费最近的消息等等。
网络模型
Kafka Client:单线程 Selector
单线程模式适用于并发连接数小,逻辑简单,数据量小的状况。在 Kafka 中,Consumer 和 Producer 都是使用的上面的单线程模式。
这种模式不适合 Kafka 的服务端,在服务端中请求处理过程比较复杂,会形成线程阻塞,一旦出现后续请求就会没法处理,会形成大量请求超时,引发雪崩。而在服务器中应该充分利用多线程来处理执行逻辑。
Kafka Server:多线程 Selector
在 Kafka 服务端采用的是多线程的 Selector 模型,Acceptor 运行在一个单独的线程中,对于读取操做的线程池中的线程都会在 Selector 注册 Read 事件,负责服务端读取请求的逻辑。
成功读取后,将请求放入 Message Queue共享队列中。而后在写线程池中,取出这个请求,对其进行逻辑处理。
这样,即便某个请求线程阻塞了,还有后续的线程从消息队列中获取请求并进行处理,在写线程中处理完逻辑处理,因为注册了 OP_WIRTE 事件,因此还须要对其发送响应。
高可靠分布式存储模型
在 Kafka 中保证高可靠模型依靠的是副本机制,有了副本机制以后,就算机器宕机也不会发生数据丢失。
高性能的日志存储 kafka采用了稀疏索引的方式
Kafka 一个 Topic 下面的全部消息都是以 Partition 的方式分布式的存储在多个节点上。
同时在 Kafka 的机器上,每一个 Partition 其实都会对应一个日志目录,在目录下面会对应多个日志分段(LogSegment)。
LogSegment 文件由两部分组成,分别为“.index”文件和“.log”文件,分别表示为 Segment 索引文件和数据文件。
先经过index文件,利用二分查找法,找到相应的稀疏索引,而后跟进index上的偏移量,找到log文件的位置,而后在log顺序遍历上面找到相应的文件;
例如读取offset=368776的message,须要经过下面2个步骤查找。
第一步查找segment file
上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.一样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其余后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就能够快速定位到具体文件。
当offset=368776时定位到00000000000000368769.index|log
第二步经过segment file查找message
经过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,而后再经过00000000000000368769.log顺序查找直到 offset=368776为止。
从上述图3可知这样作的优势,segment index file采起稀疏索引存储方式,它减小索引文件大小,经过mmap能够直接内存操做,稀疏索引为数据文件的每一个对应message设置一个元数据指针,它比稠密索引节省了更多的存储空间,但查找起来须要消耗更多的时间。
Kafka部分名词解释以下:
Broker:消息中间件处理结点,一个Kafka节点就是一个broker,多个broker能够组成一个Kafka集群。
Topic:一类消息,例如page view日志、click日志等均可以以topic的形式存在,Kafka集群可以同时负责多个topic的分发。
Partition:topic物理上的分组,一个topic能够分为多个partition,每一个partition是一个有序的队列。
Segment:partition物理上由多个segment组成,下面2.2和2.3有详细说明。
offset:每一个partition都由一系列有序的、不可变的消息组成,这些消息被连续的追加到partition中。partition中的每一个消息都有一个连续的序列号叫作offset,用于partition惟一标识一条消息.
分析过程分为如下4个步骤:
topic中partition存储分布
partiton中文件存储方式
partiton中segment文件存储结构
在partition中如何经过offset查找message
经过上述4过程详细分析,咱们就能够清楚认识到kafka文件存储机制的奥秘。
假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),例如建立2个topic名称分别为report_push、launch_info, partitions数量都为partitions=4(将一个topic分为4个部分存储)
存储路径和目录规则为:
xxx/message-folder
|--report_push-0
|--report_push-1
|--report_push-2
|--report_push-3
|--launch_info-0
|--launch_info-1
|--launch_info-2
|--launch_info-3
在Kafka文件存储中,同一个topic下有多个不一样partition,每一个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
若是是多broker分布状况,请参考文末kafka集群partition分布原理分析
下面示意图形象说明了partition中文件存储方式:
图1
每一个partion(目录)至关于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每一个段segment file消息数量不必定相等,这种特性方便old segment file快速被删除。
每一个partiton只须要支持顺序读写就好了,segment文件生命周期由服务端配置参数决定。
这样作的好处就是能快速删除无用文件,有效提升磁盘利用率。
读者从2.2节了解到Kafka文件系统partition存储方式,本节深刻分析partion中segment file组成和物理结构。
segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
segment文件命名规则:partion全局的第一个segment从0开始,后续每一个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
下面文件列表是笔者在Kafka broker上作的一个实验,建立一个topicXXX包含1 partition,设置每一个segment大小为500MB,并启动producer向Kafka broker写入大量数据,以下图2所示segment文件列表形象说明了上述2个规则:
以上述图2中一对segment file文件为例,说明segment中index<—->data file对应关系物理结构以下:
上述图3中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。
其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移地址为497。
从上述图3了解到segment data file由许多message组成,下面详细说明message物理结构以下:
图4
参数说明:
关键字 | 解释说明 |
---|---|
8 byte offset | 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它能够惟一肯定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message |
4 byte message size | message大小 |
4 byte CRC32 | 用crc32校验message |
1 byte “magic" | 表示本次发布Kafka服务程序协议版本号 |
1 byte “attributes" | 表示为独立版本、或标识压缩类型、或编码类型。 |
4 byte key length | 表示key的长度,当key为-1时,K byte key字段不填 |
K byte key | 可选 |
value bytes payload | 表示实际消息数据。 |
实验环境:
Kafka集群:由2台虚拟机组成
cpu:4核
物理内存:8GB
网卡:千兆网卡
jvm heap: 4GB
详细Kafka服务端配置及其优化请参考:kafka server.properties配置详解
图5
从上述图5能够看出,Kafka运行时不多有大量读磁盘的操做,主要是按期批量写磁盘操做,所以操做磁盘很高效。这跟Kafka文件存储中读写message的设计是息息相关的。Kafka中读写message有以下特色:
写message
消息从java堆转入page cache(即物理内存)。
由异步线程刷盘,消息从page cache刷入磁盘。
读message
消息直接从page cache转入socket发送出去。
当从page cache没有找到相应数据时,此时会产生磁盘IO,从磁
盘Load消息到page cache,而后直接从socket发出去
Kafka高效文件存储设计特色
Kafka把topic中一个parition大文件分红多个小文件段,经过多个小文件段,就容易按期清除或删除已经消费完文件,减小磁盘占用。
经过索引信息能够快速定位message和肯定response的最大大小。
经过index元数据所有映射到memory,能够避免segment file的IO磁盘操做。
经过索引文件稀疏存储,能够大幅下降index文件元数据占用空间大小。
topic------->多个partiton----------->1个partion多个segment----------->1个segment多个index和log
在只有一个broker的时候,多个partion位于这个broker,有多个broker的时候是按照必定的算法分布在多个broker上。
说到分区,就要说kafka对消息的存储.在官方文档中.
分区读写日志图
首先,kafka是经过log(日志)来记录消息发布的.每当产生一个消息,kafka会记录到本地的log文件中,这个log和咱们平时的log有必定的区别.这里能够参考一下The Log,很少解释.
这个log文件默认的位置在config/server.properties中指定的.默认的位置是log.dirs=/tmp/kafka-logs,linux不用说,windows的话就在你对应磁盘的根目录下.我这里是D盘.
分区partition
kafka是为分布式环境设计的,所以若是日志文件,其实也能够理解成消息数据库,放在同一个地方,那么必然会带来可用性的降低,一挂全挂,若是全量拷贝到全部的机器上,那么数据又存在过多的冗余,并且因为每台机器的磁盘大小是有限的,因此即便有再多的机器,可处理的消息仍是被磁盘所限制,没法超越当前磁盘大小.所以有了partition的概念.
kafka对消息进行必定的计算,经过hash来进行分区.这样,就把一份log文件分红了多份.如上面的分区读写日志图,分红多份之后,在单台broker上,好比快速上手中,若是新建topic的时候,咱们选择了--replication-factor 1 --partitions 2,那么在log目录里,咱们会看到
test-0目录和test-1目录.就是两个分区了.
你可能会想,这特么没啥区别呀.注意,当有了多个broker以后,这个意义就存在了.这里上一张图,原文在参考连接里有
这是一个topic包含4个Partition,2 Replication(拷贝),也就是说所有的消息被放在了4个分区存储,为了高可用,将4个分区作了2份冗余,而后根据分配算法.将总共8份数据,分配到broker集群上.
结果就是每一个broker上存储的数据比全量数据要少,但每份数据都有冗余,这样,一旦一台机器宕机,并不影响使用.好比图中的Broker1,宕机了.那么剩下的三台broker依然保留了全量的分区数据.因此还能使用,若是再宕机一台,那么数据不完整了.固然你能够设置更多的冗余,好比设置了冗余是4,那么每台机器就有了0123完整的数据,宕机几台都行.须要在存储占用和高可用之间作衡量.
至于宕机后,zookeeper会选出新的partition leader.来提供服务.这个等下篇文章
偏移offset
上一段说了分区,分区就是一个有序的,不可变的消息队列.新来的commit log持续日后面加数据.这些消息被分配了一个下标(或者偏移),就是offset,用来定位这一条消息.
消费者消费到了哪条消息,是保持在消费者这一端的.消息者也能够控制,消费者能够在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.也能够重置offset
如何经过offset算出分区
其实partition存储的时候,又分红了多个segment(段),而后经过一个index,索引,来标识第几段.这里先能够去看一下本地log目录的分区文件夹.
在我这里,test-0,这个分区里面,会有一个index文件和一个log文件,
index和log
对于某个指定的分区,假设每5个消息,做为一个段大小,当产生了10条消息的状况想,目前有会获得(只是解释)
0.index (表示这里index是对0-4作的索引)
5.index (表示这里index是对5-9作的索引)
10.index (表示这里index是对10-15作的索引,目前还没满)
和
0.log
5.log
10.log
,当消费者须要读取offset=8的时候,首先kafka对index文件列表进行二分查找,能够算出.应该是在5.index对应的log文件中,而后对对应的5.log文件,进行顺序查找,5->6->7->8,直到顺序找到8就行了.
以上是Kafka文件存储机制及partition和offset的所有内容,在云栖社区的博客、问答、云栖号、人物、课程等栏目也有Kafka文件存储机制及partition和offset的相关内容,欢迎继续使用右上角搜索按钮进行搜索存储 , 文件 , 数据 , 索引 , 磁盘 物理 kafka offset保存机制、kafka offset 存储、kafka partition、kafka partition 设置、kafka partition 数量,以便于您获取更多的相关知识。
副本机制
Kafka 的副本机制是多个服务端节点对其余节点的主题分区的日志进行复制。
当集群中的某个节点出现故障,访问故障节点的请求会被转移到其余正常节点(这一过程一般叫 Reblance)。
Kafka 每一个主题的每一个分区都有一个主副本以及 0 个或者多个副本,副本保持和主副本的数据同步,当主副本出故障时就会被替代。
在 Kafka 中并非全部的副本都能被拿来替代主副本,因此在 Kafka 的 Leader 节点中维护着一个 ISR(In Sync Replicas)集合。
翻译过来也叫正在同步中集合,在这个集合中的须要知足两个条件:
另外还有个 AR(Assigned Replicas)用来标识副本的全集,OSR 用来表示因为落后被剔除的副本集合。
因此公式以下:ISR = Leader + 没有落后太多的副本;AR = OSR+ ISR。
这里先要说下两个名词:HW(高水位)是 Consumer 可以看到的此 Partition 的位置,LEO 是每一个 Partition 的 Log ***一条 Message 的位置。
HW 能保证 Leader 所在的 Broker 失效,该消息仍然能够重新选举的 Leader 中获取,不会形成消息丢失。
当 Producer 向 Leader 发送数据时,能够经过 request.required.acks 参数来设置数据可靠性的级别:
可是这样也不能保证数据不丢失,好比当 ISR 中只有 Leader 时(其余节点都和 ZK 断开链接,或者都没追上),这样就变成了 acks = 1 的状况。
高可用模型及幂等
在分布式系统中通常有三种处理语义:
at-least-once
至少一次,有可能会有屡次。若是 Producer 收到来自 Ack 的确认,则表示该消息已经写入到 Kafka 了,此时恰好是一次,也就是咱们后面的 Exactly-once。
可是若是 Producer 超时或收到错误,而且 request.required.acks 配置的不是 -1,则会重试发送消息,客户端会认为该消息未写入 Kafka。
若是 Broker 在发送 Ack 以前失败,但在消息成功写入 Kafka 以后,这一次重试将会致使咱们的消息会被写入两次。
因此消息就不止一次地传递给最终 Consumer,若是 Consumer 处理逻辑没有保证幂等的话就会获得不正确的结果。
在这种语义中会出现乱序,也就是当***次 Ack 失败准备重试的时候,可是第二消息已经发送过去了,这个时候会出现单分区中乱序的现象。
咱们须要设置 Prouducer 的参数 max.in.flight.requests.per.connection,flight.requests 是 Producer 端用来保存发送请求且没有响应的队列,保证 Produce r端未响应的请求个数为 1。
at-most-once
若是在 Ack 超时或返回错误时 Producer 不重试,也就是咱们讲 request.required.acks = -1,则该消息可能最终没有写入 Kafka,因此 Consumer 不会接收消息。
exactly-once
恰好一次,即便 Producer 重试发送消息,消息也会保证最多一次地传递给 Consumer。该语义是最理想的,也是最难实现的。
在 0.10 以前并不能保证 exactly-once,须要使用 Consumer 自带的幂等性保证。0.11.0 使用事务保证了。
如何实现 exactly-once
要实现 exactly-once 在 Kafka 0.11.0 中有两个官方策略:
单 Producer 单 Topic
每一个 Producer 在初始化的时候都会被分配一个惟一的 PID,对于每一个惟一的 PID,Producer 向指定的 Topic 中某个特定的 Partition 发送的消息都会携带一个从 0 单调递增的 Sequence Number。
在咱们的 Broker 端也会维护一个维度为,每次提交一次消息的时候都会对齐进行校验:
上面所说的解决了两个问题:
上面所说的都是在同一个 PID 下面,意味着必须保证在单个 Producer 中的同一个 Seesion 内,若是 Producer 挂了,被分配了新的 PID,这样就没法保证了,因此 Kafka 中又有事务机制去保证。
事务
在 Kafka 中事务的做用是:
事务能够保证就算跨多个,在本次事务中的对消费队列的操做都当成原子性,要么所有成功,要么所有失败。
而且,有状态的应用也能够保证重启后从断点处继续处理,也即事务恢复。
在 Kafka 的事务中,应用程序必须提供一个惟一的事务 ID,即 Transaction ID,而且宕机重启以后,也不会发生改变。
Transactin ID 与 PID 可能一一对应,区别在于 Transaction ID 由用户提供,而 PID 是内部的实现对用户透明。
为了 Producer 重启以后,旧的 Producer 具备相同的 Transaction ID 失效,每次 Producer 经过 Transaction ID 拿到 PID 的同时,还会获取一个单调递增的 Epoch。
因为旧的 Producer 的 Epoch 比新 Producer 的 Epoch 小,Kafka 能够很容易识别出该 Producer 是老的,Producer 并拒绝其请求。
为了实现这一点,Kafka 0.11.0.0 引入了一个服务器端的模块,名为 Transaction Coordinator,用于管理 Producer 发送的消息的事务性。
该 Transaction Coordinator 维护 Transaction Log,该 Log 存于一个内部的 Topic 内。
因为 Topic 数据具备持久性,所以事务的状态也具备持久性。Producer 并不直接读写 Transaction Log,它与 Transaction Coordinator 通讯,而后由 Transaction Coordinator 将该事务的状态插入相应的 Transaction Log。
Transaction Log 的设计与 Offset Log 用于保存 Consumer 的 Offset 相似。
***
关于消息队列或者 Kafka 的一些常见的面试题,经过上面的文章能够提炼出如下几个比较经典的问题,大部分问题均可以从上面总结后找到答案:
A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.
如上图,一个Topic有四个Partition,每一个Partition两个replication。
Zookeeper在Kakfa中扮演的角色Kafka将元数据信息保存在Zookeeper中,可是发送给Topic自己的数据是不会发到Zk上的,不然Zk就疯了。
[zk: localhost:2181(CONNECTED) 0] ls / [admin, consumers, config, brokers]
参考:kafka工做原理