消息队列之Kafka——从架构技术从新理解Kafka

 

Apache Kafka® 是 一个分布式流处理平台. 这到底意味着什么呢?

咱们知道流处理平台有如下三种特性:html

  1. 可让你发布和订阅流式的记录。这一方面与消息队列或者企业消息系统相似。
  2. 能够储存流式的记录,而且有较好的容错性。
  3. 能够在流式记录产生时就进行处理。

Kafka适合什么样的场景?linux

它能够用于两大类别的应用:算法

  1. 构造实时流数据管道,它能够在系统或应用之间可靠地获取数据。 (至关于message queue)
  2. 构建实时流式应用程序,对这些流数据进行转换或者影响。 (就是流处理,经过kafka stream topic和topic之间内部进行变化

Kafka有四个核心的API:数据库

  • The Producer API 容许一个应用程序发布一串流式的数据到一个或者多个Kafka topic。
  • The Consumer API 容许一个应用程序订阅一个或多个 topic ,而且对发布给他们的流式数据进行处理。
  • The Streams API 容许一个应用程序做为一个流处理器,消费一个或者多个topic产生的输入流,而后生产一个输出流到一个或多个topic中去,在输入输出流中进行有效的转换。
  • The Connector API 容许构建并运行可重用的生产者或者消费者,将Kafka topics链接到已存在的应用程序或者数据系统。好比,链接到一个关系型数据库,捕捉表(table)的全部变动内容。

在Kafka中,客户端和服务器使用一个简单、高性能、支持多语言的 TCP 协议.此协议版本化而且向下兼容老版本, 咱们为Kafka提供了Java客户端,也支持许多其余语言的客户端。apache

————————————————————————————————————————————————api

以上摘自Apache Kafka官网缓存

 

而本文关注的焦点是:构造实时流数据管道,即message queue部分。也就是咱们常使用的“消息队列”部分,这部分自己也是Kafka最初及最基本的底层设计。服务器

 

让咱们回到最初Kafka尚未设计出来的时候,经过从新设计Kafka,一步步了解为何Kafka是咱们如今看到的样子,到时咱们将了解到Kafka做为消息队列会高吞吐量、分布式、高容错稳定。咱们把这个项目命名为:Kafka-R网络

 

如今咱们开始设计Kafka-R,咱们正式设计Kafka-R以前须要考虑设计目标,也就是个人Kafka-R设计出来究竟是用来干吗的,适用于什么业务场景,解决什么需求痛点。数据结构

能够很快想到:数据交换。这是消息队列的基本功能与要求。

而后呢?能够做为个大平台,支持多语言,最好能知足大公司的业务需求,并且最好是实时的,至少是低延迟。

归纳起来就是:咱们设计Kafka-R的目标是能够做为一个统一的平台来处理大公司可能拥有的全部实时数据馈送。

为了知足咱们的Kafka-R的设计目标,那么Kafka-R须要具有如下这些特征:

具备高吞吐量来支持高容量事件流。

可以正常处理大量的数据积压,以便支持来自离线系统的周期性数据加载。

系统必须处理低延迟分发,来处理更传统的消息传递用例。

数据馈送分区与分布式,以及实时。

系统在出现机器故障时可以保证容错。

 

1、数据的存储方式——in-memory&in-disk

有两种选择:第一种,使用in-memory cache,并在空间不足的的时候将数据flush到文件系统中。

另一种,使用in-disk,一开始把全部的数据写入文件系统的持久化日志中。

咱们的Kafka-R采用in-disk。实际上在此状况数据被转移到了内核的pagecache中。

“磁盘速度慢”是人们的广泛印象,那么Kafka-R的数据存储和缓存基于文件系统,这样的性能可以接受吗?

而事实是,磁盘的速度比人们预期的要慢得多,也快得多,取决于人们使用磁盘的方式。

咱们知道磁盘有顺序读和随机读两种模式,之间的性能差别很大,但具体差距多少呢?

使用6个7200rpm、SATA接口、RAID-5的磁盘阵列在JBOD配置下的顺序写入的性能约为600MB/秒,但随机写入的性能仅约为100k/秒,相差6000倍。 

线性的读取和写入是磁盘使用模式中最有规律的,而且操做系统进行了大量的优化。现代操做系统提供了read-ahead和write-behind技术,read-ahead是以大的data block为单位预先读取数据,而write-hehind将多个小型的逻辑写合并成一次大型的物理磁盘写入。

 

磁盘除了访问模式,还有两个低效率操做影响系统的性能:大量的小型I/O操做,过多的字节拷贝。

那么咱们怎么处理这些问题呢?

针对于大量的小型I/O操做,Kafka-R使用“消息块”将消息合理分组。使网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络往返的开销。

另外一个过多的字节拷贝,Kafka-R使用producer,broker和consumer都共享的标准化通用的二进制消息格式,这样数据块不用修改就能在他们之间传递。

保持这种通用的格式有什么用呢?

能够对持久化日志块的网络传输进行优化。现代的unix操做系统提供了一个高度优化的编码方式,用于将数据从pagecache转移到socket网络链接中。

数据从文件到套接字的常见数据传输过程:磁盘->pagecache->用户空间缓存区->套接字缓冲区(内核空间)->NIC缓存区

1. 操做系统从磁盘读区数据到内核空间的pagecache

2. 应用程序读取内核空间的数据到用户空间的缓存区

3. 应用程序将数据(用户空间的缓存区)写会内核空间到套接字缓冲区(内核空间)

4. 操做系统将数据从套接字缓冲区(内核空间)复制到可以经过网络发送的NIC缓冲区

共进行了4次copy操做和2次系统调用,显然很低效。在Linux系统中使用zero-copy(零拷贝)优化,其中之一sendfile,使用后的数据传输过程是这样:磁盘->pagecache->NIC缓存区。

咱们的Kafka-R经过使用zero-copy优化技术,能够用尽量低的消费代价让多个consumer消费。数据在使用时只会被复制到pagecache中一次,这样消息可以以接近网络链接的速度上限进行消费。

 

 

2、数据结构——BTree&日志解决方案

日志解决方案即简单读取与追加来操做文件。

咱们的Kafka-R采用日志解决方案。

咱们知道BTree是通用的数据结构,其普遍用于随机的数据访问。BTree的操做时间复杂度是O(log N),基本等同于常数时间,但在磁盘上则不成立。

每一个磁盘同时只能执行一次寻址,并行性受到限制。少许的磁盘寻址也有很高的开销。数据翻倍时性能降低不止两倍。 

而日志解决方案的数据存储架构,全部的操做时间复杂度都是O(1),而且读不会阻塞写,读之间也不会相互影响。

因为性能和数据的大小是彻底分离的,则服务器可使用大量廉价、低转速的1+TB SATA硬盘,即便这些硬盘的寻址性能不好,在大规模读写的性能也能够接受,并且三分之一的价格三倍的容量。

 

 

3、获取数据方式——push-based&pull-based

由consumer从broker那里pull数据呢?仍是从broker将数据push到consumer?

咱们的Kafka-R采用pull-based方式。

这是大多数消息系统所共享的传统的方式:即producer把数据push到broker,而后consumer从broker中pull数据。

 

push-based系统优势:

1. 让consumer可以以最大速率消费。

push-based系统缺点:

1. 因为broker控制着数据传输速率,因此很难处理不一样的consumer。

2. 当消费速率低于生产速率时,consumer每每会不堪重负(本质相似于拒绝服务攻击)。

3. 必须选择当即发送请求或者积累更多的数据,而后在不知道下游的consumer可否当即处理它的状况下发送这些数据。特别系统为低延迟状态下,这样会极度糟糕浪费。

 

pull-based系统优势:

1. 能够大批量生产要发送给consumer的数据。

pull-based系统缺点:

1. 若是broker中没有数据,consumer可能会在一个紧密的循环中结束轮询,实际上会busy-waiting直到数据到来。

 

为了不busy-waiting,咱们的Kafka-R的pull参数重加入参数,使得consumer在一个“long pull”中阻塞等待,知道数据到来(还能够选择等待给定字节长度的数据来确保传输长度)。

 

 

4、消费者的位置——consumed&offset

Kafka-R的消费过程:consumer经过向broker发出一个“fetch”请求来获取它想要消费的partition。consumer的每一个请求在log中指定了对应的offset,并接收从该位置开始的一大块数据。

consumed指经过状态标示已经被消费的数据。

大多数消息系统都在broker上保存被消费消息的元数据。当消息被传递给consumer,broker要么当即在本地记录该事件,要么等待consumer的确认后再记录。

消费者的位置问题其实就是broker和consumer之间被消费数据的一致性问题。若是broker再每条消息被发送到网络的时候,当即将其标记为consumd,那么一旦consumer没法处理该消息(可能由consumer崩溃或者请求超时或者其余缘由致使),该消息就会丢失。为了解决消息丢失的问题,许多消息系统增长了确认机制:即当消息被发送出去的时候,消息被标记为sent而不是consumed;而后broker会等待一个来自consumer的特定确认,再将消息标记为consumed。这个策略修复了消息丢失的问题,但也产生了新问题。首先,若是consumer处理了消息但在发送确认以前出错了,那么该消息就会被消费两次。第二个是有关性能的,broker必须为每条消息保存多个状态(首先对其加锁,确保该消息只被发送一次,而后将其永久的标记为consumed,以便将其移除)。还有更棘手的问题,好比如何处理已经发送但一直等不到确认的消息。

Kafka-R使用offse来处理消息丢失问题。topic被分割成一组彻底有序的partition,其中每个partition在任意给定的时间内只能被每一个订阅了这个topic的consumer组中的一个consumer消费。意味着partition中每个consumer的位置仅仅是一个数字,即下一条要消费的消息的offset。这样就能够按很是低的代价实现和消息确认机制等同的效果。consumer还能够回退到以前的offset再次消费以前的数据,这样的操做违背了队列的基本原则,但事实证实对consumer来讲是个很重要的特性。若是consumer代码由bug,而且在bug被发现以前有部分数据被消费了,consumer能够在bug修复后经过回退到以前的offset再次消费这些数据。

 

 

 5、leader选举——多数投票机制f+1&ISR

Kafka-R动态维护了一个同步状态的备份的集合(a set of in-sync replicas),简称ISR。

在了解ISR以前咱们须要先了解in-sync。

Kafka-R判断节点是否存活有两种方式:

1. 节点必须能够维护和ZooKeeper的链接,ZooKeeper经过心跳机制检查每一个节点的链接。

2. 若是节点是个follower,它必须能及时的同步leader的写操做,而且延时不能过久。

只有知足上面两个条件的节点就处于“in sync”状态。leader会追踪全部“in sync”的节点,若是有节点挂掉了,或是写超时,或是心跳超时,leader就会把它从同步副本列表中移除。

在ISR集合中节点会和leader保持高度一致,只有这个集合的成员才有资格被选举为leader,一条消息必须被这个集合全部节点读取并追加到日志中了,这条消息才能视为提交。

ISR集合发生变化会在ZooKeeper持久化,因此这个集合中的任何一个节点都有资格被选为leader。

 

多数投票机制f+1顾名思义:假设咱们有2f+1个副本,若是在leader宣布消息提交以前必须有f+1个副本收到该消息,而且若是咱们从这只少f+1个副本之中,有着最完整的日志记录的follower里来选择一个新的leader,那么在故障数小于f的状况下,选举出的leader保证具备全部提交的消息。

多数投票算法必须处理许多细节,好比精肯定义怎样使日志更加完整,确保在leader down期间,保证日志一致性或者副本服务器的副本集改变。

多数投票机制有一个很是好的优势:延迟取决于较快的服务器。也就是说,若是副本数是3,则备份完成的等待时间取决于最快的follwer。

所以提交时能避免最慢的服务器,这也是多数投票机制的优势。

一样多数投票的缺点也很明显,多数的节点挂掉后不能选择出leader。而经过冗余来避免故障率,会下降吞吐量,不利于处理海量数据。

是一种Quorum读写机制(若是选择写入时候须要保证必定数量的副本写入成功,读取时须要保证读取必定数量的副本,读取和写入之间有重叠)。

 

Kafka-R保证只要有只少一个同步中的节点存活,提交的消息就不会丢失。

在一次故障生存以后,大多数的quorum须要三个备份节点和一次确认,ISR只须要两个备份节点和一次确认。

建立副本的单位是topic的partition,正常状况下,每一个分区都有一个leader和零或多个follower。总的副本数是包括leader与全部follwer的总和。全部的读写操做都由leader处理,通常partition的数量都比broker的数量多的多,各分区的leader均匀分布在broker中。全部的follower节点都同步leader节点的日志,日志中的消息和偏移量都和leader保持一致。

 

 

6、Uclean leader选举——ISR副本&第一个副本

若是节点全挂了的服务恢复。

Kafka-R对于数据不会丢失时基于只少一个节点保持同步状态,而一旦分区上的全部备份节点都挂了,就没法保证了。

Kafka-R默认“第一个副本”策略。

 

ISR副本:等待一个ISR的副本从新恢复正常服务,并选择这个副本做为新leader(极大可能拥有所有数据)

第一个副本:选择第一个从新恢复正常服务的副本(不必定是ISR)做为leader。

 

这是可用性和一致性之间的简单妥协,若是只等待ISR的备份节点,只要ISR备份节点都挂了,那么服务都一直会不可用,若是他们的数据损坏了或者丢失了,那就会是长久的宕机。另外一方面,若是不是ISR中的节点恢复服务而且咱们容许它成为leader,那么它的数据就是可信的来源,即便它不能保证记录了每个已经提交的消息。

能够配置属性unclean.leader.election.enable禁用次策略,那么就会使用“ISR副本”策略即停机时间优于不一样步,以修改默认配置。

 

经过以上的架构技术的分析和选型,咱们就大体设计出了咱们的消息队列Kafka-R

相关文章
相关标签/搜索