Controller 是从Kafka集群中选取一个的broker,负责管理topic分区和副本的状态的变化,以及执行重分配分区之类的管理任务。node
第一个启动的broker会成为一个controller,它会在Zookeeper上建立一个临时节点(ephemeral):/controller。其余后启动的broker也尝试去建立这样一个临时节点,但会报错,此时这些broker会在该zookeeper的/controller节点上建立一个监控(Watch),这样当该节点状态发生变化(好比:被删除)时,这些broker就会获得通知。此时,这些broker就能够在获得通知时,继续建立该节点。保证该集群一直都有一个controller节点。git
当controller所在的broker节点宕机或断开和Zookeeper的链接,它在Zookeeper上建立的临时节点就会被自动删除。其余在该节点上都安装了监控的broker节点都会获得通知,此时,这些broker都会尝试去建立这样一个临时的/controller节点,但它们当中只有一个broker(最早建立的那个)可以建立成功,其余的broker会报错:node already exists,接收到该错误的broker节点会再次在该临时节点上安装一个watch来监控该节点状态的变化。每次当一个broker被选举时,将会赋予一个更大的数字(经过zookeeper的条件递增实现),这样其余节点就知道controller目前的数字。github
当一个broker宕机而不在当前Kafka集群中时,controller将会获得通知(经过监控zookeeper的路径实现),如有些topic的主分区刚好在该broker上,此时controller将从新选择这些主分区。controller将会检查全部没有leader的分区,并决定新的leader是谁(简单的方法是:选择该分区的下一个副本分区),并给全部的broker发送请求。算法
每一个分区的新leader指导,它将接收来自客户端的生产者和消费者的请求。同时follower也指导,应该从这个新的leader开始复制消息。缓存
当一个新的broker节点加入集群时,controller将会检查,在该broker上是否存在分区副本。若存在,controller通知新的和存在的broker这个变化,该broker开始从leader处复制消息。网络
下面将从如下几个介绍controller的相关原理:函数
在KafkaServer.startup()中,KafkaController对象被构建,在启动KafkaApis、replicaManager后,KafkaController.startup()
被调用。spa
startup()
函数很是简单,这里直接粘代码:.net
除去日志以及标识状态的isRunning赋值,值得看的代码就两句。其中registerSessionExpirationListener()用于在zookeeper会话失效后重连时取消注册在zookeeper上的各类Listener,而controllerElector.startup则启动了选举,这些都将发生在ZookeeperLeaderElector类中。线程
Kafka集群中每一个Broker都会调用startup()函数,可是一个集群只有一个Broker可以成为Controller。那么,谁将成为这个controller呢?
KafkaController选举是直接经过zookeeper实现的,就是在zookeeper建立临时目录/controller/并在目录下存放当前brokerId。若是在zookeeper下建立路径没有抛出ZkNodeExistsException异常,则当前broker成功晋级为Controller。除了调用elect外,controllerElector.startup还会在/controller/路径上注册Listener,监听dataChange事件和dataDelete事件,当/controller/下数据发生变化时,表示Controller发生了变化;而由于/controller/下的数据为临时数据,当Controller发生failover时,数据会被删除,触发dataDelete事件,这时就须要从新选举新一任Controller。
成为KafkaController以后很重要的一件事,就是在zookeeper各个关键路径上添加Listener,因此这里颇有必要先总结一下跟controller相关的路径([ ]表示其中的值是随实际状况变化的):
名词解释
成为KafkaController之后,会执行什么操做呢?
完成对各个zookeeper路径的监听后,zookeeper内容的变化驱动Controller进行各类操做,处理如新建topic,删除topic,broker失效,broker恢复等事件。
前面startup()中registerSessionExpirationListener()会注册会话监听器,在zookeeper会话过时后又重连成功时调用onControllerResignation(),并从新执行选举操做。 此外,当Controller会话失效时,会删除/controller/路径下建立的临时数据。与此同时,其余broker上的ZookeeperLeaderElector类中的LeaderChangeListener感知到数据删除后会从新执行选举。
onControllerResignation()是Controller转变为普通broker时执行的操做,就是将前面注册的各个Listener取消注册,再也不关注zookeeper变化
在 KafkaController 中
l 有两个状态机:分区状态机和副本状态机;
l 一个管理器:Channel 管理器,负责管理全部的 Broker 通讯;
l 相关缓存:Partition 信息、Topic 信息、broker id 信息等;
l 四种 leader 选举机制:分别是用 leader offline、broker 掉线、partition reassign、最优 leader 选举时触发;
在 initializeControllerContext() 初始化 KafkaController 上下文信息的方法中,主要作了如下事情:
最优 leader 选举:就是默认选择 Replica 分配中第一个 replica 做为 leader,为何叫作最优 leader 选举呢?由于 Kafka 在给每一个 Partition 分配副本时,它会保证分区的主副本会均匀分布在全部的 broker 上,这样的话只要保证第一个 replica 被选举为 leader,读写流量就会均匀分布在全部的 Broker 上,固然这是有一个前提的,那就是每一个 Partition 的读写流量相差很少,可是在实际的生产环境,这是不太可能的,因此通常状况下,大集群是不建议开自动 leader 均衡的,能够经过额外的算法计算、手动去触发最优 leader 选举。
initializeControllerContext()
方法会经过 startChannelManager()
方法初始化 ControllerChannelManager 对象,以下所示:
ControllerChannelManager在初始化时,会为集群中的每一个节点初始化一个 ControllerBrokerStateInfo 对象,该对象包含四个部分:
其具体实现以下所示:
清楚了上面的逻辑,再来看 KafkaController 部分是如何向 Broker 发送请求的
KafkaController 其实是调用的 ControllerChannelManager 的 sendRequest()
方法向 Broker 发送请求信息,其实现以下所示:
它实际上只是把对应的请求添加到该 Broker 对应的 MessageQueue 中,并无真正的去发送请求,请求的的发送是在 每台 Broker 对应的 RequestSendThread 中处理的。
四种 leader 选举实现类及对应触发条件以下所示
实现 |
触发条件 |
OfflinePartitionLeaderSelector |
leader 掉线时触发 |
ReassignedPartitionLeaderSelector |
分区的副本从新分配数据同步完成后触发的 |
PreferredReplicaPartitionLeaderSelector |
最优 leader 选举,手动触发或自动 leader 均衡调度时触发 |
ControlledShutdownLeaderSelector |
broker 发送 ShutDown 请求主动关闭服务时触发 |
OfflinePartitionLeaderSelector
选举的逻辑是:
一旦 leader 被成功注册到 zk 中,它将会更新到 KafkaController 缓存中的 allLeaders 中。
ReassignedPartitionLeaderSelector
ReassignedPartitionLeaderSelector 是在 Partition 副本迁移后,副本同步完成(RAR 都处在 isr 中,RAR 指的是该 Partition 新分配的副本)后触发的,其 leader 选举逻辑以下:
PreferredReplicaPartitionLeaderSelector
PreferredReplicaPartitionLeaderSelector 是最优 leader 选举,选择 AR(assign replica)中的第一个副本做为 leader,前提是该 replica 在是存活的、而且在 isr 中,不然会抛出 StateChangeFailedException 的异常。
ControlledShutdownLeaderSelector
ControlledShutdownLeaderSelector 是在处理 broker 下线时调用的 leader 选举方法,它会选举 isr 中第一个没有正在关闭的 replica 做为 leader,不然抛出 StateChangeFailedException 异常。
http://www.javashuo.com/article/p-tzmaihsg-gk.html