在KafkaServer的startup方法里，会实例化KafkaController：
// Excerpt from KafkaServer; "......." marks code elided from the original source.
class KafkaServer {
  /** Starts the broker; among other steps, creates this broker's KafkaController and starts it. */
  def startup() {
    .......
    // Every broker runs its own KafkaController; ZooKeeper election decides which one is active.
    kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
    kafkaController.startup()
    ......
  }
}
ControllerEventManager会实例化一个线程，专门用来处理KafkaController发送的事件。
/**
 * Runs controller events on a single dedicated thread.
 *
 * KafkaController enqueues events with [[put]]; the "controller-event-thread" takes them off the queue
 * one at a time and calls each event's `process()`, so events are handled sequentially.
 *
 * @param rateAndTimeMetrics     per-state timers used to time event processing; an event whose state has
 *                               no timer in this map is still processed, just without timing
 * @param eventProcessedListener callback invoked after every event, whether or not processing succeeded
 */
class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
                             eventProcessedListener: ControllerEvent => Unit) {

  // State of the event currently being processed, or Idle between events. @volatile so that readers
  // of `state` on other threads observe the event thread's latest write.
  @volatile private var _state: ControllerState = ControllerState.Idle

  // Pending events, processed in submission order.
  private val queue = new LinkedBlockingQueue[ControllerEvent]
  private val thread = new ControllerEventThread("controller-event-thread")

  def state: ControllerState = _state

  def start(): Unit = thread.start()

  def close(): Unit = thread.shutdown()

  /** Enqueues an event for the event thread; this is how KafkaController submits events. */
  def put(event: ControllerEvent): Unit = queue.put(event)

  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
    override def doWork(): Unit = {
      // Blocks until an event is available.
      val controllerEvent = queue.take()
      // Report the event's own state while it is being processed.
      _state = controllerEvent.state

      try {
        // Use `get` rather than `apply`: `apply` throws NoSuchElementException for a state without a
        // registered timer, which the catch below would log while the event itself never got processed.
        rateAndTimeMetrics.get(state) match {
          case Some(timer) => timer.time { controllerEvent.process() }
          case None        => controllerEvent.process()
        }
      } catch {
        // Deliberately broad: one failing event must not kill the single controller event thread.
        case e: Throwable => error(s"Error processing event $controllerEvent", e)
      }

      // Notify the callback even if processing failed.
      try eventProcessedListener(controllerEvent)
      catch {
        case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
      }

      // Back to Idle until the next event is taken.
      _state = ControllerState.Idle
    }
  }
}
state表示处理该event时controller所处的状态。
process定义了处理该event的逻辑。
/**
 * An event handled by the controller event thread.
 *
 * Every event reports which ControllerState the controller is in while handling it,
 * and carries its own handling logic.
 */
sealed trait ControllerEvent {

  /** The state reported by the controller while this event is being processed. */
  def state: ControllerState

  /** Handles this event; called on the controller event thread. */
  def process(): Unit
}
ControllerState目前有10种状态：
/**
 * The controller states (10 in this excerpt); each case object supplies a numeric `value`.
 *
 * Idle overrides `hasRateAndTimeMetric` to false, and BrokerChange overrides `rateAndTimeMetricName`
 * so its timings are reported as "LeaderElectionRateAndTimeMs".
 *
 * NOTE(review): the abstract `ControllerState` class is declared elsewhere, and the upstream object may
 * contain further members that this excerpt omits.
 */
object ControllerState {

  // No event is being processed.
  case object Idle extends ControllerState {
    def value = 0
    override protected def hasRateAndTimeMetric: Boolean = false
  }

  case object ControllerChange extends ControllerState {
    def value = 1
  }

  // Timed under the metric name "LeaderElectionRateAndTimeMs".
  case object BrokerChange extends ControllerState {
    def value = 2
    override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs")
  }

  case object TopicChange extends ControllerState {
    def value = 3
  }

  case object TopicDeletion extends ControllerState {
    def value = 4
  }

  case object PartitionReassignment extends ControllerState {
    def value = 5
  }

  case object AutoLeaderBalance extends ControllerState {
    def value = 6
  }

  case object ManualLeaderBalance extends ControllerState {
    def value = 7
  }

  case object ControlledShutdown extends ControllerState {
    def value = 8
  }

  case object IsrChange extends ControllerState {
    def value = 9
  }
}
KafkaController在启动的时候，会生成Startup事件，并添加到ControllerEventManager中：
class KafkaController {

  /**
   * Starts the controller.
   *
   * The Startup event is queued before the event thread is started,
   * so it is the first event that thread processes.
   */
  def startup(): Unit = {
    val firstEvent = Startup
    eventManager.put(firstEvent)
    eventManager.start()
  }
}
Startup事件的定义：
/**
 * First event processed after KafkaController.startup().
 *
 * Subscribes to ZooKeeper session-state changes and to data changes of the "/controller" node,
 * then tries to win the controller election.
 *
 * NOTE(review): assumes this object is declared inside class KafkaController (as `eventManager.put(Startup)`
 * in KafkaController.startup() suggests), which is what makes `KafkaController.this` below valid.
 */
case object Startup extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    // Listen for ZooKeeper session events; a new session queues a Reelect event.
    registerSessionExpirationListener()
    // Listen for data changes of the "/controller" node. The node stores the active controller's id,
    // so a data change means the active controller has changed.
    registerControllerChangeListener()
    // Try to become the active controller.
    elect()
  }

  private def registerSessionExpirationListener(): Unit = {
    // A bare `this` here would be the Startup object itself; the listener needs the enclosing KafkaController.
    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(KafkaController.this, eventManager))
  }

  class SessionExpirationListener(controller: KafkaController, eventManager: ControllerEventManager)
    extends IZkStateListener with Logging {

    // IZkStateListener declares this callback too; without an implementation the class stays abstract.
    override def handleStateChanged(state: org.apache.zookeeper.Watcher.Event.KeeperState): Unit = {
      // Intentionally a no-op. NOTE(review): assumes the ZooKeeper client handles reconnection itself.
    }

    /** Called once a new ZooKeeper session has been established: queue a Reelect event. */
    @throws[Exception]
    override def handleNewSession(): Unit = {
      eventManager.put(controller.Reelect)
    }

    // Also declared by IZkStateListener; must be implemented for the class to be instantiable.
    override def handleSessionEstablishmentError(error: Throwable): Unit = {
      // Intentionally a no-op. NOTE(review): confirm whether session-establishment errors need handling here.
    }
  }

  private def registerControllerChangeListener(): Unit = {
    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath,
      new ControllerChangeListener(KafkaController.this, eventManager))
  }
}
# Reelect事件

Reelect代表重新竞选主controller：
/**
 * Re-runs the controller election; queued by the session listener when a new ZooKeeper session is created.
 * If this broker was the active controller and no longer is, it resigns first.
 */
case object Reelect extends ControllerEvent {
  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    val wasActiveBeforeChange = isActive
    // Refresh the id of the active controller.
    activeControllerId = getControllerID()
    // This broker was the active controller before, but is not any more.
    if (wasActiveBeforeChange && !isActive) {
      // Stop the ZooKeeper listeners registered while this broker was the active controller.
      onControllerResignation()
    }
    // Try to become the active controller.
    elect()
  }
}

/**
 * Tries to become the active controller by creating the ephemeral "/controller" node.
 *
 * Does nothing if an active controller is already registered (getControllerID() != -1). On success,
 * records this broker as the active controller and runs onControllerFailover(). If another broker created
 * the node first (ZkNodeExistsException), refreshes the active controller id; any other failure is logged
 * and triggerControllerMove() is called.
 */
def elect(): Unit = {
  val timestamp = time.milliseconds
  // Node payload: this broker's id plus the election timestamp.
  val electString = ZkUtils.controllerZkData(config.brokerId, timestamp)

  activeControllerId = getControllerID()
  // -1 means no active controller is registered yet; any other value ends the election here.
  if (activeControllerId != -1) {
    debug(s"Broker $activeControllerId has been elected as the controller, so stopping the election process.")
  } else {
    try {
      // Race to create the ephemeral node; throws ZkNodeExistsException if another broker got there first.
      val zkCheckedEphemeral = new ZKCheckedEphemeral(ZkUtils.ControllerPath,
                                                      electString,
                                                      controllerContext.zkUtils.zkConnection.getZookeeper,
                                                      controllerContext.zkUtils.isSecure)
      zkCheckedEphemeral.create()
      info(config.brokerId + " successfully elected as the controller")
      activeControllerId = config.brokerId
      // Register the listeners and start the components the active controller is responsible for.
      onControllerFailover()
    } catch {
      case _: ZkNodeExistsException =>
        activeControllerId = getControllerID()
        if (activeControllerId != -1)
          debug(s"Broker $activeControllerId was elected as controller instead of broker ${config.brokerId}")
        else
          warn("A controller has been elected but just resigned, this will result in another round of election")
      case e2: Throwable =>
        error(s"Error while electing or becoming controller on broker ${config.brokerId}", e2)
        triggerControllerMove()
    }
  }
}

/**
 * Runs after this broker has won the election and become the active controller.
 *
 * Reads and increments the controller epoch, registers the ZooKeeper listeners, and initializes the
 * controller context. It then starts the replica and partition state machines, resumes pending
 * reassignments, topic deletions and preferred-replica elections, and starts the controller scheduler.
 * The order of the calls below is kept exactly as in the original.
 */
def onControllerFailover(): Unit = {
  info("Broker %d starting become controller state transition".format(config.brokerId))
  readControllerEpochFromZookeeper()
  incrementControllerEpoch()
  // Partition reassignment listener.
  registerPartitionReassignmentListener()
  // ISR change notification listener.
  registerIsrChangeNotificationListener()
  // Preferred replica election listener.
  registerPreferredReplicaElectionListener()
  // Topic change listener.
  registerTopicChangeListener()
  // Topic deletion listener.
  registerTopicDeletionListener()
  // Broker change listener.
  registerBrokerChangeListener()
  // Initialize the controller context.
  initializeControllerContext()
  val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
  topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)
  sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // Start the replica state machine.
  replicaStateMachine.startup()
  // Start the partition state machine.
  partitionStateMachine.startup()
  // Register a partition modification listener for every known topic.
  controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
  info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
  maybeTriggerPartitionReassignment()
  topicDeletionManager.tryTopicDeletion()
  val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
  onPreferredReplicaElection(pendingPreferredReplicaElections)
  info("starting the controller scheduler")
  kafkaScheduler.startup()
  if (config.autoLeaderRebalanceEnable) {
    scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
  }
}
KafkaController负责处理集群中的一些事件，比如topic的新增和删除。
KafkaController在Kafka集群中扮演着很重要的角色。所以为了保证controller的高可用，集群的每一个节点都会运行KafkaController服务，并通过ZooKeeper选举，保证任何时刻只有一个主controller。
KafkaController在启动的时候，就会尝试竞选主controller。如果竞选成功，就会在ZooKeeper中注册监听事件。