[Kafka] [All about it]

Overview

  • 设计目标:
    • 以O(1) 常数级时间复杂度的访问性能,提供消息持久化能力。
    • 高吞吐率
    • 支持 kafka server 间的消息分区,及分布式消费,同时保证每一个partition内部的消息顺序传输
    • scale out:支持在线水平扩展。
  • 为什么使用消息系统:
    • 解耦
    • 冗余(持久化)
    • 扩展性
    • 顺序保证
    • 缓冲
    • 异步通讯
  • 经常使用message queue对比
    • RabbitMQ: 重量级
    • redis:基于 k-v 对的NoSQL数据库,但自己支持MQ功能,能够做为一个轻量级的队列服务使用。

发布订阅

  • 拓扑:推拉结合。

Zookeeper

 

 

常数级访问性能

  •  ref2

 kafka文件存储机制

  • 一个partition为一个文件夹
  • segment
    • segment的意义:把topic中一个大文件分红多个小文件段,就容易按期清除已消费完的文件。
    • partition内部segment为一个个文件,segment命令方式为在文件后加上上一个segment的最后offset值。
    • 物理结构上,一个segment file由两大部分组成,分别为index file和data file,这两个文件一一对应。
      • index索引文件: 存储 "offset --> 物理偏移"。index文件采用的是稀疏存储的方式,每隔必定字节的数据创建一条索引,从而避免索引文件占用过多的空间,从而能够将索引保留在内存中。不过没有索引的数据间隔中还须要一次顺序扫描,不过范围很小。
      • log数据文件
  • 而每条消息长这样
  • 如何在partition中经过offset查找message
    1. 查找segment file:根据offset二分查找,可快速定位到具体文件
    2. 经过segment file查找message

 

高吞吐率

  • 每一条消息都是被append到partiton中,属于顺序写磁盘,所以效率很是高。
  • 基于partition,producer会根据partition机制选择将其存储到哪个partition,不一样的消息能够并行写入不一样partition。

 

Delivery Guarantee

  • consumer端:consumer在从broker读取消息后,能够选择commit,该操做会在zk中保存该partition中读取消息的offset。能够设置为auto commit。
    • 能够看到,这一过程当中,数据处理与commit的顺序会决定消息从broker到consumer的delivery guarantee semantic。
      1. 读完消息先commit再处理:若是commit以后还未处理时consumer crash,就属于At most once。
      2. 处理完消息再commit: 若是处理完以后crash,就属于At least once。
  • kafka's guarantees are stronger in 3 ways:
    • Idempotent producer
    • Transactions
    • Exactly-once stream processing

Idempotent producer

  • 因为producer重发数据形成的duplicates:在kafka 0.11.0 后基于producer 幂等性解决。解决方案是:Producer ID + <topic, partition> 做为一个相似主键的东西解决。
  • 实现方式:相似于TCP,发送到kafka的每批消息将包含一个序列号,该序列号用于重复数据的删除。与TCP不一样的是,TCP只能在transient in-memory中提供保证。而序列号将被持久化存储到topic中,所以即便leader replica失败,接管的任何其余broker也能感知到消息是否重复。并且这种机制开销至关低,只需在每批消息中添加几个额外的字段。
  • 优势:
    • works transparently -- only one config change
    • sequence numbers and producer ids are in the log
    • resilient to broker failuers, producer retries, etc.
    • 开销低,只需在每批消息中添加几个额外的字段。

Transactions

  • Atomic writes across partitions
  • 在一个事务内的消息要么所有可见,要么全都不可见。
  • consumers必须被配置成,可跳过未提交的消息。
  • transaction api
    producer.initTransactions();
    try {
      producer.beginTransaction();
      producer.send(record1);
      producer.send(record2);
      producer.commitTransaction();
    } catch(ProducerFencedException e) {
      producer.close();
    } catch(KafkaException e) {
      producer.abortTransaction();
    }
  • producers须要使用新的producer API for transactions。
  • consumers须要可以过滤掉 uncommited or aborted transactional messages。

Exactly-once stream processing

  • Exactly-once stream processing across read-process-write tasks.
  • 基于producer幂等性 和 事务原子性,经过Streams API实现 exactly-once 流处理成为可能。

+ Spark Streaming

  • kafka + spark streaming 的应用场景很是常见。
  • 整个系统对exactly once的保证,历来都不是靠系统中某一部分来实现就能搞定的,须要整个流式系统共同努力。
  • spark streaming部分的exactly once的实现:使用WAL实现(注意不是checkpoint和replication,这二者是failover机制,不是专门解决exactly once的)。
  • 输出操做对exactly once的实现:须要输出结果保证幂等性 or 提供事务支持。参见官方文档:
    • In order to achieve exactly-once semantics for output of your results, your output operation that saves the data to an external data store must be either idempotent, or an atomic transaction that saves results and offsets(See Semantics of output operations in the main programming guide for further information).
    • Exactly-once with Idempotent writes:
      • 若是屡次写产生的数据同样,那么这个输出操做就是幂等的。好比saveAsTextFile就是一个典型的幂等的更新。好比messages with unique keys can be written to database without duplication.
      • 实现:
        • set enable.auto.commit = false。缺省状况下,kafka DStream将会在收到数据后立刻commit the consumer offsets。咱们但愿推迟这个操做直到the batch被彻底处理掉。(这样能够实现at least once)
        • 打开spark streaming的checkpointing来存储kafka offsets。可是若是应用程序代码改变了,checkpointed data是不可重用的,所以有second option以下:
        • commit kafka offsets after outputs。kafka提供一个 commitAsync API,以及 HasOffsetRanges 类也能够被用来从initialRDD中提取offsets。
          messages.foreachRDD { rdd =>
            val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
            rdd.foreachPartition { iter =>
              // output to database
            }
            messages.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
          }
    • Exactly-once with transactional writes:
      • transactional updates 须要一个unique identifier,咱们可使用batch time,partition id or kafka offsets来当作identifier,而后把结果 along with identifier一块儿在同一个事务中写入external storage。
      • 这个原子操做能够提供exactly-once语义。若是offsets更新失败,或者经过 offset != $fromOffset 检测到duplicate offset,那么整个事务就会rollback,这也就包含智能了exactly-once语义。
  • SS提供了三种storing offsets的方法,以下三种方法按顺序reliability是递增的,但同时code complexity也是递增的。
    • checkpoints
      • 缺点:
        • output operation必须是幂等的,否则会有repeated outputs。
        • 若是你的程序代码更改过,那么就没法从checkpoint处recover。 [这里的缘由是:checkpointing刷到外部存储的是类Checkpoint对象序列化后的数据。那么在Spark Streaming Application从新编译后,再去反序列化checkpointing的数据就会失败。这个时候就必须新建SparkContext。]
    • kafka itself:kafka有一个offset commit API 用来存储offsets。缺省状况下,the new consumers会周期性地auto-commit offsets。可是很明显,你pull过来的数据不必定来得及被你消费,就会resulting in undefined semantics。
      • 所以,咱们可使用 commitAsync API,来确保处理完数据以后再commit。
      • 缺点:kafka is not transactional,所以你的输出仍然须要幂等。
    • Your own data store
      • 对于支持事务的data store,咱们能够在同一个事务中保存offsets和results,从而保证二者in sync,即便是在failure的状况下。
      • 若是你仔细地检查repeated or skipped offset ranges,那么就能够经过回滚事务来防止重复or丢失数据。这就保证了exactly-once
  • Spark streaming failover的实现,主要三种方式:
    • checkpoint:在driver实现,用于在driver崩溃后,恢复driver的现场。
    • replication:在receiver中用于解决单台executor挂掉后,未保存的数据丢失的问题。
    • WAL:在driver和receiver中实现,用于解决:
      • driver挂掉,全部executor都会挂掉,那么全部未保存的数据都会丢失,replication就无论用了
      • driver挂掉后,哪些block在挂掉前注册到了driver中,以及挂掉前哪些block分配给了当前正在运行的batch job,这些信息就都丢失了。因此须要WAL对这些信息作持久化
  • Conlusion:exactly-once在stream processing中是一个很强的语义,它会不可避免地给你的程序带来一些开销,影响吞吐量。并且不适用于windowed operations。

Kafka HA

  • 多replica
    • 数据一致性
      • 只有leader直接与producer & consumer交互,其余replica做为follower从leader复制数据。(这也减小了保证数据一致性的工做,不然须要保证replica之间有N*N条数据通路进行数据同步)。
      • in-sync replica(ISR): leader所追踪的与其保持同步的replica列表。若是一个follower宕机(经过与zk之间的session来断定)or落后太多(可配置的条数),leader将其从ISR中移除。
      • 复制机制:kafka的复制机制既不是彻底的同步复制,也不是单纯的异步复制。
        • 同步复制:要求全部能工做的follower都复制完,该消息才会commit,极大地影响了吞吐率。
        • 异步复制:只要被leader写入log就认为已被commit,若是leader宕机就会丢失数据。
      • 这也引出了问题:
        • 如何propagate消息
          1. producer首先经过zk找到该partition的leader
          2. producer将消息发送给该partition的leader
          3. leader将消息写入本地log
          4. 每一个follower都从leader pull数据
          5. follower收到消息(并写入log后),向leader发ack。(为了提升性能,每一个follower在接收到数据后便ack,所以对已经commit的消息,只能保证已被存于多个replica的内存中。显然,这是为了性能作了必定牺牲的。)
          6. 一旦leader收到全部replica的ack,该消息就被认为已经commit了,leader将增长HW并向producer发送ack。(这里有一个问题是leader须要收到多少个follower的ack就向producer返回ack,以下)
        • 在向producer发送ACK以前须要保证多少个replica已经收到该消息
        • 怎样处理某个replica不工做的状况
        • 怎样处理failed replica恢复回来的状况
    • leader selection
      这部分的难点就在于,由于follower可能落后许多或者crash了,因此必须确保选择"最新"的follower做为新的leader。基本的原则就是新的leader必须拥有原来的leader commit过的全部消息
      • 一个经常使用的leader selection方式是"Majority Vote"
        • 若是有 2f + 1 个replica,那么在commit以前必须保证有f + 1 个replica复制完消息。为了保证正确的选出新的leader,fail的replica不能超过f个。
        • 劣势:所容忍的fail的follower个数比较少。(e.g: 若是要容忍1个follower挂掉,必需要有5个以上的replica)。
      • 其余经常使用的leader selection算法包括:zk的ZAB,Raft和Viewstamped Replication
      • kafka采用的算法:在zk中动态维护了一个ISR,该ISR里的全部replica都跟上了leader,只有其内的成员才有被选为leader的可能。
    • 如何分配replica:
      • 要求:
        • load balance
        • 容错能力:replica不能都在一个机器上
      • 分配replica算法
        1. 将broker(共n个)和待分配的partition排序
        2. 将第i个partition分配到第 (i mod n) 个broker上
        3. 将第i个partition的第j个replica分配到第 (i + j) mod n 个broker上
  • broker failover
    • controller在zk的 /brokers/ids 节点上注册watch。 (一旦broker宕机,zk对于的znode会自动被删除:ephemeral node,zk会fire controller注册的watch,controller便可获取最新的幸存的Broker列表)
    • controller决定set_p,该集合包含了宕机的全部broker上的全部partition。
    • 对set_p中的每个partition:
      • 从 /brokers/topics/[topic]/partitions/[partition]/state 读取该Partition当前的ISR
      • 决定该partition的新leader。若当前ISR中至少有一个replica还幸存,则选择其中一个做为新的leader,新的ISR则包含当前ISR中全部幸存的replica。不然选择该partition中任意一个幸存的replica做为新leader以及ISR(显然,该场景下可能会有潜在的数据丢失)。
      • 将新的leader,ISR和新的 leader_epoch 及 controller_epoch 写入 /brokers/topics/[topic]/partitions/[partition]/state。
    •  直接经过RPC向set_p相关的Broker发送 LeaderAndISRRequest 命令。controller能够在一个RPC操做中发送多个命令从而提升效率。
  •  因为上述controller的引入,所以也须要 Controller Failover:
    • 每一个broker都会在controller path (/controller) 上注册一个watch。当前controller失败时,watch被fire,全部alive的broker都会去竞选成为新的controller,由zk保证只有一个竞选成功。

 

Consumer Design

High level consumer

  • 适用于client只但愿从kafka读取数据,不太关心消息offset的处理。
  • High level consumer将从某个partition读取的最后一条消息的offset存于zk中(kafka从0.8.2版本开始同时支持将offset存于zk中,与将offset存于专用的kafka topic中)。
  • 这个offset基于client提供的 consumer group 来保存。
  • 注意,consumer group是整个kafka集群共享的,而非某个topic的。
  • 在consumer group内部,某个partition的数据只会被某一个特定的consumer实例所消费。每一个consumer实例能够消费一个或多个特定partition的数据。
    • 劣势:没法保证同一个consumer group里的consumer均匀消费数据。
    • 优点:1. 每一个consumer不用跟大量的broker通讯,减小通讯开销,同时下降了分配难度,实现也更简单。 2. 另外,每一个partition内部数据是有序的,这种设计能够保持有序消费。
  • consumer rebalance
    • 算法:
      • 将目标topic下的全部partition排序,存于Pt
      • 对某个consumer group下全部的consumer排序,存于Cg,第i个consumer记为Ci
      • N = size(Pt) / size(Cg),向上取整
      • 解除Ci对原来分配的partition的消费权
      • 将第 i * N 到 i * (i + 1) * N - 1 个partition分配给Ci
    • 实现:是由每个consumer经过在zk上注册watch完成的。每一个consumer被建立时会触发consumer group的rebalance。

Low Level Consumer

  • 适合于用户但愿更好的控制数据消费的场景,好比:
    • 同一条消息读屡次
    • 只读取某个topic的部分partition
    • 管理事务,从而确保每条消息被处理且仅被处理一次。
  •  low level consumer所须要的额外工做:
    • 必须在应用程序中追踪offset,从而肯定下一条应该消费哪条数据
    • 应用程序须要经过程序获知每一个partition的leader是谁
    • 必须处理leader的变化
  • 通常的使用流程:
    • 查找一个alive broker,而且找出每一个partition的leader
    • 找出每一个partition的follower
    • 定义好请求,该请求应该能描述应用程序须要哪些数据
    • fetch数据
    • 识别leader变化,并对之作出必要的响应

 

高性能

宏观架构层面

  • 利用partition实现并行处理
    • 组织架构:topic只是一个逻辑概念,每一个topic都包含一个或多个partiiton,不一样partition可位于不一样节点。同时partition在物理上对应一个本地文件夹,每一个partition包含一个or多个segment,每一个segment包含一个数据文件和一个与之对应的索引文件。在逻辑上,能够把一个partition当作一个很是长的数组,可经过这个数组的索引(offset)去访问其数据
    • 关于并行:
      • 一方面,因为不一样的partition可位于不一样的机器,所以能够充分利用集群优点,实现机器间的并行处理。
      • 另外一方面,因为partition在物理上对应一个文件夹,即便多个partition位于同一个节点,也能够经过配置让同一个节点的不一样partition置于不一样的disk driver上,从而实现磁盘间的并行处理,充分发挥多磁盘的优点。
    • partition是最小并发粒度
  • ISR实现可用性与数据一致性的动态平衡:考虑CAP理论。
    • 常见数据复制及一致性方案
      • Master-slave:
        • RDBMS的读写分离即为典型的master-slave方案
        • 同步复制可保证强一致性但会影响可用性
        • 异步复制可提供高可用但会下降一致性
      • WNR
        • 主要用于去中心化的分布式系统中。
        • N表明副本总数,W表明每次写操做要保证的最少写成功副本数,R表明每次读至少要读取的副本数。
        • 当W + R > N 时,可保证每次读取的数据至少有一个副本拥有最新的数据
        • 多个写操做的顺序难以保证,可能致使多副本间的写操做顺序不一致。
      • Paxos及其变种
        • Google的Chubby,zk的Zab,Raft等
      • 基于ISR的数据复制方案 (kafka)
        • 既非彻底的同步复制,也不是彻底的异步复制,而是基于ISR的动态复制方案
        • ISR是由Leader动态维护的。若是follower不能紧跟上leader,它将被leader从ISR中移除,直到从新跟上后再次被加入ISR。
        • ISR会在每次改变时持久化到zk中。
        • 如何判断是否跟上?
          • 对0.8.*版本,若是follower在 replica.lag.time.max.ms 时间内未向leader发送fetch请求,则会被移除。而即便某follower持续向leader发送fetch请求,follower与leader的数据差距在replcia.lag.max.messages以上,也会被移除。 
          • 从0.9.0.0开始,replcia.lag.max.messages被移除。同时leader会考虑follower是否在时间内与之保持一致。
        • 使用ISR的缘由
          • 与同步复制相比,可避免最慢的follower拖慢总体速度,提升了系统可用性
          • ISR中全部的follower都包含了全部commit过的消息,而只有Commit过的消息才会被consumer消费,故从consume扔的角度而言,ISR中的全部replica都始终处于同步状态,从而与异步复制方案相比提升了数据一致性。
          • ISR可动态调整,极限状况下,能够只包含leader,极大提升了可容忍的宕机的follower个数。

具体实现层面

  • 高效使用磁盘:顺序写磁盘
    • 设计上,partition至关于一个很是长的数组,consumer经过offset顺序消费这些数据,而且不删除已经消费的数据。
    • 删除过程,并不是使用"读-写"模式去修改文件,而是将partition分为多个segment,每一个segment对应一个物理文件,经过删除整个文件的方式去删除partition内的数据。
  • 充分使用Page Cache
    • 好处以下
      • I/O Scheduler 会将连续的小块写组装成大块的物理写从而提升性能
      • I/O Scheduler 会尝试将一些写操做从新按顺序排好,从而减小磁盘头的移动时间
      • 充分利用全部空闲内存(非JVM内存)。若是使用应用层Cache(即JVM堆内存),会增长GC负担
      • 读操做可直接在Page Cache内进行。若是消费和生产速度至关,甚至不须要经过物理磁盘(直接经过Page Cache)交换数据
      • 若是进程重启,JVM内的Cache会失效,但Page Cache仍然可用
  • 支持多disk drive
    • broker的 log.dirs 配置项,容许配置多个文件夹。若是机器上有多个disk drive,可将不一样的disk挂载到不一样目录。
  • 零拷贝
    • kafka中存在大量的网络数据持久化到磁盘 和 磁盘文件经过网络发送的过程。这一过程的性能直接影响kafka的总体吞吐量。
  • 减小网络开销

 

Reference

  1. 系列文章
  2. 美团-kafka文件存储那些事
  3. kafka exactly-once
  4. kafka+SparkStreaming
  5. kafka-streaming integration
相关文章
相关标签/搜索