__consumer_offsets:做用是保存 Kafka 消费者的位移信息
__transaction_state:用来存储事务日志消息apache
所谓的优先副本是指在AR集合列表中的第一个副本。
理想状况下,优先副本就是该分区的leader 副本,因此也能够称之为 preferred leader。Kafka 要确保全部主题的优先副本在 Kafka 集群中均匀分布,这样就保证了全部分区的 leader 均衡分布。以此来促进集群的负载均衡,这一行为也能够称为“分区平衡”。缓存
Kafka 中的消息是以主题为基本单位进行归类的,各个主题在逻辑上相互独立。每一个主题又能够分为一个或多个分区。不考虑多副本的状况,一个分区对应一个日志(Log)。为了防止 Log 过大,Kafka 又引入了日志分段(LogSegment)的概念,将 Log 切分为多个 LogSegment,至关于一个巨型文件被平均分配为多个相对较小的文件。安全
Log 和 LogSegment 也不是纯粹物理意义上的概念,Log 在物理上只以文件夹的形式存储,而每一个 LogSegment 对应于磁盘上的一个日志文件和两个索引文件,以及可能的其余文件(好比以“.txnindex”为后缀的事务索引文件)网络
每一个日志分段文件对应了两个索引文件,主要用来提升查找消息的效率。
偏移量索引文件用来创建消息偏移量(offset)到物理地址之间的映射关系,方便快速定位消息所在的物理文件位置
时间戳索引文件则根据指定的时间戳(timestamp)来查找对应的偏移量信息。负载均衡
Kafka是经过seek() 方法来指定消费的,在执行seek() 方法以前要去执行一次poll()方法,等到分配到分区以后会去对应的分区的指定位置开始消费,若是指定的位置发生了越界,那么会根据auto.offset.reset 参数设置的状况进行消费。性能
Kafka提供了一个 offsetsForTimes() 方法,经过 timestamp 来查询与此对应的分区位置。offsetsForTimes() 方法的参数 timestampsToSearch 是一个 Map 类型,key 为待查询的分区,而 value 为待查询的时间戳,该方法会返回时间戳大于等于待查询时间的第一条消息对应的位置和时间戳,对应于 OffsetAndTimestamp 中的 offset 和 timestamp 字段。操作系统
日志删除(Log Retention):按照必定的保留策略直接删除不符合条件的日志分段。
咱们能够经过 broker 端参数 log.cleanup.policy 来设置日志清理策略,此参数的默认值为“delete”,即采用日志删除的清理策略。线程
基于时间
日志删除任务会检查当前日志文件中是否有保留时间超过设定的阈值(retentionMs)来寻找可删除的日志分段文件集合(deletableSegments)retentionMs 能够经过 broker 端参数 log.retention.hours、log.retention.minutes 和 log.retention.ms 来配置,其中 log.retention.ms 的优先级最高,log.retention.minutes 次之,log.retention.hours 最低。默认状况下只配置了 log.retention.hours 参数,其值为168,故默认状况下日志分段文件的保留时间为7天。
删除日志分段时,首先会从 Log 对象中所维护日志分段的跳跃表中移除待删除的日志分段,以保证没有线程对这些日志分段进行读取操做。而后将日志分段所对应的全部文件添加上“.deleted”的后缀(固然也包括对应的索引文件)。最后交由一个以“delete-file”命名的延迟任务来删除这些以“.deleted”为后缀的文件,这个任务的延迟执行时间能够经过 file.delete.delay.ms 参数来调配,此参数的默认值为60000,即1分钟。scala
基于日志起始偏移量
基于日志起始偏移量的保留策略的判断依据是某日志分段的下一个日志分段的起始偏移量 baseOffset 是否小于等于 logStartOffset,如果,则能够删除此日志分段。
设计
如上图所示,假设 logStartOffset 等于25,日志分段1的起始偏移量为0,日志分段2的起始偏移量为11,日志分段3的起始偏移量为23,经过以下动做收集可删除的日志分段的文件集合 deletableSegments:
从头开始遍历每一个日志分段,日志分段1的下一个日志分段的起始偏移量为11,小于 logStartOffset 的大小,将日志分段1加入 deletableSegments。
日志分段2的下一个日志偏移量的起始偏移量为23,也小于 logStartOffset 的大小,将日志分段2加入 deletableSegments。
日志分段3的下一个日志偏移量在 logStartOffset 的右侧,故从日志分段3开始的全部日志分段都不会加入 deletableSegments。
收集完可删除的日志分段的文件集合以后的删除操做同基于日志大小的保留策略和基于时间的保留策略相同
日志压缩(Log Compaction):针对每一个消息的 key 进行整合,对于有相同 key 的不一样 value 值,只保留最后一个版本。
若是要采用日志压缩的清理策略,就须要将 log.cleanup.policy 设置为“compact”,而且还须要将 log.cleaner.enable (默认值为 true)设定为 true。
以下图所示,Log Compaction 对于有相同 key 的不一样 value 值,只保留最后一个版本。若是应用只关心 key 对应的最新 value 值,则能够开启 Kafka 的日志清理功能,Kafka 会按期将相同 key 的消息进行合并,只保留最新的 value 值。
页缓存是操做系统实现的一种主要的磁盘缓存,以此用来减小对磁盘 I/O 的操做。具体来讲,就是把磁盘中的数据缓存到内存中,把对磁盘的访问变为对内存的访问。
当一个进程准备读取磁盘上的文件内容时,操做系统会先查看待读取的数据所在的页(page)是否在页缓存(pagecache)中,若是存在(命中)则直接返回数据,从而避免了对物理磁盘的 I/O 操做;若是没有命中,则操做系统会向磁盘发起读取请求并将读取的数据页存入页缓存,以后再将数据返回给进程。
一样,若是一个进程须要将数据写入磁盘,那么操做系统也会检测数据对应的页是否在页缓存中,若是不存在,则会先在页缓存中添加相应的页,最后将数据写入对应的页。被修改事后的页也就变成了脏页,操做系统会在合适的时间把脏页中的数据写入磁盘,以保持数据的一致性。
用过 Java 的人通常都知道两点事实:对象的内存开销很是大,一般会是真实数据大小的几倍甚至更多,空间使用率低下;Java 的垃圾回收会随着堆内数据的增多而变得愈来愈慢。基于这些因素,使用文件系统并依赖于页缓存的作法明显要优于维护一个进程内缓存或其余结构,至少咱们能够省去了一份进程内部的缓存消耗,同时还能够经过结构紧凑的字节码来替代使用对象的方式以节省更多的空间。
此外,即便 Kafka 服务重启,页缓存仍是会保持有效,然而进程内的缓存却须要重建。这样也极大地简化了代码逻辑,由于维护页缓存和文件之间的一致性交由操做系统来负责,这样会比进程内维护更加安全有效。
除了消息顺序追加、页缓存等技术,Kafka 还使用零拷贝(Zero-Copy)技术来进一步提高性能。所谓的零拷贝是指将数据直接从磁盘文件复制到网卡设备中,而不须要经由应用程序之手。零拷贝大大提升了应用程序的性能,减小了内核和用户模式之间的上下文切换。对 Linux 操做系统而言,零拷贝技术依赖于底层的 sendfile() 方法实现。对应于 Java 语言,FileChannal.transferTo() 方法的底层实现就是 sendfile() 方法。
Kafka 中有多种延时操做,好比延时生产,还有延时拉取(DelayedFetch)、延时数据删除(DelayedDeleteRecords)等。
延时操做建立以后会被加入延时操做管理器(DelayedOperationPurgatory)来作专门的处理。延时操做有可能会超时,每一个延时操做管理器都会配备一个定时器(SystemTimer)来作超时管理,定时器的底层就是采用时间轮(TimingWheel)实现的。
在 Kafka 集群中会有一个或多个 broker,其中有一个 broker 会被选举为控制器(Kafka Controller),它负责管理整个集群中全部分区和副本的状态。当某个分区的 leader 副本出现故障时,由控制器负责为该分区选举新的 leader 副本。当检测到某个分区的 ISR 集合发生变化时,由控制器负责通知全部broker更新其元数据信息。当使用 kafka-topics.sh 脚本为某个 topic 增长分区数量时,一样仍是由控制器负责分区的从新分配。
如上图,旧版消费者客户端每一个消费组(
每一个消费者在启动时都会在 /consumers/
这种方式下每一个消费者对 ZooKeeper 的相关路径分别进行监听,当触发再均衡操做时,一个消费组下的全部消费者会同时进行再均衡操做,而消费者之间并不知道彼此操做的结果,这样可能致使 Kafka 工做在一个不正确的状态。与此同时,这种严重依赖于 ZooKeeper 集群的作法还有两个比较严重的问题。
就目前而言,一共有以下几种情形会触发再均衡的操做:
GroupCoordinator 是 Kafka 服务端中用于管理消费组的组件。而消费者客户端中的 ConsumerCoordinator 组件负责与 GroupCoordinator 进行交互。
消费者须要肯定它所属的消费组对应的 GroupCoordinator 所在的 broker,并建立与该 broker 相互通讯的网络链接。若是消费者已经保存了与消费组对应的 GroupCoordinator 节点的信息,而且与它之间的网络链接是正常的,那么就能够进入第二阶段。不然,就须要向集群中的某个节点发送 FindCoordinatorRequest 请求来查找对应的 GroupCoordinator,这里的“某个节点”并不是是集群中的任意节点,而是负载最小的节点。
在成功找到消费组所对应的 GroupCoordinator 以后就进入加入消费组的阶段,在此阶段的消费者会向 GroupCoordinator 发送 JoinGroupRequest 请求,并处理响应。
选举消费组的leader
若是消费组内尚未 leader,那么第一个加入消费组的消费者即为消费组的 leader。若是某一时刻 leader 消费者因为某些缘由退出了消费组,那么会从新选举一个新的 leader
选举分区分配策略
leader 消费者根据在第二阶段中选举出来的分区分配策略来实施具体的分区分配,在此以后须要将分配的方案同步给各个消费者,经过 GroupCoordinator 这个“中间人”来负责转发同步分配方案的。
进入这个阶段以后,消费组中的全部消费者就会处于正常工做状态。在正式消费以前,消费者还须要肯定拉取消息的起始位置。假设以前已经将最后的消费位移提交到了 GroupCoordinator,而且 GroupCoordinator 将其保存到了 Kafka 内部的 __consumer_offsets 主题中,此时消费者能够经过 OffsetFetchRequest 请求获取上次提交的消费位移并今后处继续消费。
消费者经过向 GroupCoordinator 发送心跳来维持它们与消费组的从属关系,以及它们对分区的全部权关系。只要消费者以正常的时间间隔发送心跳,就被认为是活跃的,说明它还在读取分区中的消息。心跳线程是一个独立的线程,能够在轮询消息的空档发送心跳。若是消费者中止发送心跳的时间足够长,则整个会话就被断定为过时,GroupCoordinator 也会认为这个消费者已经死亡,就会触发一次再均衡行为。
为了实现生产者的幂等性,Kafka 为此引入了 producer id(如下简称 PID)和序列号(sequence number)这两个概念。
每一个新的生产者实例在初始化的时候都会被分配一个 PID,这个 PID 对用户而言是彻底透明的。对于每一个 PID,消息发送到的每个分区都有对应的序列号,这些序列号从0开始单调递增。生产者每发送一条消息就会将 <PID,分区> 对应的序列号的值加1。
broker 端会在内存中为每一对 <PID,分区> 维护一个序列号。对于收到的每一条消息,只有当它的序列号的值(SN_new)比 broker 端中维护的对应的序列号的值(SN_old)大1(即 SN_new = SN_old + 1)时,broker 才会接收它。若是 SN_new< SN_old + 1,那么说明消息被重复写入,broker 能够直接将其丢弃。若是 SN_new> SN_old + 1,那么说明中间有数据还没有写入,出现了乱序,暗示可能有消息丢失,对应的生产者会抛出 OutOfOrderSequenceException,这个异常是一个严重的异常,后续的诸如 send()、beginTransaction()、commitTransaction() 等方法的调用都会抛出 IllegalStateException 的异常。