Kafka为何快到根本停不下来?


目前来讲市面上能够选择的消息队列很是多,像 ActiveMQ,RabbitMQ,ZeroMQ 已经被大多数人耳熟能详。算法


image.png
图片来自 Pexels


特别像 ActiveMQ 早期应用在企业中的总线通讯,基本做为企业级 IT 设施解决方案中不可或缺的一部分。数据库


目前 Kafka 已经很是稳定,而且逐步应用更加普遍,已经算不得新生事物,可是不能否认 Kafka 一枝独秀如同雨后春笋,很是耀眼,今天咱们仔细分解一下 Kafka,了解一下它的内幕。


如下的内容版本基于当前最新的 Kafka 稳定版本 2.4.0。文章主要包含如下内容:
  • Kafka 为何快缓存

  • Kafka 为何稳安全

  • Kafka 该怎么用服务器


该文章为开篇引导之作,后续会有对应的 HBase,Spark,Kylin,Pulsar 等相关组件的剖析。

Kafka 为何快网络


快是一个相对概念,没有对比就没有伤害,所以一般咱们说 Kafka 是相对于咱们常见的 ActiveMQ,RabbitMQ 这类会发生 IO,而且主要依托于 IO 来作信息传递的消息队列。


像 ZeroMQ 这种基本纯粹依靠内存作信息流传递的消息队列,固然会更快,可是此类消息队列只有特殊场景下会使用,不在对比之列。


所以当咱们说 Kakfa 快的时候,一般是基于如下场景:
  • 吞吐量:当咱们须要每秒处理几十万上百万 Message 的时候,相对其余 MQ,Kafka 处理的更快。架构

  • 高并发:当具备百万以及千万的 Consumer 的时候,同等配置的机器下,Kafka 所拥有的 Producer 和 Consumer 会更多。并发

  • 磁盘锁:相对其余 MQ,Kafka 在进行 IO 操做的时候,其同步锁住 IO 的场景更少,发生等待的时间更短。app

那么基于以上几点,咱们来仔细探讨一下,为何 Kafka 就快了。框架

消息队列的推拉模型

首先,若是咱们单纯站在 Consumer 的角度来看“Kafka 快”,是一个伪命题,由于相比其余 MQ,Kafka 从 Producer 产生一条 Message 到 Consumer 消费这条 Message 来看,它的时间必定是大于等于其余 MQ 的。


背后的缘由涉及到消息队列设计的两种模型:
  • 推模型

  • 拉模型


以下图所示:

image.png


对于拉模型来讲,Producer 产生 Message 后,会主动发送给 MQ Server,为了提高性能和减小开支,部分 Client 还会设计成批量发送。


可是不管是单条仍是批量,Producer 都会主动推送消息到 MQ Server。

当 MQ Server 接收到消息后,对于拉模型,MQ Server 不会主动发送消息到 Consumer,同时也不会维持和记录消息的 Offset,Consumer 会自动设置定时器到服务端去询问是否有新的消息产生。

一般时间是不超过 100ms 询问一次,一旦产生新的消息则会同步到本地,而且修改和记录 Offset,服务端能够辅助存储 Offset,可是不会主动记录和校验 Offset 的合理性。

同时 Consumer 能够彻底自主的维护 offset 以便实现自定义的信息读取。

对于推模型来讲,服务端收到 Message 后,首先会记录消息的信息,而且从本身的元信息数据库中查询对应的消息的 Consumer 有谁。
因为服务器和 Consumer 在连接的时候创建了长连接,所以能够直接发送消息到 Consumer。
Kafka 是基于拉模型的消息队列,所以从 Consumer 获取消息的角度来讲,延迟会小于等于轮询的周期,因此会比推模型的消息队列具备更高的消息获取延迟,可是推模型一样又其问题。




首先,因为服务器须要记录对应的 Consumer 的元信息,包括消息该发给谁,Offset 是多少,同时须要向 Consumer 推送消息,必然会带来系列的问题。




假如这一刻网络很差,Consumer 没有收到,消息没有发成功怎么办?假设消息发出去了,我怎么知道它有没有收到?




所以服务器和 Consumer 之间须要首先多层确认口令,以达到至少消费一次,仅且消费一次等特性。




Kafka 此类的拉模型将这一块功能都交由 Consumer 自动维护,所以服务器减小了更多的没必要要的开支。




所以从同等资源的角度来说,Kafka 具有连接的 Producer 和 Consumer 将会更多,极大的下降了消息堵塞的状况,所以看起来更快了。




OS Page Cache 和 Buffer Cache




太阳底下无新鲜事,对于一个框架来讲,要想运行的更快,一般能用的手段也就那么几招,Kafka 在将这一招用到了极致。




其中之一就是极大化的使用了 OS 的 Cache,主要是 Page Cache 和 Buffer Cache。


对于这两个 Cache,使用 Linux 的同窗一般不会陌生,例如咱们在 Linux 下执行 free 命令的时候会看到以下的输出:

image.png


图片来自网络




会有两列名为 buffers 和 cached,也有一行名为“-/+ buffers/cache”, 这两个信息的具体解释以下:
pagecache: 文件系统层级的缓存,从磁盘里读取的内容是存储到这里,这样程序读取磁盘内容就会很是快,好比使用 Linux 的 grep 和 find 等命令查找内容和文件时,第一次会慢不少,再次执行就快好多倍,几乎是瞬间。
另外 page cache 的数据被修改事后,也即脏数据,等到写入磁盘时机到来时,会转移到 buffer cache 而不是直接写入到磁盘。




咱们看到的 cached 这列的数值表示的是当前的页缓存(page cache)的占用量,page cache 文件的页数据,页是逻辑上的概念,所以 page cache 是与文件系统同级的。




buffer cache: 磁盘等块设备的缓冲,内存的这一部分是要写入到磁盘里的 。



buffers 列表示当前的块缓存(buffer cache)占用量,buffer cache 用于缓存块设备(如磁盘)的块数据。块是物理上的概念,所以 buffer cache 是与块设备驱动程序同级的。

image.png


二者都是用来加速数据 IO,将写入的页标记为 dirty,而后向外部存储 flush,读数据时首先读取缓存,若是未命中,再去外部存储读取,而且将读取来的数据也加入缓存。




操做系统老是积极地将全部空闲内存都用做 Page Cache 和 Buffer Cache,当 OS 的内存不够用时也会用 LRU 等算法淘汰缓存页。

image.png



有了以上概念后,咱们再看来 Kafka 是怎么利用这个特性的。
首先,对于一次数据 IO 来讲,一般会发生如下的流程:


  • 操做系统将数据从磁盘拷贝到内核区的 Page Cache。

  • 用户程序将内核区的 Page Cache 拷贝到用户区缓存。

  • 用户程序将用户区的缓存拷贝到 Socket 缓存中。

  • 操做系统将 Socket 缓存中的数据拷贝到网卡的 Buffer 上,发送数据。



能够发现一次 IO 请求操做进行了 2 次上下文切换和 4 次系统调用,而同一份数据在缓存中屡次拷贝,实际上对于拷贝来讲彻底能够直接在内核态中进行。



也就是省去第二和第三步骤,变成这样:

image.png


正由于能够如此的修改数据的流程,因而 Kafka 在设计之初就参考此流程,尽量大的利用 OS 的 Page Cache 来对数据进行拷贝,尽可能减小对磁盘的操做。
若是 Kafka 生产消费配合的好,那么数据彻底走内存,这对集群的吞吐量提高是很大的。




早期的操做系统中的 Page Cache 和 Buffer Cache 是分开的两块 Cache,后来发现一样的数据可能会被 Cache 两次,因而大部分状况下二者都是合二为一的。




Kafka 虽然使用 JVM 语言编写,在运行的时候脱离不了 JVM 和 JVM 的 GC, 可是 Kafka 并未本身去管理缓存,而是直接使用了 OS 的 Page Cache 做为缓存。




这样作带来了如下好处:


  • JVM 中的一切皆对象,因此不管对象的大小,总会有些额外的 JVM 的对象元数据浪费空间。

  • JVM 本身的 GC 不受程序手动控制,因此若是使用 JVM 做为缓存,在遇到大对象或者频繁 GC 的时候会下降整个系统的吞吐量。

  • 程序异常退出或者重启,全部的缓存都将失效,在容灾架构下会影响快速恢复。而 Page Cache 由于是 OS 的 Cache,即使程序退出,缓存依旧存在。



因此 Kafka 优化 IO 流程,充分利用 Page Cache,其消耗的时间更短,吞吐量更高,相比其余 MQ 就更快了。



用一张图来简述三者之间的关系以下:

image.png


当 Producer 和 Consumer 速率相差不大的状况下,Kafka 几乎能够彻底实现不落盘就完成信息的传输。




追加顺序写入



除了前面的重要特性以外,Kafka 还有一个设计,就是对数据的持久化存储采用的顺序的追加写入,Kafka 在将消息落到各个 Topic 的 Partition 文件时,只是顺序追加,充分的利用了磁盘顺序访问快的特性。

image.png


图片来自网络




Kafka 的文件存储按照 Topic 下的 Partition 来进行存储,每个 Partition 有各自的序列文件,各个 Partition 的序列不共享, 主要的划分按照消息的 Key 进行 Hash 决定落在哪一个分区之上。




咱们先来详细解释一下 Kafka 的各个名词,以便充分理解其特色:


  • Broker:Kafka 中用来处理消息的服务器,也是 Kafka 集群的一个节点,多个节点造成一个 Kafka 集群。

  • Topic:一个消息主题,每个业务系统或者 Consumer 须要订阅一个或者多个主题来获取消息,Producer 须要明确发生消息对于的 Topic,等于信息传递的口令名称。

  • Partition:一个 Topic 会拆分红多个 Partition 落地到磁盘,在 Kafka 配置的存储目录下按照对应的分区 ID 建立的文件夹进行文件的存储,磁盘能够见的最大的存储单元。

  • Segment:一个 Partition 会有多个 Segment 文件来实际存储内容。

  • Offset:每个 Partition 有本身的独立的序列编号,做用域仅在当前的 Partition 之下,用来对对应的文件内容进行读取操做。

  • Leader:每个 Topic 须要有一个 Leader 来负责该 Topic 的信息的写入,数据一致性的维护。

  • Controller:每个 Kafka 集群会选择出一个 Broker 来充当 Controller,负责决策每个 Topic 的 Leader 是谁,监听集群 Broker 信息的变化,维持集群状态的健康。

image.png

image.png


能够看到最终落地到磁盘都是 Segment 文件,每个 Partion(目录)至关于一个巨型文件被平均分配到多个大小相等 Segment(段)数据文件中。
但每一个段 segment file 消息数量不必定相等,这种特性方便老的 segment file 快速被删除。




由于 Kafka 处理消息的力度是到 Partition,所以只须要保持好 Partition 对应的顺序处理,Segment 能够单独维护其状态。 



Segment 的文件由 index file 和 data file 组成,落地在磁盘的后缀为 .index 和 .log,文件按照序列编号生成,以下所示:

image.png


图片来自网络




其中 index 维持着数据的物理地址,而 data 存储着数据的偏移地址,相互关联,这里看起来彷佛和磁盘的顺序写入关系不大,想一想 HDFS 的块存储,每次申请固定大小的块和这里的 Segment?是否是挺类似的?




另外由于有 index 文的自己命名是以 Offset 做为文件名的,在进行查找的时候能够快速根据须要查找的 Offset 定位到对应的文件,再根据文件进行内容的检索。




所以 Kafka 的查找流程为先根据要查找的 Offset 对文件名称进行二分查找,找到对应的文件,再根据 index 的元数据的物理地址和 log 文件的偏移位置结合顺序读区到对应 Offset 的位置的内容便可。
segment index file 采起稀疏索引存储方式,它减小索引文件大小,经过 mmap 能够直接内存操做,稀疏索引为数据文件的每一个对应 Message 设置一个元数据指针。




它比稠密索引节省了更多的存储空间,但查找起来须要消耗更多的时间,特别是在随机读取的场景下,Kafka 很是不合适。因此由于 Kafka 特殊的存储设计,也让 Kafka 感受起来,更快。




Kafka 为何稳




前面提到 Kafka 为何快,除了快的特性以外,Kafka 还有其余特色,那就是:稳。




Kafka 的稳体如今几个维度:


  • 数据安全,几乎不会丢数据。

  • 集群安全,发生故障几乎能够 Consumer 无感知切换。

  • 可用性强,即使部分 Partition 不可用,剩余的 Partition 的数据依旧不影响读取。

  • 流控限制,避免大量 Consumer 拖垮服务器的带宽。



限流机制


对于 Kafka 的稳,一般是由其总体架构设计决定,不少优秀的特性结合在一块儿,就更加的优秀,像 Kafka 的 Qutota 就是其中一个。




既然是限流,那就意味着须要控制 Consumer 或者 Producer 的流量带宽,一般限制流量这件事须要在网卡上做处理,像常见的 N 路交换机或者高端路由器。




因此对于 Kafka 来讲,想要操控 OS 的网卡去控制流量显然具备很是高的难度,所以 Kafka 采用了另一个特别的思路。




即:没有办法控制网卡经过的流量大小,就控制返回数据的时间。对于 JVM 程序来讲,就是一个 Wait 或者 Seelp 的事情。




因此对于 Kafka 来讲,有一套特殊的时延计算规则,Kafka 按照一个窗口来统计单位时间传输的流量。




当流量大小超过设置的阈值的时候,触发流量控制,将当前请求丢入 Kafka 的 Qutota Manager,等到延迟时间到达后,再次返回数据。



咱们经过 Kafka 的 ClientQutotaManager 类中的方法来看:

image.png


这几行代码表明了 Kafka 的限流计算逻辑,大概的思路为:假设咱们设定当前流量上限不超过 T,根据窗口计算出当前的速率为 O。



若是 O 超过了 T,那么会进行限速,限速的公示为:


X = (O - T)/ T * W




X 为须要延迟的时间,让我举一个形象的例子,假设咱们限定流量不超过 10MB/s,过去 5 秒(公示中的 W,窗口区间)内经过的流量为 100MB,则延迟的时间为:(100-5*10)/10=5 秒。




这样就可以保障在下一个窗口运行完成后,整个流量的大小是不会超过限制的。



经过 KafkaApis 里面对 Producer 和 Consumer 的 call back 代码能够看到对限流的延迟返回:

image.png


对于 Kafka 的限流来说,默认是按照 client id 或者 user 来进行限流的,从实际使用的角度来讲,意义不是很大,基于 Topic 或者 Partition 分区级别的限流,相对使用场景更大。

竞选机制




Kafka 背后的元信息重度依赖 Zookeeper,再次咱们不解释 Zookeeper 自己,而是关注 Kafka 究竟是如何使用 ZK 的。



首先一张图解释 Kafka 对 ZK 的重度依赖:

image.png


图片来源于网络




利用 ZK 除了自己信息的存储以外,最重要的就是 Kafka 利用 ZK 实现选举机制,其中以 Controller 为主要的介绍。



首先 Controller 做为 Kafka 的心脏,主要负责着包括不限于如下重要事项:

image.png


也就是说 Controller 是 Kafka 的核心角色,对于 Controller 来讲,采用公平竞争,任何一个 Broker 都有可能成为 Controller,保障了集群的健壮性。




对于 Controller 来讲,其选举流程以下:




①先获取 ZK 的 /Cotroller 节点的信息,获取 Controller 的 broker id,若是该节点不存在(好比集群刚建立时),* 那么获取的 controller id 为 -1。




②若是 controller id 不为 -1,即 Controller 已经存在,直接结束流程。




③若是 controller id 为 -1,证实 Controller 还不存在,这时候当前 Broker 开始在 ZK 注册 Controller。




④若是注册成功,那么当前 Broker 就成为了 Controller,这时候开始调用 onBecomingLeader() 方法,正式初始化 Controller。




注意:Controller 节点是临时节点,若是当前 Controller 与 ZK 的 Session 断开,那么 Controller 的临时节点会消失,会触发 Controller 的从新选举。




⑤若是注册失败(恰好 Controller 被其余 Broker 建立了、抛出异常等),那么直接返回。



其代码直接经过 KafkaController 能够看到:

image.png

一旦 Controller 选举出来以后,则其余 Broker 会监听 ZK 的变化,来响应集群中 Controller 挂掉的状况:

image.png


从而触发新的 Controller 选举动做。对于 Kafka 来讲,整个设计很是紧凑,代码质量至关高,不少设计也很是具备借鉴意义,相似的功能在 Kafka 中有很是多的特性体现,这些特性结合一块儿,造成了 Kafka 整个稳定的局面。




Kafka 该怎么用




虽然 Kafka 总体看起来很是优秀,可是 Kafka 也不是全能的银弹,必然有其对应的短板,那么对于 Kafka 如何,或者如何能用的更好,则须要通过实际的实践才能得感悟的出。
通过概括和总结,可以发现如下不一样的使用场景和特色:



①Kafka 并不合适高频交易系统




Kafka 虽然具备很是高的吞吐量和性能,可是不能否认,Kafka 在单条消息的低延迟方面依旧不如传统 MQ,毕竟依托推模型的 MQ 可以在实时消息发送的场景下取得先天的优点。


②Kafka 并不具有完善的事务机制




0.11 以后 Kafka 新增了事务机制,能够保障 Producer 的批量提交,为了保障不会读取到脏数据,Consumer 能够经过对消息状态的过滤过滤掉不合适的数据,可是依旧保留了读取全部数据的操做。




即使如此,Kafka 的事务机制依旧不完备,背后主要的缘由是 Kafka 对 Client 并不感冒,因此不会统一全部的通用协议,所以在相似仅且被消费一次等场景下,效果很是依赖于客户端的实现。


③Kafka 的异地容灾方案很是复杂




对于 Kafka 来讲,若是要实现跨机房的无感知切换,就须要支持跨集群的代理。




由于 Kafka 特殊的 append log 的设计机制,致使一样的 Offset 在不一样的 Broker 和不一样的内容上没法复用。




也就是文件一旦被拷贝到另一台服务器上,将不可读取,相比相似基于数据库的 MQ,很难实现数据的跨集群同步。




同时对于 Offset 的复现也很是难,曾经帮助客户实现了一套跨机房的 Kafka 集群 Proxy,投入了很是大的成本。


④Kafka Controller 架构没法充分利用集群资源




Kafka Controller 相似于 ES 的去中心化思想,按照竞选规则从集群中选择一台服务器做为 Controller。


意味着改服务器即承担着 Controller 的职责,同时又承担着 Broker 的职责,致使在海量消息的压迫下,该服务器的资源很容易成为集群的瓶颈,致使集群资源没法最大化。




Controller 虽然支持 HA 可是并不支持分布式,也就意味着若是要想 Kafka 的性能最优,每一台服务器至少都须要达到最高配置。


⑤Kafka 不具有很是智能的分区均衡能力




一般在设计落地存储的时候,对于热点或者要求性能足够高的场景下,会是 SSD 和 HD 的结合。



同时若是集群存在磁盘容量大小不均等的状况,对于 Kafka 来讲会有很是严重的问题,Kafka 的分区产生是按照 Paratition 的个数进行统计,将新的分区建立在个数最少的磁盘上,见下图: image.png
曾经我帮助某企业修改了分区建立规则,考虑了容量的状况,也就是按照磁盘容量进行分区的选择。




紧接着带来第二个问题:容量大的磁盘具有更多的分区,则会致使大量的 IO 都压向该盘,最后问题又落回 IO,会影响该磁盘的其余 Topic 的性能。




因此在考虑 MQ 系统的时候,须要合理的手动设置 Kafka 的分区规则。

结尾




Kafka 并非惟一的解决方案,像几年前新生势头挺厉害的 Pulsar,以取代 Kafka 的口号冲入市场,也许会成为下一个解决 Kafka 部分痛点的框架,下文再讲述 Pulsar。