这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同窗, 能够在公告栏那里点击购买连接购买博主的书进行学习,在此感谢你们的支持。
平时在使用Kafka的时候,可能关注的更多的是Kafka系统层面的。今天来给你们剖析一下Kafka的控制器,了解一下Kafka控制器的选举流程。函数
Kafka控制器,其实就是一个Kafka系统的Broker。它除了具备通常Broker的功能以外,还具备选举主题分区Leader节点的功能。在启动Kafka系统时,其中一个Broker会被选举为控制器,负责管理主题分区和副本状态,还会执行分区从新分配的管理任务。oop
若是在Kafka系统运行过程当中,当前的控制器出现故障致使不可用,那么Kafka系统会从其余正常运行的Broker中从新选举出新的控制器。post
在Kafka集群中,每一个Broker在启动时会实例化一个KafkaController类。该类会执行一系列业务逻辑,选举出主题分区的Leader节点,步骤以下:学习
控制器建立的优先级是按照Kafka系统代理节点成功启动的顺序来建立的。用户能够经过改变Kafka系统代理节点的启动顺序,来查看控制器的建立优先级。以后,能够在Zookeeper系统中查看/controller临时节点的内容,例如:大数据
# 进入Zookeeper集群 [hadoop@dn1 bin]$ zkCli.sh -server dn1:2181 # 执行查看命令 [zk: dn1:2181(CONNECTED) 1] get /controller
成功执行命令后,能够看到代理节点0(即dn1节点)上成功建立了控制器,以下图所示:spa
当前启动顺序为:dn一、dn二、dn3,修改启动顺序为:dn三、dn一、dn2。再次查看Zookeeper系统中执行“get /controller”命令,输出结果以下图所示:代理
当控制器被关闭或者与Zookeeper系统断开链接时,Zookeeper系统上的临时节点就会被清除。Kafka集群中的监听器会接收到变动通知,各个代理节点会尝试到Zookeeper系统中建立一个控制器的临时节点。第一个成功在Zookeeper系统中建立的代理节点,将会成为新的控制器。每一个新选举出来的控制器,会在Zookeeper系统中获取一个递增的controller_epoch值。调试
选举控制器的核心思路是:各个代理节点公平竞争抢占Zookeeper系统中建立/controller临时节点,最早建立成功的代理节点会成为控制器,并拥有选举主题分区Leader节点的功能。选举流程以下图所示:code
当Kafka系统实例化KafkaController类时,主题分区Leader节点的选举流程便会开始。其中涉及的核心类包含KafkaController、ZookeeperLeaderElector、LeaderChangeListener、SessionExpirationListener。orm
Kafka系统的控制器主要负责管理主题、分区和副本。 Kafka系统在操做主题、分区和副本时,控制器会在Zookeeper系统的/brokers/topics节点,以及其子节点路径上注册一系列的监听器。 使用Kafka应用接口或者是Kafka系统脚本建立一个主题时,服务端会将建立后的结果返回给客户端。当客户端收到建立成功的提示时,其实服务端并无实际建立主题,而只是在Zookeeper系统的/brokers/topics节点中建立了该主题对应的子节点名称。
代理节点调用onBecomingLeader()函数实际上调用的是onControllerFailover()函数,因此在控制器调用onControllerFailover()函数时,会在初始化阶段分别建立分区状态机和副本状态机。代码以下所示:
def onControllerFailover() { if(isRunning) { info("Broker %d starting become controller state transition".format(config.brokerId)) readControllerEpochFromZookeeper() incrementControllerEpoch(zkUtils.zkClient) // 在/brokers/topics节点注册监听器 registerReassignedPartitionsListener() registerIsrChangeNotificationListener() registerPreferredReplicaElectionListener() partitionStateMachine.registerListeners() // 注册分区状态机 replicaStateMachine.registerListeners() // 注册副本状态机 initializeControllerContext() // 在控制器初始化以后,在状态机启动以前,须要发送更新元数据请求 sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq) replicaStateMachine.startup() // 启动副本状态机 partitionStateMachine.startup() // 启动分区状态机 // 在自动故障转移中为全部主题注册分区更改监听器 controllerContext.allTopics.foreach(topic => partitionStateMachine. registerPartitionChangeListener(topic)) info("Broker %d is ready to serve as the new controller with epoch %d". format(config.brokerId, epoch)) maybeTriggerPartitionReassignment() maybeTriggerPreferredReplicaElection() if (config.autoLeaderRebalanceEnable) { info("starting the partition rebalance scheduler") autoRebalanceScheduler.startup() autoRebalanceScheduler.schedule("partition-rebalance-thread", checkAndTriggerPartitionRebalance, 5, config.leaderImbalanceCheckIntervalSeconds.toLong, TimeUnit.SECONDS) } deleteTopicManager.start() } else info("Controller has been shut down, aborting startup/failover") }
主题的分区状态机经过registerListeners()函数,在Zookeeper系统中的/brokers/topics节点上注册了TopicChangeListener和DeleteTopicListener两个监听器。建立一个主题时,主题信息、主题分区和副本会被写到Zookeeper系统的/brokers/topics节点中,这就会触发分区和副本状态机注册监听器。
Kafka系统总体来讲,调试还算方便。下载Kafka源代码,导入到IDE中,就能够启动整个Kafka系统了,能够经过DEBUG的方式来亲自了解控制器的执行流程。
这篇博客就和你们分享到这里,若是你们在研究学习的过程中有什么问题,能够加群进行讨论或发送邮件给我,我会尽我所能为您解答,与君共勉!
另外,博主出书了《Hadoop大数据挖掘从入门到进阶实战》,喜欢的朋友或同窗, 能够在公告栏那里点击购买连接购买博主的书进行学习,在此感谢你们的支持。