深刻理解 Kafka 副本机制

1、Kafka集群

Kafka使用Zookeeper来维护集群成员(brokers)的信息。每一个broker都有一个惟一标识broker.id,用于标识本身在集群中的身份,能够在配置文件server.properties中进行配置,或者由程序自动生成。下面是Kafka brokers集群自动建立的过程:java

  • 每个broker启动的时候,它会在Zookeeper的/brokers/ids路径下建立一个临时节点,并将本身的broker.id写入,从而将自身注册到集群;
  • 当有多个broker时,全部broker会竞争性地在Zookeeper上建立/controller节点,因为Zookeeper上的节点不会重复,因此必然只会有一个broker建立成功,此时该broker称为controller broker。它除了具有其余broker的功能外,还负责管理主题分区及其副本的状态
  • 当broker出现宕机或者主动退出从而致使其持有的Zookeeper会话超时时,会触发注册在Zookeeper上的watcher事件,此时Kafka会进行相应的容错处理;若是宕机的是controller broker时,还会触发新的controller选举。

2、副本机制

为了保证高可用,kafka的分区是多副本的,若是一个副本丢失了,那么还能够从其余副本中获取分区数据。可是这要求对应副本的数据必须是完整的,这是Kafka数据一致性的基础,因此才须要使用controller broker来进行专门的管理。下面将详解介绍Kafka的副本机制。git

2.1 分区和副本

Kafka 的主题被分为多个分区 ,分区是Kafka最基本的存储单位。每一个分区能够有多个副本(能够在建立主题时使用replication-factor参数进行指定)。其中一个副本是首领副本(Leader replica),全部的事件都直接发送给首领副本;其余副本是跟随者副本(Follower replica),须要经过复制来保持与首领副本数据一致,当首领副本不可用时,其中一个跟随者副本将成为新首领。github

2.2 ISR机制

每一个分区都有一个ISR(in-sync Replica)列表,用于维护全部同步的、可用的副本。首领副本必然是同步副本,而对于跟随者副原本说,它须要知足如下条件才能被认为是同步副本:apache

  • 与Zookeeper之间有一个活跃的会话,即必须定时向Zookeeper发送心跳;
  • 在规定的时间内从首领副本那里低延迟地获取过消息。

若是副本不知足上面条件的话,就会被从ISR列表中移除,直到知足条件才会被再次加入。api

这里给出一个主题建立的示例:使用--replication-factor指定副本系数为3,建立成功后使用--describe命令能够看到分区0的有0,1,2三个副本,且三个副本都在ISR列表中,其中1为首领副本。缓存

2.3 不彻底的首领选举

对于副本机制,在broker级别有一个可选的配置参数unclean.leader.election.enable,默认值为fasle,表明禁止不彻底的首领选举。这是针对当首领副本挂掉且ISR中没有其余可用副本时,是否容许某个不彻底同步的副本成为首领副本,这可能会致使数据丢失或者数据不一致,在某些对数据一致性要求较高的场景(如金融领域),这可能没法容忍的,因此其默认值为false,若是你可以容许部分数据不一致的话,能够配置为true。服务器

2.4 最少同步副本

ISR机制的另一个相关参数是min.insync.replicas , 能够在broker或者主题级别进行配置,表明ISR列表中至少要有几个可用副本。这里假设设置为2,那么当可用副本数量小于该值时,就认为整个分区处于不可用状态。此时客户端再向分区写入数据时候就会抛出异常org.apache.kafka.common.errors.NotEnoughReplicasExceptoin: Messages are rejected since there are fewer in-sync replicas than required。网络

2.5 发送确认

Kafka在生产者上有一个可选的参数ack,该参数指定了必需要有多少个分区副本收到消息,生产者才会认为消息写入成功:架构

  • acks=0 :消息发送出去就认为已经成功了,不会等待任何来自服务器的响应;
  • acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应;
  • acks=all :只有当全部参与复制的节点所有收到消息时,生产者才会收到一个来自服务器的成功响应。

3、数据请求

3.1 元数据请求机制

在全部副本中,只有领导副本才能进行消息的读写处理。因为不一样分区的领导副本可能在不一样的broker上,若是某个broker收到了一个分区请求,可是该分区的领导副本并不在该broker上,那么它就会向客户端返回一个Not a Leader for Partition的错误响应。 为了解决这个问题,Kafka提供了元数据请求机制。socket

首先集群中的每一个broker都会缓存全部主题的分区副本信息,客户端会按期发送发送元数据请求,而后将获取的元数据进行缓存。定时刷新元数据的时间间隔能够经过为客户端配置metadata.max.age.ms来进行指定。有了元数据信息后,客户端就知道了领导副本所在的broker,以后直接将读写请求发送给对应的broker便可。

若是在定时请求的时间间隔内发生的分区副本的选举,则意味着原来缓存的信息可能已通过时了,此时还有可能会收到Not a Leader for Partition的错误响应,这种状况下客户端会再次求发出元数据请求,而后刷新本地缓存,以后再去正确的broker上执行对应的操做,过程以下图:

3.2 数据可见性

须要注意的是,并非全部保存在分区首领上的数据均可以被客户端读取到,为了保证数据一致性,只有被全部同步副本(ISR中全部副本)都保存了的数据才能被客户端读取到。

3.3 零拷贝

Kafka全部数据的写入和读取都是经过零拷贝来实现的。传统拷贝与零拷贝的区别以下:

传统模式下的四次拷贝与四次上下文切换

以将磁盘文件经过网络发送为例。传统模式下,通常使用以下伪代码所示的方法先将文件数据读入内存,而后经过Socket将内存中的数据发送出去。

buffer = File.read
Socket.send(buffer)

这一过程实际上发生了四次数据拷贝。首先经过系统调用将文件数据读入到内核态Buffer(DMA拷贝),而后应用程序将内存态Buffer数据读入到用户态Buffer(CPU拷贝),接着用户程序经过Socket发送数据时将用户态Buffer数据拷贝到内核态Buffer(CPU拷贝),最后经过DMA拷贝将数据拷贝到NIC Buffer。同时,还伴随着四次上下文切换,以下图所示:

sendfile和transferTo实现零拷贝

Linux 2.4+内核经过sendfile系统调用,提供了零拷贝。数据经过DMA拷贝到内核态Buffer后,直接经过DMA拷贝到NIC Buffer,无需CPU拷贝。这也是零拷贝这一说法的来源。除了减小数据拷贝外,由于整个读文件到网络发送由一个sendfile调用完成,整个过程只有两次上下文切换,所以大大提升了性能。零拷贝过程以下图所示:

从具体实现来看,Kafka的数据传输经过TransportLayer来完成,其子类PlaintextTransportLayertransferFrom方法经过调用Java NIO中FileChannel的transferTo方法实现零拷贝,以下所示:

@Override
public long transferFrom(FileChannel fileChannel, long position, long count) throws IOException {
    return fileChannel.transferTo(position, count, socketChannel);
}

注: transferTotransferFrom并不保证必定能使用零拷贝。其实是否能使用零拷贝与操做系统相关,若是操做系统提供sendfile这样的零拷贝系统调用,则这两个方法会经过这样的系统调用充分利用零拷贝的优点,不然并不能经过这两个方法自己实现零拷贝。

4、物理存储

4.1 分区分配

在建立主题时,Kafka会首先决定如何在broker间分配分区副本,它遵循如下原则:

  • 在全部broker上均匀地分配分区副本;
  • 确保分区的每一个副本分布在不一样的broker上;
  • 若是使用了broker.rack参数为broker指定了机架信息,那么会尽量的把每一个分区的副本分配到不一样机架的broker上,以免一个机架不可用而致使整个分区不可用。

基于以上缘由,若是你在一个单节点上建立一个3副本的主题,一般会抛出下面的异常:

Error while executing topic command : org.apache.kafka.common.errors.InvalidReplicationFactor   
Exception: Replication factor: 3 larger than available brokers: 1.

4.2 分区数据保留规则

保留数据是 Kafka 的一个基本特性, 可是Kafka不会一直保留数据,也不会等到全部消费者都读取了消息以后才删除消息。相反, Kafka为每一个主题配置了数据保留期限,规定数据被删除以前能够保留多长时间,或者清理数据以前能够保留的数据量大小。分别对应如下四个参数:

  • log.retention.bytes :删除数据前容许的最大数据量;默认值-1,表明没有限制;
  • log.retention.ms:保存数据文件的毫秒数,若是未设置,则使用log.retention.minutes中的值,默认为null;
  • log.retention.minutes:保留数据文件的分钟数,若是未设置,则使用log.retention.hours中的值,默认为null;
  • log.retention.hours:保留数据文件的小时数,默认值为168,也就是一周。

由于在一个大文件里查找和删除消息是很费时的,也很容易出错,因此Kafka把分区分红若干个片断,当前正在写入数据的片断叫做活跃片断。活动片断永远不会被删除。若是按照默认值保留数据一周,并且天天使用一个新片断,那么你就会看到,在天天使用一个新片断的同时会删除一个最老的片断,因此大部分时间该分区会有7个片断存在。

4.3 文件格式

一般保存在磁盘上的数据格式与生产者发送过来消息格式是同样的。 若是生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一块儿,被看成“包装消息”进行发送(格式以下所示) ,而后保存到磁盘上。以后消费者读取后再本身解压这个包装消息,获取每条消息的具体信息。

参考资料

  1. Neha Narkhede, Gwen Shapira ,Todd Palino(著) , 薛命灯(译) . Kafka权威指南 . 人民邮电出版社 . 2017-12-26
  2. Kafka高性能架构之道

更多大数据系列文章能够参见我的 GitHub 开源项目: 大数据入门指南

相关文章
相关标签/搜索