初探kafka

    平常中工做中我并无对kafka接触不少,但了解到不少的框架都和kafka有着紧密的关系。好比rockmetmq是参考了kafka的设计,neflix的缓存组件ehcache是用kafka作数据的同步。同时kafka在大数据方面一般和spark,hadoop,storm一块儿使用,因此我对kafka也产生了一些兴趣,抽了些时间去研究了一下这个框架。由于尚未深刻的研究和使用,因此只能算是初探~。html

    kafka架构

         

 

 

 左边是kafka,右边是rocketmq。kafka的架构如上所示,与rocketmq很类似。不一样的是rocketmq用的是namesrv,而kafka用的是zookeeper。zookeeper在kafka的做用是起到一个动态注册发现与负载均衡的做用。linux

  zookeeper与kafka

    一、broker注册

    kafka使用了全局惟一的数字来指代每一个Broker服务器,不一样的Broker必须使用不一样的Broker ID进行注册,建立完节点后,每一个Broker就会将本身的IP地址和端口信息记录到该节点中去。其中,Broker建立的节点类型是临时节点,一旦Broker宕机,则对应的临时节点也会被自动删除。好比我本地起了三个Broker,查看zookeepr /brokers/ids的目录就看到如下内容git

ls /brokers/ids

[0, 1, 2]

get /brokers/ids/1
{"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://10.66.51.58:9093"],"jmx_port":-1,"host":"10.66.51.58","timestamp":"1532936089303","port":9093,"version":4}
cZxid = 0x14b
ctime = Mon Jul 30 15:34:49 CST 2018
mZxid = 0x14b
mtime = Mon Jul 30 15:34:49 CST 2018
pZxid = 0x14b
cversion = 0
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x164ea1258570003
dataLength = 192
numChildren = 0

  二、topic注册

  在kafka中,同一个Topic的消息会被分红多个分区并将其分布在多个Broker上,这些分区信息及与Broker的对应关系也都是由Zookeeper在维护,由专门的节点/broker/topics来记录。Broker服务器启动后,会到对应Topic节点(/brokers/topics)上注册本身的Broker ID并写入针对该Topic的分区信息。好比Topic "test3"建立时指定了三个分区。查看zookeeper内容则知道"2"分区放在Broker-0,"1"分区放在Broker-2,"0"分区放在Broker-1github

ls /brokers/topics
[connect-test, test3, test, my-replicated-topic, __consumer_offsets]

get /brokers/topics/test3
{"version":1,"partitions":{"2":[0],"1":[2],"0":[1]}}
cZxid = 0x154
ctime = Mon Jul 30 16:11:30 CST 2018
mZxid = 0x154
mtime = Mon Jul 30 16:11:30 CST 2018
pZxid = 0x158
cversion = 1
dataVersion = 0
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 52
numChildren = 1

 

   三、生产者负载均衡

   因为同一个Topic消息会被分区并将其分布在多个Broker上,所以,生产者须要将消息合理地发送到这些分布式的Broker上,那么如何实现生产者的负载均衡,Kafka支持传统的四层负载均衡,也支持Zookeeper方式实现负载均衡。apache

   (1) 四层负载均衡,根据生产者的IP地址和端口来为其肯定一个相关联的Broker。一般,一个生产者只会对应单个Broker,而后该生产者产生的消息都发往该Broker。这种方式逻辑简单,每一个生产者不须要同其余系统创建额外的TCP链接,只须要和Broker维护单个TCP链接便可。可是,其没法作到真正的负载均衡,由于实际系统中的每一个生产者产生的消息量及每一个Broker的消息存储量都是不同的,若是有些生产者产生的消息远多于其余生产者的话,那么会致使不一样的Broker接收到的消息总数差别巨大,同时,生产者也没法实时感知到Broker的新增和删除。缓存

   (2) 使用Zookeeper进行负载均衡,因为每一个Broker启动时,都会完成Broker注册过程,生产者会经过该节点的变化来动态地感知到Broker服务器列表的变动,这样就能够实现动态的负载均衡机制。服务器

   四、消费者负载均衡

   与生产者相似,Kafka中的消费者一样须要进行负载均衡来实现多个消费者合理地从对应的Broker服务器上接收消息,每一个消费者分组包含若干消费者,每条消息都只会发送给分组中的一个消费者,不一样的消费者分组消费本身特定的Topic下面的消息,互不干扰。网络

   五、消费者offset的存储

   消费者的offset我以前了解到是存在zookeeper上面,但下载最新版的没在上面找到,后面了解到是存到了broker上面~。数据结构


    消息的持久化

     Kafka 对消息的存储和缓存严重依赖于文件系统,但kafka的性能却远超出了人们对磁盘IO的性能预估。主要缘由有:架构

      一、顺序磁盘IO

      关于磁盘性能的关键事实是,磁盘的吞吐量和过去十年里磁盘的寻址延迟不一样。所以,使用6个7200rpm、SATA接口、RAID-5的磁盘阵列在JBOD配置下的顺序写 入的性能约为600MB/秒,但随机写入的性能仅约为100k/秒,相差6000倍以上。由于线性的读取和写入是磁盘使用模式中最有规律的,而且由操做系统进行了大量的优化。现代操做系统提供了 read-ahead 和 write-behind 技术,read-ahead 是以大的 data block 为单位预先读取数据,而 write-behind 是将多个小型的逻辑写  合并成一次大型的物理磁盘写入。关于该问题的进一步讨论能够参考 ACM Queue article,他们发现实际上顺序磁盘访问在某些状况下比随机内存访问还要快

      二、pageCache而不是in-memory cache

      为了提升性能,现代操做系统在愈来愈注重使用内存对磁盘进行 cache。现代操做系统主动将全部空闲内存用做 disk caching,代价是在内存回收时性能会有所下降。全部对磁盘的读写操做都会经过这个统一的 cache。若是不使用直接I/O,该功能不能轻易关闭。所以即便进程维护了 in-process cache,该数据也可能会被复制到操做系统的 pagecache 中,事实上全部内容都被存储了两份。

  此外,Kafka 创建在 JVM 之上,了解 Java 内存使用的人都知道两点:

  1. 对象的内存开销很是高,一般是所存储的数据的两倍(甚至更多)。
  2. 随着堆中数据的增长,Java 的垃圾回收变得愈来愈复杂和缓慢。

        受这些因素影响,相比于维护 in-memory cache 或者其余结构,使用文件系统和 pagecache 显得更有优点--咱们能够经过自动访问全部空闲内存将可用缓存的容量至少翻倍,而且经过存储紧凑的字节结构而不是独立的对象,有望将缓存容量再翻一番。 这样使得32GB的机器缓存容量能够达到28-30GB,而且不会产生额外的 GC 负担。此外,即便服务从新启动,缓存依旧可用,而 in-process cache 则须要在内存中重建(重建一个10GB的缓存可能须要10分钟),不然进程就要从 cold cache 的状态开始(这意味着进程最初的性能表现十分糟糕)。 这同时也极大的简化了代码,由于全部保持 cache 和文件系统之间一致性的逻辑如今都被放到了 OS 中,这样作比一次性的进程内缓存更准确、更高效。若是你的磁盘使用更倾向于顺序读取,那么 read-ahead 能够有效的使用每次从磁盘中读取到的有用数据预先填充 cache。

      因此kafka给出了一个简单的设计:相比于维护尽量多的 in-memory cache,而且在空间不足的时候匆忙将数据 flush 到文件系统,咱们把这个过程倒过来。全部数据一开始就被写入到文件系统的持久化日志中,而不用在 cache 空间不足的时候 flush 到磁盘。实际上,这代表数据被转移到了内核的 pagecache 中。

     三、队列存储数据

      消息系统使用的持久化数据结构,BTree 是最通用的数据结构,能够在消息系统可以支持各类事务性和非事务性语义。 虽然 BTree 的操做复杂度是 O(log N),但成本也至关高。一般咱们认为 O(log N) 基本等同于常数时间,但这条在磁盘操做中不成立。磁盘寻址是每10ms一跳,而且每一个磁盘同时只能执行一次寻址,所以并行性受到了限制。 所以即便是少许的磁盘寻址也会很高的开销。因为存储系统将很是快的cache操做和很是慢的物理磁盘操做混合在一块儿,当数据随着 fixed cache 增长时,能够看到树的性能一般是非线性的——好比数据翻倍时性能降低不仅两倍。

      因此直观来看,持久化队列能够创建在简单的读取和向文件后追加两种操做之上,这和日志解决方案相同。这种架构的优势在于全部的操做复杂度都是O(1),并且读操做不会阻塞写操做,读操做之间也不会互相影响。这有着明显的性能优点,因为性能和数据大小彻底分离开来——服务器如今能够充分利用大量廉价、低转速的1+TB SATA硬盘。 虽然这些硬盘的寻址性能不好,但他们在大规模读写方面的性能是能够接受的,并且价格是原来的三分之1、容量是原来的三倍。

      在不产生任何性能损失的状况下可以访问几乎无限的硬盘空间,这意味着咱们能够提供一些其它消息系统不常见的特性。例如:在 Kafka 中,咱们可让消息保留相对较长的一段时间(好比一周),而不是试图在被消费后当即删除。正如咱们后面将要提到的,这给消费者带来了很大的灵活性。

   

   消息的传输

      解决了数据持久化的问题,还须要解决数据的发送和消费等相关传输问题。

      一、批量操做而不是屡次小IO

        一旦消除了磁盘访问模式不佳的状况,系统性能低下的主要缘由就剩下了两个:大量的小型 I/O 操做,以及过多的字节拷贝。小型的 I/O 操做发生在客户端和服务端之间以及服务端自身的持久化操做中。为了不这种状况,kafka的通信协议是创建在一个 “消息块” 的抽象基础上,合理将消息分组。 这使得网络请求将多个消息打包成一组,而不是每次发送一条消息,从而使整组消息分担网络中往返的开销。Consumer 每次获取多个大型有序的消息块,并由服务端 依次将消息块一次加载到它的日志中。

         这个简单的优化对速度有着数量级的提高。批处理容许更大的网络数据包,更大的顺序读写磁盘操做,连续的内存块等等,全部这些都使 KafKa 将随机流消息顺序写入到磁盘, 再由 consumers 进行消费。

      二、sendfile避免过多的字节拷贝

          broker 维护的消息日志自己就是一个文件目录,每一个文件都由一系列以相同格式写入到磁盘的消息集合组成,这种写入格式被 producer 和 consumer 共用。保持这种通用格式能够对一些很重要的操做进行优化: 持久化日志块的网络传输。 现代的unix 操做系统提供了一个高度优化的编码方式,用于将数据从 pagecache 转移到 socket 网络链接中;在 Linux 中系统调用 sendfile 作到这一点。

       为了理解 sendfile 的意义,了解数据从文件到套接字的常见数据传输路径就很是重要:

  1. 操做系统从磁盘读取数据到内核空间的 pagecache
  2. 应用程序读取内核空间的数据到用户空间的缓冲区
  3. 应用程序将数据(用户空间的缓冲区)写回内核空间到套接字缓冲区(内核空间)
  4. 操做系统将数据从套接字缓冲区(内核空间)复制到经过网络发送的 NIC 缓冲区

      这显然是低效的,有四次 copy 操做和两次系统调用。使用 sendfile 方法,能够容许操做系统将数据从 pagecache 直接发送到网络,这样避免从新复制数据。因此这种优化方式,只须要最后一步的copy操做,将数据复制到 NIC 缓冲区。

咱们指望一个广泛的应用场景,一个 topic 被多消费者消费。使用上面提交的 zero-copy(零拷贝)优化,数据在使用时只会被复制到 pagecache 中一次,节省了每次拷贝到用户空间内存中,再从用户空间进行读取的消耗。这使得消息可以以接近网络链接速度的 上限进行消费。

       pagecache 和 sendfile 的组合使用意味着,在一个kafka集群中,大多数 consumer 消费时,您将看不到磁盘上的读取活动,由于数据将彻底由缓存提供。

      三、压缩数据

      在某些状况下,数据传输的瓶颈不是 CPU ,也不是磁盘,而是网络带宽。对于须要经过广域网在数据中心之间发送消息的数据管道尤为如此。固然,用户能够在不须要 Kakfa 支持下一次一个的压缩消息。可是这样会形成很是差的压缩比和消息重复类型的冗余,好比 JSON 中的字段名称或者是或 Web 日志中的用户代理或公共字符串值。高性能的压缩是一次压缩多个消息,而不是压缩单个消息。

      Kafka 以高效的批处理格式支持一批消息能够压缩在一块儿发送到服务器。这批消息将以压缩格式写入,而且在日志中保持压缩,只会在 consumer 消费时解压缩。

      Kafka 支持 GZIP,Snappy 和 LZ4 压缩协议       

     

     消息是推仍是拉?

        Kafka 在这方面采起了一种较为传统的设计方式,也是大多数的消息系统所共享的方式:即 producer 把数据 push 到 broker,而后 consumer 从 broker 中 pull 数据。 但也有一些 系统,好比 Scribe 和 Apache Flume,沿着一条彻底不一样的 push-based 的路径,将数据 push 到下游节点。这两种方法都有优缺点。然而,因为 broker 控制着数据传输速率, 因此 push-based 系统很难处理不一样的 consumer。让 broker 控制数据传输速率主要是为了让 consumer 可以以可能的最大速率消费;不幸的是,这致使着在 push-based 的系统中,当消费速率低于生产速率时,consumer 每每会不堪重负(本质上相似于拒绝服务攻击)。pull-based 系统有一个很好的特性, 那就是当 consumer 速率落后于 producer 时,能够在适当的时间遇上来。还能够经过使用某种 backoff 协议来减小这种现象:即 consumer 能够经过 backoff 表示它已经不堪重负了,然而经过得到负载状况来充分使用 consumer(但永远不超载)这一方式实现起来比它看起来更棘手。前面以这种方式构建系统的尝试,引导着 Kafka 走向了更传统的 pull 模型。

       另外一个 pull-based 系统的优势在于:它能够大批量生产要发送给 consumer 的数据。而 push-based 系统必须选择当即发送请求或者积累更多的数据,而后在不知道下游的 consumer 可否当即处理它的状况下发送这些数据。若是系统调整为低延迟状态,这就会致使一次只发送一条消息,以致于传输的数据再也不被缓冲,这种方式是极度浪费的。 而 pull-based 的设计修复了该问题,由于 consumer 老是将全部可用的(或者达到配置的最大长度)消息 pull 到 log 当前位置的后面,从而使得数据可以获得最佳的处理而不会引入没必要要的延迟。

     简单的 pull-based 系统的不足之处在于:若是 broker 中没有数据,consumer 可能会在一个紧密的循环中结束轮询,实际上 busy-waiting 直到数据到来。为了不 busy-waiting,咱们在 pull 请求中加入参数,使得 consumer 在一个“long pull”中阻塞等待,直到数据到来(还能够选择等待给定字节长度的数据来确保传输长度)。

 

    消息的offset

    大多数消息系统都在 broker 上保存被消费消息的元数据。也就是说,当消息被传递给 consumer,broker 要么当即在本地记录该事件,要么等待 consumer 的确认后再记录。这是一种至关直接的选择,并且事实上对于单机服务器来讲,也没与其它地方可以存储这些状态信息。 因为大多数消息系统用于存储的数据结构规模都很小,因此这也是一个很实用的选择——由于只要 broker 知道哪些消息被消费了,就能够在本地当即进行删除,一直保持较小的数据量。

    但要让 broker 和 consumer 就被消费的数据保持一致性也不是一个小问题。若是 broker 在每条消息被发送到网络的时候,当即将其标记为 consumed,那么一旦 consumer 没法处理该消息(可能由 consumer 崩溃或者请求超时或者其余缘由致使),该消息就会丢失。 为了解决消息丢失的问题,许多消息系统增长了确认机制:即当消息被发送出去的时候,消息仅被标记为sent 而不是 consumed;而后 broker 会等待一个来自 consumer 的特定确认,再将消息标记为consumed。这个策略修复了消息丢失的问题,但也产生了新问题。 首先,若是 consumer 处理了消息但在发送确认以前出错了,那么该消息就会被消费两次。第二个是关于性能的,如今 broker 必须为每条消息保存多个状态(首先对其加锁,确保该消息只被发送一次,而后将其永久的标记为 consumed,以便将其移除)。 还有更棘手的问题要处理,好比如何处理已经发送但一直得不到确认的消息。

      Kafka 使用彻底不一样的方式解决消息丢失问题。Kafka的 topic 被分割成了一组彻底有序的 partition,其中每个 partition 在任意给定的时间内只能被每一个订阅了这个 topic 的 consumer 组中的一个 consumer 消费。这意味着 partition 中 每个 consumer 的位置仅仅是一个数字,即下一条要消费的消息的offset。这使得被消费的消息的状态信息至关少,每一个 partition 只须要一个数字。这个状态信息还能够做为周期性的 checkpoint。这以很是低的代价实现了和消息确认机制等同的效果。

    这种方式还有一个附加的好处。consumer 能够回退到以前的 offset 来再次消费以前的数据,这个操做违反了队列的基本原则,但事实证实对大多数 consumer 来讲这是一个必不可少的特性。 例如,若是 consumer 的代码有 bug,而且在 bug 被发现前已经有一部分数据被消费了, 那么 consumer 能够在 bug 修复后经过回退到以前的 offset 来再次消费这些数据。

   

    以上就是我整理的一些关于kafka的资料,主要仍是集中在概念设计这一块,不知道你们看了有没有有所收获呢。

相关文章
相关标签/搜索