kafka是你们比较经常使用的消息中间件,本文主要介绍kafka基本组件及其相关原理网络
基本架构
- Broker:消息中间件处理节点,一个Kafka节点就是一个broker,一个或者多个Broker能够组成一个Kafka集群
- Topic:Kafka根据topic对消息进行归类,发布到Kafka集群的每条消息都须要指定一个topic
- Producer:消息生产者,向Broker发送消息的客户端
- Consumer:消息消费者,从Broker读取消息的客户端
- ConsumerGroup:每一个Consumer属于一个特定的Consumer Group,一条消息能够发送到多个不一样的Consumer Group,可是一个Consumer Group中只能有一个Consumer可以消费该消息
- Partition:物理上的概念,一个topic能够分为多个partition,每一个partition内部是有序的
偏移量
Kafka经过offset保证消息在分区内的顺序,offset的顺序性不跨分区 Kafka0.10之后,使用一个专门的topic __consumer_offset保存offset __consumer_offset日志留存方式为compact,也就是说,该topic会对key相同的消息进行整理架构
__consumer_offset内保存三类消息:
- Consumer group组元数据消息
- Consumer group位移消息
- Tombstone消息
kafka log
存储
每一个Partition其实都会对应一个日志目录:{topicName}-{partitionid}/,在目录下面会对应多个日志分段(LogSegment)。LogSegment文件由两部分组成,分别为“.index”文件和“.log”文件 app
索引文件使用稀疏索引的方式,避免对日志每条数据建索引,节省存储空间
发送
使用page cache顺序读文件,操做系统能够预读数据到 page cache 使用mmap直接将日志文件映射到虚拟地址空间 操作系统
read()是系统调用,首先将文件从硬盘拷贝到内核空间的一个缓冲区,再将这些数据拷贝到用户空间,实际上进行了两次数据拷贝; mmap()也是系统调用,但没有进行数据拷贝,当缺页中断发生时,直接将文件从硬盘拷贝到用户空间,只进行了一次数据拷贝。 Java中使用MappedByteBuffer封装了mmap
零拷贝:消息数据直接从 page cache 发送到网络 一般的文件读取须要经历下图的流程,有两次用户态与内核态之间内存的拷贝 3d
kafka使用零拷贝,避免消息在内核态和用户态间的来回拷贝
副本
-
每个分区都存在一个ISR(in-sync replicas)日志
-
ISR集合中的每个副本都与leader保持同步状态,不在里面的保持不了同步状态cdn
-
只有ISR中的副本才有资格被选为leader中间件
-
Producer写入的消息只有被ISR中的副本都接收到,才被视为"已提交" blog
-
Log End Offset:Producer 写入到 Kafka 中的最新一条数据的 offset索引
-
High Watermark:已经成功备份到其余 replicas 中的最新一条数据的 offset,也就是说 Log End Offset 与 High Watermark 之间的数据已经写入到该 partition 的 leader 中,可是还未成功备份到其余的 replicas 中
副本同步流程:
Controller
Controller相似于集群的master,主要管理以下几块:
- Broker 的上线、下线处理
- topic 的分区扩容,处理分区副本的分配、leader 选举
Controller经过broker抢占zk临时节点选举出来,且controller与全部broker创建长链接
Controller管理partition leader选举,主要有如下几种方式:
选举方式 |
说明 |
OfflinePartitionLeaderSelector |
leader 掉线时触发 |
ReassignedPartitionLeaderSelector |
分区的副本从新分配数据同步完成后触发的 |
PreferredReplicaPartitionLeaderSelector |
最优 leader 选举,手动触发或自动 leader 均衡调度时触发 |
ControlledShutdownLeaderSelector |
broker 发送 ShutDown 请求主动关闭服务时触发 |
消息幂等
问题:
- 在 0.11.0 以前,producer保证at least once
- at least once可能带来重复数据 网络请求延迟等致使的重试操做,在发送请求重试时 Server 端并不知道这条请求是否已经处理(没有记录以前的状态信息),因此就会有可能致使数据请求的重复发送,这是 Kafka 自身的机制(异常时请求重试机制)致使的数据重复
解决方案:
- PID(Producer ID),用来标识每一个 producer client
- sequence numbers,client 发送的每条消息都会带相应的 sequence number,Server 端就是根据这个值来判断数据是否重复
Rebalance
kafka rebalance发生的5种状况:
- 有新的消费者加入Consumer Group。
- 有消费者宕机下线。消费者并不必定须要真正下线,例如遇到长时间的GC、网络延迟致使消费者长时间未向GroupCoordinator发送HeartbeatRequest时,GroupCoordinator会认为消费者下线。
- 有消费者主动退出Consumer Group。
- Consumer Group订阅的任一Topic出现分区数量的变化。
- 消费者调用unsubscrible()取消对某Topic的订阅。
kafka经过GroupCoordinator管理rebalance操做
- GroupCoordinator是KafkaServer中用于管理Consumer Group的组件
- GroupCoordinator在ZooKeeper上添加Watcher
- 获取GroupCoordinator:消费者会向Kafka集群中的任一Broker发送ConsumerMetadataRequest
- 消费者链接到GroupCoordinator并周期性地发送HeartbeatRequest
- 若是HeartbeatResponse中带有IllegalGeneration异常,说明GroupCoordinator发起了Rebalance操做,此时进入rebalance环节 Rebalance分为两个流程。
Join Group:
- Consumer首先向GroupCoordinator发送JoinGroupRequest请求,其中包含消费者的相关信息
- GroupCoordinator从中选取一个消费者成为Group Leader,封装成JoinGroupResponse返回给每一个消费者
- 只有Group Leader收到的JoinGroupResponse中封装了全部消费者的信息, Group Leader根据消费者的信息以及选定的分区分配策略进行分区分配。
Sync Group:
- 每一个消费者会发送SyncGroupRequest到GroupCoordinator,可是只有Group Leader的SyncGroupRequest请求包含了分区的分配结果
- GroupCoordinator根据Group Leader的分区分配结果,造成SyncGroupResponse返回给全部Consumer
- 消费者收到SyncGroupResponse后进行解析,便可获取分配给自身的partition