Kafka 依赖 Zookeeper 来维护集群成员的信息:node
每一个 broker 都有一个惟一标识符 ID,这个标识符能够在配置文件里指定,也能够自动生成。
在 broker 启动的时候,它经过在 Zookeeper 的 /brokers/ids
路径上建立临时节点,把本身的 ID 注册 Zookeeper。
Kafka 组件会订阅 Zookeeper 的 /brokers/ids
路径,当有 broker 加入集群或退出集群时,这些组件就能够得到通知。算法
在 broker 停机、出现网络分区或长时间垃圾回收停顿时,会致使其 Zookeeper 会话失效,致使其在启动时建立的临时节点会自动被移除。
监听 broker 列表的 Kafka 组件会被告知该 broker 已移除,而后处理 broker 崩溃的后续事宜。apache
在彻底关闭一个 broker 以后,若是使用相同的 ID 启动另外一个全新的 broker,它会当即加入集群,并拥有与旧 broker 相同的分区和主题。网络
controller 其实就是一个 broker,它除了具备通常 broker 的功能以外,还负责分区 leader 的选举。并发
为了在整个集群中指定一个惟一的 controller,broker 集群须要进行选举,该过程依赖如下两个 Zookeeper 节点:负载均衡
// 临时节点 controller(保存最新的 controller 节点信息,保证惟一性) object ControllerZNode { def path = "/controller" def encode(brokerId: Int, timestamp: Long): Array[Byte] = { Json.encodeAsBytes(Map("version" -> 1, "brokerid" -> brokerId, "timestamp" -> timestamp.toString).asJava) } def decode(bytes: Array[Byte]): Option[Int] = Json.parseBytes(bytes).map { js => js.asJsonObject("brokerid").to[Int] } } // 永久节点 controller_epoch(保存最新 controller 对应的任期号,用于避免脑裂) object ControllerEpochZNode { def path = "/controller_epoch" def encode(epoch: Int): Array[Byte] = epoch.toString.getBytes(UTF_8) def decode(bytes: Array[Byte]): Int = new String(bytes, UTF_8).toInt }
broker 启动后会发起一轮选举,选举经过 Zookeeper 提供的建立节点功能来实现:性能
/controller
,建立成功的 broker 将成为 controller。
/controller_epoch
中的任期号,其余 broker 能够根据任期号忽略已过时 controller 的消息。
Watcher
实时监控
/controller
节点。
broker 中的 KafkaController 对象负责发起选举:fetch
private def elect(): Unit = { // 检查集群中是否存在可用 controller (activeControllerId == -1) try { // 当前 broker 经过 KafkaZkClient 发起选举,并选举本身为新的 controller val (epoch, epochZkVersion) = zkClient.registerControllerAndIncrementControllerEpoch(config.brokerId) // 若是 broker 当选,则更新对应的 controller 相关信息 controllerContext.epoch = epoch controllerContext.epochZkVersion = epochZkVersion activeControllerId = config.brokerId info(s"${config.brokerId} successfully elected as the controller. Epoch incremented to ${controllerContext.epoch} and epoch zk version is now ${controllerContext.epochZkVersion}") onControllerFailover() // 选举成功后触发维护操做 } catch { case e: ControllerMovedException => maybeResign() if (activeControllerId != -1) // 其余 broker 被选为 controller debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}", e) else // 本轮选举没有产生 controller warn("A controller has been elected but just resigned, this will result in another round of election", e) case t: Throwable => error(s"Error while electing or becoming controller on broker ${config.brokerId}. Trigger controller movement immediately", t) triggerControllerMove() } }
KafkaZkClient 中更新 Zookeeper 的逻辑以下:this
def registerControllerAndIncrementControllerEpoch(controllerId: Int): (Int, Int) = { val timestamp = time.milliseconds() // 从 /controller_epoch 获取当前 controller 对应的 epoch 与 zkVersion // 若 /controller_epoch 不存在则尝试建立 val (curEpoch, curEpochZkVersion) = getControllerEpoch .map(e => (e._1, e._2.getVersion)) .getOrElse(maybeCreateControllerEpochZNode()) // 建立 /controller 并原子性更新 /controller_epoch val newControllerEpoch = curEpoch + 1 val expectedControllerEpochZkVersion = curEpochZkVersion debug(s"Try to create ${ControllerZNode.path} and increment controller epoch to $newControllerEpoch with expected controller epoch zkVersion $expectedControllerEpochZkVersion") // 处理 /controller 节点已存在的状况,直接返回最新节点信息 def checkControllerAndEpoch(): (Int, Int) = { val curControllerId = getControllerId.getOrElse(throw new ControllerMovedException( s"The ephemeral node at ${ControllerZNode.path} went away while checking whether the controller election succeeds. " + s"Aborting controller startup procedure")) if (controllerId == curControllerId) { val (epoch, stat) = getControllerEpoch.getOrElse( throw new IllegalStateException(s"${ControllerEpochZNode.path} existed before but goes away while trying to read it")) // 若是最新的 epoch 与 newControllerEpoch 相等,则能够推断 zkVersion 与当前 broker 已知的 zkVersion 一致 if (epoch == newControllerEpoch) return (newControllerEpoch, stat.getVersion) } throw new ControllerMovedException("Controller moved to another broker. Aborting controller startup procedure") } // 封装 zookeeper 请求 def tryCreateControllerZNodeAndIncrementEpoch(): (Int, Int) = { val response = retryRequestUntilConnected( MultiRequest(Seq( // 发送 CreateRequest 建立 /controller 临时节点 CreateOp(ControllerZNode.path, ControllerZNode.encode(controllerId, timestamp), defaultAcls(ControllerZNode.path), CreateMode.EPHEMERAL), // 发送 SetDataRequest 更新 /controller_epoch 节点信息 SetDataOp(ControllerEpochZNode.path, ControllerEpochZNode.encode(newControllerEpoch), expectedControllerEpochZkVersion))) ) response.resultCode match { case Code.NODEEXISTS | Code.BADVERSION => checkControllerAndEpoch() case Code.OK => val setDataResult = response.zkOpResults(1).rawOpResult.asInstanceOf[SetDataResult] (newControllerEpoch, setDataResult.getStat.getVersion) case code => throw KeeperException.create(code) } } // 向 zookeepr 发起请求 tryCreateControllerZNodeAndIncrementEpoch() }
一个 Kafka 分区本质上就是一个备份日志,经过利用多份相同的冗余副本replica
保持系统高可用性。
Kafka 把分区的全部副本均匀地分配到全部 broker上,并从这些副本中挑选一个做为 leader 副本对外提供服务。
而其余副本被称为 follower 副本,不对外提供服务,只能被动地向 leader 副本请求数据,保持与 leader 副本的同步。spa
当 controller 发现一个 broker 加入集群时,它会使用 broker.id 来检查新加入的 broker 是否包含现有分区的副本。
若是有,controller 就把变动通知发送全部 broker,新 broker 中的分区做为 follower 副本开始从 leader 那里复制消息。
Kafka 为每一个主题维护了一组同步副本集合in-sync replicas
(其中包含 leader 副本)。
只有被 ISR 中的全部副本都接收到的那部分生产者写入的消息才对消费者可见,这意味着 ISR 中的全部副本都会与 leader 保持同步状态。
为了不出现新 leader 数据不完整致使分区数据丢失的状况,只有 ISR 中 follower 副本才有资格被选举为 leader。
若 follower 副本没法在 replica.lag.time.max.ms
毫秒内向 leader 请求数据,那么该 follower 就会被视为不一样步,leader 会将其剔除出 ISR。
leader 会在 ISR 集合发生变动时,会在/isr_change_notification
下建立一个永久节点并写入变动信息。
当监控/isr_change_notification
的 controller 接收到通知后,会更新其余 broker 的元数据,最后删除已处理过的节点。
当出现瞬时峰值流量,只要 follower 不是持续性落后,就不会反复地在 ISR 中移进、移出,避免频繁访问 Zookeeper 影响性能。
建立主题时,Kafka 会为每一个分区选定一个初始分区 leaderpreferred leader
,其对应的副本被称为首选副本preferred replica
。
controller 在建立主题时会保证 leader 在 broker 之间均衡分布,所以当 leader 按照初始的首选副本分布时,broker 间的负载均衡状态最佳。
然而 broker 失效是难以免的,重启后的首选副本只能做为 follower 副本加入 ISR 中,不能再对外提供服务。
随着集群的不断运行,leader 不均衡现象会愈发明显:集群中的一小部分 broker 上承载了大量的分区 leader 副本。
能够设置 auto.leader.rebalance.enable = true
解决这一问题:
leader.imbalance.per.broker.percentage
时会自动执行一次 leader 均衡操做。
当一个新的 broker 刚加入集群时,不会自动地分担己有 topic 的负载,它只会对后续新增的 topic 生效。
若是要让新增 broker 为己有的 topic 服务,用户必须手动地调整现有的 topic 的分区分布,将一部分分区搬移到新增 broker 上。这就是所谓的分区重分配reassignment
操做。
除了处理 broker 扩容致使的不均衡以外,再均衡还能用于处理 broker 存储负载不均衡的状况,在单个或多个 broker 之间的日志目录之间从新分配分区。 用于解决多个代理之间的存储负载不平衡。
触发分区 leader 选举的几种场景:
当上述几种状况发生时,controller 会遍历全部相关的主题分区并从为其指定新的 leader。
而后向全部包含相关主题分区的 broker 发送更新请求,其中包含了最新的 leader 与 follower 副本分配信息。
更新完毕后,新 leader 会开始处理来自生产者和消费者的请求,而follower 开始重新 leader 那里复制消息。
分区状态信息在对应的节点信息:
// 节点 /brokers/topics/{topic-name}/partitions/{partition-no}/state 保存分区最新状态信息的 object TopicPartitionStateZNode { def path(partition: TopicPartition) = s"${TopicPartitionZNode.path(partition)}/state" def encode(leaderIsrAndControllerEpoch: LeaderIsrAndControllerEpoch): Array[Byte] = { val leaderAndIsr = leaderIsrAndControllerEpoch.leaderAndIsr val controllerEpoch = leaderIsrAndControllerEpoch.controllerEpoch Json.encodeAsBytes(Map("version" -> 1, "leader" -> leaderAndIsr.leader, "leader_epoch" -> leaderAndIsr.leaderEpoch, "controller_epoch" -> controllerEpoch, "isr" -> leaderAndIsr.isr.asJava).asJava) } def decode(bytes: Array[Byte], stat: Stat): Option[LeaderIsrAndControllerEpoch] = { Json.parseBytes(bytes).map { js => val leaderIsrAndEpochInfo = js.asJsonObject val leader = leaderIsrAndEpochInfo("leader").to[Int] val epoch = leaderIsrAndEpochInfo("leader_epoch").to[Int] val isr = leaderIsrAndEpochInfo("isr").to[List[Int]] val controllerEpoch = leaderIsrAndEpochInfo("controller_epoch").to[Int] val zkPathVersion = stat.getVersion LeaderIsrAndControllerEpoch(LeaderAndIsr(leader, epoch, isr, zkPathVersion), controllerEpoch) } } }
PartitionStateMachine 管理分区选举的代码:
private def doElectLeaderForPartitions(partitions: Seq[TopicPartition], partitionLeaderElectionStrategy: PartitionLeaderElectionStrategy ): (Map[TopicPartition, Either[Exception, LeaderAndIsr]], Seq[TopicPartition]) = { // 请求 Zookeeper 获取 partition 当前状态 val getDataResponses = try { zkClient.getTopicPartitionStatesRaw(partitions) } catch { case e: Exception => return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty) } val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]] val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)] getDataResponses.foreach { getDataResponse => val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition] val currState = partitionState(partition) if (getDataResponse.resultCode == Code.OK) { // 剔除状态已失效或不存在的 partition TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match { case Some(leaderIsrAndControllerEpoch) => if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) { val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " + s"already written by another controller. This probably means that the current controller $controllerId went through " + s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}." failedElections.put(partition, Left(new StateChangeFailedException(failMsg))) } else { validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr } case None => val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state") failedElections.put(partition, Left(exception)) } } else if (getDataResponse.resultCode == Code.NONODE) { val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state") failedElections.put(partition, Left(exception)) } else { failedElections.put(partition, Left(getDataResponse.resultException.get)) } } // 若是所有 partition 均失效,则跳过这次选举 if (validLeaderAndIsrs.isEmpty) { return (failedElections.toMap, Seq.empty) } // 根据指定的选举策略选择 partition leader val (partitionsWithoutLeaders, partitionsWithLeaders) = partitionLeaderElectionStrategy match { // Elect leaders for new or offline partitions. case OfflinePartitionLeaderElectionStrategy(allowUnclean) => val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(validLeaderAndIsrs, allowUnclean) leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty) // Elect leaders for partitions that are undergoing reassignment. case ReassignPartitionLeaderElectionStrategy => leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) // Elect preferred leaders. case PreferredReplicaPartitionLeaderElectionStrategy => leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) // Elect leaders for partitions whose current leaders are shutting down. case ControlledShutdownPartitionLeaderElectionStrategy => leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty) } partitionsWithoutLeaders.foreach { electionResult => val partition = electionResult.topicPartition val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy" failedElections.put(partition, Left(new StateChangeFailedException(failMsg))) } // 将选举结果同步到 TopicPartitionStateZNode 对应的 Zookeeper 节点 val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion) finishedUpdates.forKeyValue { (partition, result) => result.foreach { leaderAndIsr => val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition) val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch) controllerContext.putPartitionLeadershipInfo(partition, leaderIsrAndControllerEpoch) controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition, leaderIsrAndControllerEpoch, replicaAssignment, isNew = false) } } (finishedUpdates ++ failedElections, updatesToRetry) }
日志复制中的一些重要偏移概念:
base offset
:副本所含第一条消息的 offsethigh watermark
:副本最新一条己提交消息的 offsetlog end offset
:副本中下一条待写入消息的 offset
每一个副本会同时维护 HW 与 LEO 值:
Kafka 中的复制流程大体以下:
在前面咱们提到 follower 在重启后会对日志进行截断,这可能致使消息会丢失:
假设某个分区分布在 A 和 B 两个 broker 上,且最开始时 B 是分区 leader
为了解决这一问题,Kafka 为每一届 leader 分配了一个惟一的 epoch,由其追加到日志的消息都会包含这个 epoch。
而后每一个副本都在本地维护一个 epoch 快照文件,并在其中保存 (epoch, offset)
:
回到以前的场景,增长了 leader epoch 以后的行为以下:
LeaderEpochRequest
请求最新的 leader epoch更多的细节能够参考这篇文章。
建立主题时,Kafka 会为主题的每一个分区在文件系统中建立了一个对应的子目录,命名格式为主题名-分区号
,每一个日志子目录的文件构成以下:
[lhop@localhost log]$ tree my-topic-* my-topic-0 ├── 00000000000050209130.index ├── 00000000000050209130.log ├── 00000000000050209130.snapshot ├── 00000000000050209130.timeindex └── leader-epoch-checkpoint my-topic-1 ├── 00000000000048329826.index ├── 00000000000048329826.log ├── 00000000000048329826.timeindex └── leader-epoch-checkpoint
其中的 leader-epoch-checkpoint
文件用于存储 leader epoch 快照,用于协助崩溃的副本执行恢复操做,在此就不详细展开。咱们重点关注剩余的两类文件。
日志段文件(.log)的文件保存着真实的 Kafka 记录。
Kafka 使用该文件第一条记录对应的 offset 来命名此文件。
每一个日志段文件是有上限大小的,由 broker 端参数log.segment.bytes
控制。
除了键、值和偏移量外,消息里还包含了消息大小、校验和、消息格式版本号、压缩算法和时间戳。时间戳能够是生产者发送消息的时间,也能够是消息到达 broker 的时间,这个是可配置的。
若是生产者发送的是压缩过的消息,那么同一个批次的消息会被压缩在一块儿。broker 会原封不动的将消息存入磁盘,而后再把它发送给消费者。消费者在解压这个消息以后,会看到整个批次的消息,它们都有本身的时间戳和偏移量。
这意味着 broker 可使用zero-copy
技术给消费者发送消息,同时避免了对生产者已经压缩过的消息进行解压和再压缩。
位移索引文件(.index)与时间戳索引(.timeindex)是两个特殊的索引文件:
它们都属于稀疏索引文件,每写入若干条记录后才增长一个索引项。写入间隔能够 broker 端参数 log.index.interval.bytes
设置。
索引文件严格按照时间戳顺序保存,所以 Kafka 能够利用二分查找算法提升查找速度。