揭秘Kafka高性能架构之道 - Kafka设计解析(六)

原创文章,同步首发自做者我的博客。转载请务必在文章开头处以超连接形式注明出处http://www.jasongj.com/kafka/high_throughput/java

摘要

上一篇文章《Kafka设计解析(五)- Kafka性能测试方法及Benchmark报告》从测试角度说明了Kafka的性能。本文从宏观架构层面和具体实现层面分析了Kafka如何实现高性能。react

宏观架构层面

利用Partition实现并行处理

Partition提供并行处理的能力

Kafka是一个Pub-Sub的消息系统,不管是发布仍是订阅,都须指定Topic。如《Kafka设计解析(一)- Kafka背景及架构介绍》一文所述,Topic只是一个逻辑的概念。每一个Topic都包含一个或多个Partition,不一样Partition可位于不一样节点。同时Partition在物理上对应一个本地文件夹,每一个Partition包含一个或多个Segment,每一个Segment包含一个数据文件和一个与之对应的索引文件。在逻辑上,能够把一个Partition看成一个很是长的数组,可经过这个“数组”的索引(offset)去访问其数据。数组

一方面,因为不一样Partition可位于不一样机器,所以能够充分利用集群优点,实现机器间的并行处理。另外一方面,因为Partition在物理上对应一个文件夹,即便多个Partition位于同一个节点,也可经过配置让同一节点上的不一样Partition置于不一样的disk drive上,从而实现磁盘间的并行处理,充分发挥多磁盘的优点。网络

利用多磁盘的具体方法是,将不一样磁盘mount到不一样目录,而后在server.properties中,将log.dirs设置为多目录(用逗号分隔)。Kafka会自动将全部Partition尽量均匀分配到不一样目录也即不一样目录(也即不一样disk)上。架构

注:虽然物理上最小单位是Segment,但Kafka并不提供同一Partition内不一样Segment间的并行处理。由于对于写而言,每次只会写Partition内的一个Segment,而对于读而言,也只会顺序读取同一Partition内的不一样Segment。并发

Partition是最小并发粒度

如同《Kafka设计解析(四)- Kafka Consumer设计解析》一文所述,多Consumer消费同一个Topic时,同一条消息只会被同一Consumer Group内的一个Consumer所消费。而数据并不是按消息为单位分配,而是以Partition为单位分配,也即同一个Partition的数据只会被一个Consumer所消费(在不考虑Rebalance的前提下)。异步

若是Consumer的个数多于Partition的个数,那么会有部分Consumer没法消费该Topic的任何数据,也即当Consumer个数超过Partition后,增长Consumer并不能增长并行度。socket

简而言之,Partition个数决定了可能的最大并行度。以下图所示,因为Topic 2只包含3个Partition,故group2中的Consumer 三、Consumer 四、Consumer 5 可分别消费1个Partition的数据,而Consumer 6消费不到Topic 2的任何数据。
Kafka Consumer分布式

以Spark消费Kafka数据为例,若是所消费的Topic的Partition数为N,则有效的Spark最大并行度也为N。即便将Spark的Executor数设置为N+M,最多也只有N个Executor可同时处理该Topic的数据。ide

ISR实现可用性与数据一致性的动态平衡

CAP理论

CAP理论是指,分布式系统中,一致性、可用性和分区容忍性最多只能同时知足两个。

一致性

  • 经过某个节点的写操做结果对后面经过其它节点的读操做可见
  • 若是更新数据后,并发访问状况下后续读操做可当即感知该更新,称为强一致性
  • 若是容许以后部分或者所有感知不到该更新,称为弱一致性
  • 若在以后的一段时间(一般该时间不固定)后,必定能够感知到该更新,称为最终一致性

可用性

  • 任何一个没有发生故障的节点必须在有限的时间内返回合理的结果

分区容忍性

  • 部分节点宕机或者没法与其它节点通讯时,各分区间还可保持分布式系统的功能

通常而言,都要求保证分区容忍性。因此在CAP理论下,更多的是须要在可用性和一致性之间作权衡。

经常使用数据复制及一致性方案

Master-Slave

  • RDBMS的读写分离即为典型的Master-Slave方案
  • 同步复制可保证强一致性但会影响可用性
  • 异步复制可提供高可用性但会下降一致性

WNR

  • 主要用于去中心化的分布式系统中。DynamoDB与Cassandra即采用此方案或其变种
  • N表明总副本数,W表明每次写操做要保证的最少写成功的副本数,R表明每次读至少要读取的副本数
  • 当W+R>N时,可保证每次读取的数据至少有一个副本拥有最新的数据
  • 多个写操做的顺序难以保证,可能致使多副本间的写操做顺序不一致。Dynamo经过向量时钟保证最终一致性

Paxos及其变种

  • Google的Chubby,Zookeeper的原子广播协议(Zab),RAFT等

基于ISR的数据复制方案
如《Kafka High Availability(上)》一文所述,Kafka的数据复制是以Partition为单位的。而多个备份间的数据复制,经过Follower向Leader拉取数据完成。从一这点来说,Kafka的数据复制方案接近于上文所讲的Master-Slave方案。不一样的是,Kafka既不是彻底的同步复制,也不是彻底的异步复制,而是基于ISR的动态复制方案。

ISR,也即In-sync Replica。每一个Partition的Leader都会维护这样一个列表,该列表中,包含了全部与之同步的Replica(包含Leader本身)。每次数据写入时,只有ISR中的全部Replica都复制完,Leader才会将其置为Commit,它才能被Consumer所消费。

这种方案,与同步复制很是接近。但不一样的是,这个ISR是由Leader动态维护的。若是Follower不能紧“跟上”Leader,它将被Leader从ISR中移除,待它又从新“跟上”Leader后,会被Leader再次加加ISR中。每次改变ISR后,Leader都会将最新的ISR持久化到Zookeeper中。

至于如何判断某个Follower是否“跟上”Leader,不一样版本的Kafka的策略稍微有些区别。

  • 对于0.8.*版本,若是Follower在replica.lag.time.max.ms时间内未向Leader发送Fetch请求(也即数据复制请求),则Leader会将其从ISR中移除。若是某Follower持续向Leader发送Fetch请求,可是它与Leader的数据差距在replica.lag.max.messages以上,也会被Leader从ISR中移除。
  • 从0.9.0.0版本开始,replica.lag.max.messages被移除,故Leader再也不考虑Follower落后的消息条数。另外,Leader不只会判断Follower是否在replica.lag.time.max.ms时间内向其发送Fetch请求,同时还会考虑Follower是否在该时间内与之保持同步。
  • 0.10.* 版本的策略与0.9.*版一致

对于0.8.*版本的replica.lag.max.messages参数,不少读者曾留言提问,既然只有ISR中的全部Replica复制完后的消息才被认为Commit,那为什么会出现Follower与Leader差距过大的状况。缘由在于,Leader并不须要等到前一条消息被Commit才接收后一条消息。事实上,Leader能够按顺序接收大量消息,最新的一条消息的Offset被记为High Wartermark。而只有被ISR中全部Follower都复制过去的消息才会被Commit,Consumer只能消费被Commit的消息。因为Follower的复制是严格按顺序的,因此被Commit的消息以前的消息确定也已经被Commit过。换句话说,High Watermark标记的是Leader所保存的最新消息的offset,而Commit Offset标记的是最新的可被消费的(已同步到ISR中的Follower)消息。而Leader对数据的接收与Follower对数据的复制是异步进行的,所以会出现Commit Offset与High Watermark存在必定差距的状况。0.8.*版本中replica.lag.max.messages限定了Leader容许的该差距的最大值。

Kafka基于ISR的数据复制方案原理以下图所示。
Kafka Replication

如上图所示,在第一步中,Leader A总共收到3条消息,故其high watermark为3,但因为ISR中的Follower只同步了第1条消息(m1),故只有m1被Commit,也即只有m1可被Consumer消费。此时Follower B与Leader A的差距是1,而Follower C与Leader A的差距是2,均未超过默认的replica.lag.max.messages,故得以保留在ISR中。在第二步中,因为旧的Leader A宕机,新的Leader B在replica.lag.time.max.ms时间内未收到来自A的Fetch请求,故将A从ISR中移除,此时ISR={B,C}。同时,因为此时新的Leader B中只有2条消息,并未包含m3(m3从未被任何Leader所Commit),因此m3没法被Consumer消费。第四步中,Follower A恢复正常,它先将宕机前未Commit的全部消息所有删除,而后从最后Commit过的消息的下一条消息开始追赶新的Leader B,直到它“遇上”新的Leader,才被从新加入新的ISR中。

使用ISR方案的缘由

  • 因为Leader可移除不能及时与之同步的Follower,故与同步复制相比可避免最慢的Follower拖慢总体速度,也即ISR提升了系统可用性。
  • ISR中的全部Follower都包含了全部Commit过的消息,而只有Commit过的消息才会被Consumer消费,故从Consumer的角度而言,ISR中的全部Replica都始终处于同步状态,从而与异步复制方案相比提升了数据一致性。
  • ISR可动态调整,极限状况下,能够只包含Leader,极大提升了可容忍的宕机的Follower的数量。与Majority Quorum方案相比,容忍相同个数的节点失败,所要求的总节点数少了近一半。

ISR相关配置说明

  • Broker的min.insync.replicas参数指定了Broker所要求的ISR最小长度,默认值为1。也即极限状况下ISR能够只包含Leader。但此时若是Leader宕机,则该Partition不可用,可用性得不到保证。
  • 只有被ISR中全部Replica同步的消息才被Commit,但Producer发布数据时,Leader并不须要ISR中的全部Replica同步该数据才确认收到数据。Producer能够经过acks参数指定最少须要多少个Replica确认收到该消息才视为该消息发送成功。acks的默认值是1,即Leader收到该消息后当即告诉Producer收到该消息,此时若是在ISR中的消息复制完该消息前Leader宕机,那该条消息会丢失。而若是将该值设置为0,则Producer发送完数据后,当即认为该数据发送成功,不做任何等待,而实际上该数据可能发送失败,而且Producer的Retry机制将不生效。更推荐的作法是,将acks设置为all或者-1,此时只有ISR中的全部Replica都收到该数据(也即该消息被Commit),Leader才会告诉Producer该消息发送成功,从而保证不会有未知的数据丢失。

具体实现层面

高效使用磁盘

顺序写磁盘

根据《一些场景下顺序写磁盘快于随机写内存》所述,将写磁盘的过程变为顺序写,可极大提升对磁盘的利用率。

Kafka的整个设计中,Partition至关于一个很是长的数组,而Broker接收到的全部消息顺序写入这个大数组中。同时Consumer经过Offset顺序消费这些数据,而且不删除已经消费的数据,从而避免了随机写磁盘的过程。

因为磁盘有限,不可能保存全部数据,实际上做为消息系统Kafka也不必保存全部数据,须要删除旧的数据。而这个删除过程,并不是经过使用“读-写”模式去修改文件,而是将Partition分为多个Segment,每一个Segment对应一个物理文件,经过删除整个文件的方式去删除Partition内的数据。这种方式清除旧数据的方式,也避免了对文件的随机写操做。

经过以下代码可知,Kafka删除Segment的方式,是直接删除Segment对应的整个log文件和整个index文件而非删除文件中的部份内容。

/**
   * Delete this log segment from the filesystem.
   *
   * @throws KafkaStorageException if the delete fails.
   */
  def delete() {
    val deletedLog = log.delete()
    val deletedIndex = index.delete()
    val deletedTimeIndex = timeIndex.delete()
    if(!deletedLog && log.file.exists)
      throw new KafkaStorageException("Delete of log " + log.file.getName + " failed.")
    if(!deletedIndex && index.file.exists)
      throw new KafkaStorageException("Delete of index " + index.file.getName + " failed.")
    if(!deletedTimeIndex && timeIndex.file.exists)
      throw new KafkaStorageException("Delete of time index " + timeIndex.file.getName + " failed.")
  }

充分利用Page Cache

使用Page Cache的好处以下

  • I/O Scheduler会将连续的小块写组装成大块的物理写从而提升性能
  • I/O Scheduler会尝试将一些写操做从新按顺序排好,从而减小磁盘头的移动时间
  • 充分利用全部空闲内存(非JVM内存)。若是使用应用层Cache(即JVM堆内存),会增长GC负担
  • 读操做可直接在Page Cache内进行。若是消费和生产速度至关,甚至不须要经过物理磁盘(直接经过Page Cache)交换数据
  • 若是进程重启,JVM内的Cache会失效,但Page Cache仍然可用

Broker收到数据后,写磁盘时只是将数据写入Page Cache,并不保证数据必定彻底写入磁盘。从这一点看,可能会形成机器宕机时,Page Cache内的数据未写入磁盘从而形成数据丢失。可是这种丢失只发生在机器断电等形成操做系统不工做的场景,而这种场景彻底能够由Kafka层面的Replication机制去解决。若是为了保证这种状况下数据不丢失而强制将Page Cache中的数据Flush到磁盘,反而会下降性能。也正因如此,Kafka虽然提供了flush.messagesflush.ms两个参数将Page Cache中的数据强制Flush到磁盘,可是Kafka并不建议使用。

若是数据消费速度与生产速度至关,甚至不须要经过物理磁盘交换数据,而是直接经过Page Cache交换数据。同时,Follower从Leader Fetch数据时,也可经过Page Cache完成。下图为某Partition的Leader节点的网络/磁盘读写信息。

Kafka I/O page cache

从上图能够看到,该Broker每秒经过网络从Producer接收约35MB数据,虽然有Follower从该Broker Fetch数据,可是该Broker基本无读磁盘。这是由于该Broker直接从Page Cache中将数据取出返回给了Follower。

支持多Disk Drive

Broker的log.dirs配置项,容许配置多个文件夹。若是机器上有多个Disk Drive,可将不一样的Disk挂载到不一样的目录,而后将这些目录都配置到log.dirs里。Kafka会尽量将不一样的Partition分配到不一样的目录,也即不一样的Disk上,从而充分利用了多Disk的优点。

零拷贝

Kafka中存在大量的网络数据持久化到磁盘(Producer到Broker)和磁盘文件经过网络发送(Broker到Consumer)的过程。这一过程的性能直接影响Kafka的总体吞吐量。

传统模式下的四次拷贝与四次上下文切换

以将磁盘文件经过网络发送为例。传统模式下,通常使用以下伪代码所示的方法先将文件数据读入内存,而后经过Socket将内存中的数据发送出去。

buffer = File.read
Socket.send(buffer)

这一过程实际上发生了四次数据拷贝。首先经过系统调用将文件数据读入到内核态Buffer(DMA拷贝),而后应用程序将内存态Buffer数据读入到用户态Buffer(CPU拷贝),接着用户程序经过Socket发送数据时将用户态Buffer数据拷贝到内核态Buffer(CPU拷贝),最后经过DMA拷贝将数据拷贝到NIC Buffer。同时,还伴随着四次上下文切换,以下图所示。

BIO 四次拷贝 四次上下文切换

sendfile和transferTo实现零拷贝

Linux 2.4+内核经过sendfile系统调用,提供了零拷贝。数据经过DMA拷贝到内核态Buffer后,直接经过DMA拷贝到NIC Buffer,无需CPU拷贝。这也是零拷贝这一说法的来源。除了减小数据拷贝外,由于整个读文件-网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,所以大大提升了性能。零拷贝过程以下图所示。

BIO 零拷贝 两次上下文切换

从具体实现来看,Kafka的数据传输经过TransportLayer来完成,其子类PlaintextTransportLayer经过Java NIO的FileChannel的transferTotransferFrom方法实现零拷贝,以下所示。

@Override
    public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
        return fileChannel.transferTo(position, count, socketChannel);
    }

注: transferTotransferFrom并不保证必定能使用零拷贝。其实是否能使用零拷贝与操做系统相关,若是操做系统提供sendfile这样的零拷贝系统调用,则这两个方法会经过这样的系统调用充分利用零拷贝的优点,不然并不能经过这两个方法自己实现零拷贝。

减小网络开销

批处理

批处理是一种经常使用的用于提升I/O性能的方式。对Kafka而言,批处理既减小了网络传输的Overhead,又提升了写磁盘的效率。

Kafka 0.8.1及之前的Producer区分同步Producer和异步Producer。同步Producer的send方法主要分两种形式。一种是接受一个KeyedMessage做为参数,一次发送一条消息。另外一种是接受一批KeyedMessage做为参数,一次性发送多条消息。而对于异步发送而言,不管是使用哪一个send方法,实现上都不会当即将消息发送给Broker,而是先存到内部的队列中,直到消息条数达到阈值或者达到指定的Timeout才真正的将消息发送出去,从而实现了消息的批量发送。

Kafka 0.8.2开始支持新的Producer API,将同步Producer和异步Producer结合。虽然从send接口来看,一次只能发送一个ProducerRecord,而不能像以前版本的send方法同样接受消息列表,可是send方法并不是当即将消息发送出去,而是经过batch.sizelinger.ms控制实际发送频率,从而实现批量发送。

因为每次网络传输,除了传输消息自己之外,还要传输很是多的网络协议自己的一些内容(称为Overhead),因此将多条消息合并到一块儿传输,可有效减小网络传输的Overhead,进而提升了传输效率。

零拷贝章节的图中能够看到,虽然Broker持续从网络接收数据,可是写磁盘并不是每秒都在发生,而是间隔一段时间写一次磁盘,而且每次写磁盘的数据量都很是大(最高达到718MB/S)。

数据压缩下降网络负载

Kafka从0.7开始,即支持将数据压缩后再传输给Broker。除了能够将每条消息单独压缩而后传输外,Kafka还支持在批量发送时,将整个Batch的消息一块儿压缩后传输。数据压缩的一个基本原理是,重复数据越多压缩效果越好。所以将整个Batch的数据一块儿压缩能更大幅度减少数据量,从而更大程度提升网络传输效率。

Broker接收消息后,并不直接解压缩,而是直接将消息以压缩后的形式持久化到磁盘。Consumer Fetch到数据后再解压缩。所以Kafka的压缩不只减小了Producer到Broker的网络传输负载,同时也下降了Broker磁盘操做的负载,也下降了Consumer与Broker间的网络传输量,从而极大得提升了传输效率,提升了吞吐量。

高效的序列化方式

Kafka消息的Key和Payload(或者说Value)的类型可自定义,只需同时提供相应的序列化器和反序列化器便可。所以用户能够经过使用快速且紧凑的序列化-反序列化方式(如Avro,Protocal Buffer)来减小实际网络传输和磁盘存储的数据规模,从而提升吞吐率。这里要注意,若是使用的序列化方法太慢,即便压缩比很是高,最终的效率也不必定高。

Kafka系列文章

相关文章
相关标签/搜索