kafka的设计

1.动机

设计 kafka 初衷,做为统一平台处理大公司的实时数据。因此 必须具备以下特性:java

  • 支持海量数据
  • 高吞吐量
  • 低延迟(实时性)
  • 支持分区,分布式
  • 容错

 2.持久化

kafka 高度依赖 文件系统 存储和缓存消息。经过对磁盘的顺序读写,并借助 OS 层面的 页缓存(page cache),保证优于缓存在内存中或其余结构中。linux

为什么使用磁盘效率仍然很高:缓存

  • 利用磁盘的顺序读写,操做一个文件,将数据追加到文件的末尾。相比于随机读写,效率很高。
  • 利用 OS 层面的页缓存(page cache),顺序读文件能够预读数据到 page cache。经过自动访问全部可用内存 以及 存储紧凑型字节结构而非单个对象提升内存使用率。OS缓存相对于进程内的缓存,重启后仍然可用,不须要重建。
  • 全部的操做时间复杂度都是 常量时间O(1),与数据大小无关,读 和 写 不会互相阻塞。

3.效率

使用磁盘效率低下主要有两个缘由:服务器

  • 过多的小 I/O 操做:发生在客户端和服务端之间,以及 服务端本身的持久化操做中
  • 过多的字节复制

针对 小 I/O 操做,kafka 根据 "message set" 抽象构建了一个协议,该 抽象 天然地将消息分组在一块儿。该协议容许网络请求将消息分组在一块儿,并分摊网络往返的开销,而不是一次发送一条消息。服务器依次将消息块一次附加到其日志中,而消费者一次获取大型线性块。网络

针对过多的字节复制,使用了由生产者、代理 和 消费者共享的标准化二进制消息格式(这样,数据块就能够在它们之间不进行修改的状况下进行传输)。服务器所持有的消息日志 自己是一个文件目录,每一个文件都由一系列 "message set" 填充。这些消息集以生产者和消费者使用的相同格式写入磁盘。维护这种通用格式能够优化  持久化日志块的 网络传输。app

存储在文件中的信息经过网络发送给客户,经历的几个路径:负载均衡

  • 操做系统在内核空间将数据从磁盘读取到 page cache 中。
  • 应用程序从内核空间读取到 用户空间缓冲区
  • 应用程序将数据写回到内核空间的套接字缓冲区
  • 操做系统将数据从套接字缓冲区复制到 NIC 缓冲区(NIC:网络接口控制器)。

以上产生了四个副本拷贝,2个系统调用开销,效率低下。异步

                 

          

       

 基于 零拷贝技术:消息数据直接从 page cache 发送到网络。linux 中使用 sendfile 完成零拷贝技术。java 中 java.nio.channels.FileChannel 的 transferTo() 方法也使用了零拷贝技术。socket

                  

kafka 经过 page cache 和 sendfile 的组合,将看不到磁盘上的任何读取活动,由于它们将彻底从缓存中提供数据。分布式

端到端的批量压缩

Kafka经过递归消息集来支持同时压缩多个消息而减小相同消息的冗余。 一批消息能够一块儿压缩并以此形式发送到服务器。 这批消息将以压缩形式写入,并将在日志中保持压缩,而且只能由消费者解压缩。Kafka支持GZIP和Snappy压缩协议。

4.生产者

4.1负载均衡

生产者将数据直接发送给分区对应的 leader。为了实现这一点,全部的 kafka 节点要可以在 任什么时候候应答 哪一个服务器还活着以及 topic分区的leader在哪里的 元数据请求。

客户端本身控制 消息发送到哪一个分区,这能够随机完成,实现一种随机的负载平衡,也能够经过一些语义分区函数完成。

4.2异步发送

启用 kafka 生产者 的批处理,kafka 将在内存中累积数据而后一次性批量发送。能够配置 累计不超过固定数量的消息(bach.size),等待不超过固定延迟时间(linger.ms)。

5.消费者

5.1拉 VS 推送

消费者主动拉取消息缺点:若是 broker 没有数据,消费者会轮询,忙等待直到数据到达。kafka 能够在拉请求中设置一些参数,容许使用者请求在“长轮询”中阻塞,等待数据到达(也能够选择等待,直到给定的字节数可用,以确保传输大小很大)

消费者被动推送消息缺点:很难适应消费速率不一样的消费者,消息发送速率是由 broker 决定的,broker 是尽量快的将消息发送出去,这样会形成消费者来不及处理消息,典型的表现就是 网络阻塞 和 拒绝服务。

5.2消费者的定位

topic 被分为一组有序的分区,每一个分区在任何给定的时间都由每一个订阅消费者组中的一个消费者消费。这意味着消费者在每一个分区中的位置只是一个整数,这个整数表明了即将要消费的消息的偏移量。这样作的好处是能够返回到旧的偏移量进行消费。

5.3离线数据加载

可伸缩持久性容许消费者只按期使用,例如批量数据加载,按期将数据批量加载到离线系统(如Hadoop或关系数据仓库)中。

6.消息传递语义

很明显,消息传递保证可以提供多种可能:

  • 最多一次:消息可能丢失,可是毫不会重发
  • 至少一次:消息毫不会丢失,可是可能会重发
  • 正好一次:每条消息被传递一次

kafka 的消息传递语义:

一旦发布的消息已提交到日志,只要副本分区写入了此消息的一个broker仍然"活着”,它就不会丢失。

0.11.0.0 版本以前,若是一个生产者没有收到消息提交的响应,那么生产者只能从新发送该消息。这就保证了至少一次的传递语义。若是上一次的请求其实是成功的,那么消息就会再次写到日志中,形成重复消费。

0.11.0.0 版本以后,kafka 生产者支持幂等传递,保证从新发送不会致使日志中有重复记录。为了实现这一点,broker 为 每个生产者 分配一个 ID,使用生产者随每条消息一块儿发送的序列号来消除重复的消息。

同时也是从 0.11.0.0 版本以后,生产者支持使用事务类语义将消息发送到多个 topic 分区的能力:即,要么全部消息都已成功写入,要么都未成功写入。这方面的主要用例是在Kafka topic 之间进行一次处理。

固然,不是全部的使用场景都须要如此严谨的保障,对于延迟敏感的,咱们容许生产者指定它想要的耐用性水平。如生产者能够指定它获取需等待10毫秒量级上的响应。生产者也能够指定异步发送,或只等待leader(不须要副本的响应)有响应。

从消费者的角度描述语义:

  • 读取到消息,在日志中保存位置,最后处理消息。这种顺序 若是消费者在保存位置以后,处理消息以前崩溃,数据会丢失,属于 最多一次的语义。
  • 读取消息,处理消息,在日志中保存位置。这种顺序,若是消费者在处理消息以后,日志中保存位置以前崩溃,数据会被屡次处理,属于至少一次的语义。在多数状况下,消息都有一个主键,因此更新是幂等的(一次执行和屡次执行的影响相同)。

kafka 默认是保证“至少一次”传递,并容许用户经过禁止生产者重试和处理一批消息前提交它的偏移量来实现 “最多一次”传递。而“正好一次”传递须要与目标存储系统合做,但kafka提供了偏移量,因此实现这个很简单。

7.副本

kafka 在各个服务器上备份 每一个 topic 的 partition (经过 replication factor 设置副本数)。当集群中的某个服务器发生故障时,自动转移到这些副本,以便在故障时,消息仍然可用。

kafka 的默认 副本因子为 1,即不建立副本。副本因子是指副本的总数,包括 leader 。

副本以 topic 的 partition 为单位。在非故障的状况下,kafka 中的每一个 partition 都有一个 leader,0 个或者多个 follower。全部的读 和写都指向 分区的 leader。一般,分区数 多于 broker 的数量,leader 均匀的分布在 broker 上。follower 的日志与 leader 的日志相同,即相同的 偏移量 offset 和 消息顺序 。(固然,有可能在某个时间点,leader 上比 follower 多几条还未同步的消息)。

kafka 节点存活的2个条件:

  • 一个节点必须能维持与 zookeeper 的会话(经过 zookeeper 的心跳机制)。
  • 若是该节点是  slave,它必须复制 leader 的写数据,而且不能落后太多。

若是节点 死掉,卡主,或者落后太多,leader 将 从 同步副本 ISR (In Sync Replicas)中移除该节点。落后多少是由  replica.lag.max.messages 控制,卡主多久算卡主是由  replica.lag.time.max.ms 控制。

kafka 动态维护一组同步 leader 数据的副本(ISR),只有这个组中的成员才有资格当选 leader。在全部同步副本都收到写操做以前,不会认为已提交对Kafka分区的写操做。这组 ISR 保存在 zookeeper 中,正由于如此,在ISR中的任何副本都有资格当选leader。对于 f+1 个 副本的 kafka, topic 能够容忍f失败而不会丢失已提交的消息。

若是全部的节点都死掉,有两种能够实现的方式:

  • 等待 ISR 列表中的节点活过来,而且选择该节点做为 leader.
  • 选择第一个活过来的节点(无论它在 ISR 列表中)做为 leader.

从 0.11.0.0 开始 kafka 默认选择第一种策略,等待一致性的副本;能够经过配置 unclean.leader.election.enable 为 true 来选用第二种策略。这两种策略是  可用性 和一致性的权衡,须要根据实际业务来决定。

可用性 和 耐久性保证

当写消息到 kafka 时,生产者能够 配置 须要 leader 收到的确认数 来肯定是否完成请求,经过 配置 acks 知足多种状况:

  • acks = 0 :生产者不会等待服务器的任何确认,消息记录将被马上添加到  socket 缓冲区并视为已发送。这种状况没法确保服务器已经接收到消息记录,重试的配置也不会生效。每一个记录返回的偏移量始终被设置为 1.
  • acks = 1 :服务器端的 leader 写入消息到本地日志就当即响应生产者,而不等待 follower 应答。这种状况,若是在服务器响应生产者以后,复制到 follower 以前挂掉 就会丢失数据。
  • acks = all(-1):服务器端的 leader 会等待 ISR 中全部副本同步响应来确认消息记录。这保证了只要 ISR 中还有一个副本存活就不会丢失记录,也能够设置为 -1;

提供两种 topic 级别的配置 来确保 持久性 而非 可用性。

  • unclean.leader.election.enable 设为 false,(默认即为 false)即 全部的副本都不可用时,分区才不可用。只有当 ISR 中的节点 活过来 分区才能可用。
  • 指定 一个最小的 ISR 数量值,经过 min.insync.replicas 来配置,只有当 ISR 中的数量 超过最小值,分区才会接受写入操做,以此来防止仅写入单个副本然后副本不可用而致使的消息的丢失。该设置仅在 acks = all 并保证至少有这么多同步副本确认消息时生效。

副本管理

上面关于复制日志的讨论实际上只涉及了一个日志,例如 一个 topic 的partition,然而,kafka 集群管理着成百上千个这样的分区。经过 round-robin 的方式平衡 集群中的分区,避免 大部分的分区分布在少许的及诶单上,一样,平衡 leader,使在分区份额上的每一个节点都是 leader。

kafka 选择 其中一个 broker 做为 controller(到 zookeeper 上注册,先到先得)。该 controller 检测 broker 级别的故障,并负责更改 故障 broker 上受影响的 分区的 leader。这样就能够批量处理 leader 的变动。若是 controller 故障,其余存活的 broker 将会成为新的 controller(一样须要到 zookeeper 上注册)。

相关文章
相关标签/搜索