kafka控制层-controller启动与选举

KafkaServer启动

在startup方法里,会实例化KafkaController,并调用它的startup方法:

// Excerpt of KafkaServer: only the controller-related part of startup() is shown;
// the dotted lines stand for code omitted from the excerpt.
class KafkaServer {
  // Creates the KafkaController and then starts it.
  def startup() {
        .......
        // Build the controller from config, zkUtils, time, metrics and threadNamePrefix.
        kafkaController = new KafkaController(config, zkUtils, time, metrics, threadNamePrefix)
        // Hand control to the controller's own startup().
        kafkaController.startup()
        ......
    }
}

KafkaController重要属性

  • activeControllerId:当前主controller所在broker的id(-1表示尚未选出主controller)
  • eventManager: ControllerEventManager实例,负责处理事件

ControllerEventManager

ControllerEventManager实例化一个线程,用来单独处理KafkaController发送的事件。

/**
 * Queues controller events and processes them one at a time on a dedicated thread.
 *
 * @param rateAndTimeMetrics     timers keyed by controller state; the timer for the current
 *                               state wraps the processing of each event
 * @param eventProcessedListener callback invoked with every event once its processing attempt
 *                               has finished
 */
class ControllerEventManager(rateAndTimeMetrics: Map[ControllerState, KafkaTimer],
                             eventProcessedListener: ControllerEvent => Unit) {

  // Written only by the event thread, read by anyone through `state`.
  @volatile private var _state: ControllerState = ControllerState.Idle

  // Pending events, drained by the event thread.
  private val queue = new LinkedBlockingQueue[ControllerEvent]
  private val thread = new ControllerEventThread("controller-event-thread")

  /** State of the event currently being processed, or Idle when there is none. */
  def state: ControllerState = _state

  /** Adds an event to the queue; this is how KafkaController submits work. */
  def put(event: ControllerEvent): Unit = queue.put(event)

  /** Starts the event thread. */
  def start(): Unit = thread.start()

  /** Shuts the event thread down. */
  def close(): Unit = thread.shutdown()

  class ControllerEventThread(name: String) extends ShutdownableThread(name = name) {
    override def doWork(): Unit = {
      // Take the next event off the queue.
      val controllerEvent = queue.take()
      // While the event is handled, the manager reports the event's own state.
      _state = controllerEvent.state

      // Every event carries its own processing logic; run it inside the timer for the current state.
      try rateAndTimeMetrics(state).time(controllerEvent.process())
      catch {
        case e: Throwable => error(s"Error processing event $controllerEvent", e)
      }

      // The listener is notified even when processing failed, because the failure was logged above.
      try {
        eventProcessedListener(controllerEvent)
      } catch {
        case e: Throwable => error(s"Error while invoking listener for processed event $controllerEvent", e)
      }

      // Nothing in flight any more.
      _state = ControllerState.Idle
    }
  }

}

事件基类

state表示处理该event时,controller所处的状态。

process定义了处理该event的逻辑。

/**
 * Base type of every event handled by the controller's event thread.
 */
sealed trait ControllerEvent {
  /** Runs the event's own handling logic. */
  def process(): Unit

  /** State reported by the event manager while this event is being processed. */
  def state: ControllerState
}

ControllerState

ControllerState目前有10种取值(value从0到9):

/**
 * The ten controller states shown in this excerpt, each identified by a numeric `value` (0 to 9).
 *
 * NOTE(review): the `ControllerState` base type that declares `value`,
 * `hasRateAndTimeMetric` and `rateAndTimeMetricName` is not part of this excerpt.
 */
object ControllerState {

  // Reported when no event is being processed; opts out of the rate-and-time metric.
  case object Idle extends ControllerState {
    def value = 0
    override protected def hasRateAndTimeMetric: Boolean = false
  }

  // State used by the Startup and Reelect events.
  case object ControllerChange extends ControllerState {
    def value = 1
  }

  // Overrides the metric name with "LeaderElectionRateAndTimeMs".
  case object BrokerChange extends ControllerState {
    def value = 2
    override def rateAndTimeMetricName = Some("LeaderElectionRateAndTimeMs")
  }

  case object TopicChange extends ControllerState {
    def value = 3
  }

  case object TopicDeletion extends ControllerState {
    def value = 4
  }

  case object PartitionReassignment extends ControllerState {
    def value = 5
  }

  case object AutoLeaderBalance extends ControllerState {
    def value = 6
  }

  case object ManualLeaderBalance extends ControllerState {
    def value = 7
  }

  case object ControlledShutdown extends ControllerState {
    def value = 8
  }

  case object IsrChange extends ControllerState {
    def value = 9
  }

}

Startup事件

KafkaController在启动的时候,会生成Startup事件,添加到eventManager的队列中,然后启动事件处理线程。

class KafkaController {
  /**
   * Queues the Startup event and then starts the event manager's thread,
   * so Startup is already waiting in the queue when the thread begins taking events.
   */
  def startup(): Unit = {
    eventManager.put(Startup)
    eventManager.start()
  }
}

Startup定义

// Event queued by KafkaController.startup(); it is processed in the ControllerChange state.
// NOTE(review): in this excerpt the listener helpers and SessionExpirationListener are printed
// inside the Startup body, yet they pass `this` where a KafkaController is expected —
// presumably they are members of the enclosing KafkaController; confirm against the real source.
case object Startup extends ControllerEvent {

    def state = ControllerState.ControllerChange

    override def process(): Unit = {
      // Subscribe to ZooKeeper connection-state changes.
      registerSessionExpirationListener()
      // Subscribe to data changes on the "/controller" node (it stores the active controller's id,
      // so a data change means the active controller has changed).
      registerControllerChangeListener()
      // Run the election and try to become the controller.
      elect()
    }

  // Registers a SessionExpirationListener for state changes on the ZooKeeper client.
  private def registerSessionExpirationListener() = {
    zkUtils.zkClient.subscribeStateChanges(new SessionExpirationListener(this, eventManager))
  }

// ZooKeeper state listener that asks the controller to re-run the election.
class SessionExpirationListener(controller: KafkaController, eventManager: ControllerEventManager) extends IZkStateListener with Logging {

  @throws[Exception]
  override def handleNewSession(): Unit = {
    // A new session with ZooKeeper has been established: enqueue a Reelect event.
    eventManager.put(controller.Reelect)
  }
}

  // Registers a ControllerChangeListener for data changes on ZkUtils.ControllerPath.
  private def registerControllerChangeListener() = {
    zkUtils.zkClient.subscribeDataChanges(ZkUtils.ControllerPath, new ControllerChangeListener(this, eventManager))
  }

  }

Reelect事件

Reelect代表重新竞选主controller。

/**
 * Event that refreshes the known controller id and runs the election again.
 * It is processed in the ControllerChange state.
 */
case object Reelect extends ControllerEvent {

  def state = ControllerState.ControllerChange

  override def process(): Unit = {
    // Capture the role before the id is refreshed; otherwise a demotion could not be detected.
    val wasActive = isActive
    // Re-read the id of the current active controller.
    activeControllerId = getControllerID()

    // This broker was the active controller and no longer is: step down
    // (the original note says this stops the ZooKeeper listeners registered earlier).
    val lostLeadership = wasActive && !isActive
    if (lostLeadership) onControllerResignation()

    // Compete for the controller role again.
    elect()
  }
}

  /**
   * Tries to make this broker the active controller by creating the ephemeral node at
   * ZkUtils.ControllerPath.
   *
   * Does nothing beyond logging when a controller id other than -1 is already recorded.
   * On success it records this broker as the controller and runs onControllerFailover().
   * If the node already exists, the id is re-read and the outcome is logged; any other
   * failure is logged and followed by triggerControllerMove().
   */
  def elect(): Unit = {
    val now = time.milliseconds
    // Node payload: this broker's id plus the current timestamp.
    val electString = ZkUtils.controllerZkData(config.brokerId, now)

    activeControllerId = getControllerID()
    if (activeControllerId != -1) {
      // -1 means no active controller has been elected yet; any other id means the role is already taken.
      debug("Broker %d has been elected as the controller, so stopping the election process.".format(activeControllerId))
    } else {
      try {
        // Brokers race to create the ephemeral node; losing the race raises ZkNodeExistsException.
        new ZKCheckedEphemeral(ZkUtils.ControllerPath,
                               electString,
                               controllerContext.zkUtils.zkConnection.getZookeeper,
                               controllerContext.zkUtils.isSecure).create()
        info(config.brokerId + " successfully elected as the controller")
        activeControllerId = config.brokerId
        // Register the listeners and start the services that only the active controller runs.
        onControllerFailover()
      } catch {
        case _: ZkNodeExistsException =>
          // Another broker created the node first; find out who it is.
          activeControllerId = getControllerID()
          if (activeControllerId == -1)
            warn("A controller has been elected but just resigned, this will result in another round of election")
          else
            debug("Broker %d was elected as controller instead of broker %d".format(activeControllerId, config.brokerId))

        case e2: Throwable =>
          error("Error while electing or becoming controller on broker %d".format(config.brokerId), e2)
          triggerControllerMove()
      }
    }
  }

  /**
   * Invoked by elect() right after this broker has won the controller election.
   *
   * In order, it: bumps the controller epoch, registers the listeners owned by the active
   * controller, initializes the controller context and the topic-deletion manager, sends an
   * update-metadata request to the live or shutting-down brokers, starts the replica and
   * partition state machines, resumes pending reassignments, deletions and preferred-replica
   * elections, and finally starts the scheduler (plus the auto-rebalance task when enabled).
   * The statement order is kept exactly as in the original.
   */
  def onControllerFailover(): Unit = {
    info("Broker %d starting become controller state transition".format(config.brokerId))
    readControllerEpochFromZookeeper()
    incrementControllerEpoch()

    // Partition reassignment listener.
    registerPartitionReassignmentListener()
    // ISR change notification listener.
    registerIsrChangeNotificationListener()
    // Preferred replica election listener.
    registerPreferredReplicaElectionListener()
    // Topic change listener.
    registerTopicChangeListener()
    // Topic deletion listener.
    registerTopicDeletionListener()
    // Broker change listener.
    registerBrokerChangeListener()

    // Initialize the controller context.
    initializeControllerContext()

    val (topicsToBeDeleted, topicsIneligibleForDeletion) = fetchTopicDeletionsInProgress()
    topicDeletionManager.init(topicsToBeDeleted, topicsIneligibleForDeletion)

    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
    // Start the replica state machine.
    replicaStateMachine.startup()
    // Start the partition state machine.
    partitionStateMachine.startup()

    // Register a partition-modification listener for every known topic.
    controllerContext.allTopics.foreach(topic => registerPartitionModificationsListener(topic))
    info("Broker %d is ready to serve as the new controller with epoch %d".format(config.brokerId, epoch))
    maybeTriggerPartitionReassignment()
    topicDeletionManager.tryTopicDeletion()
    val pendingPreferredReplicaElections = fetchPendingPreferredReplicaElections()
    onPreferredReplicaElection(pendingPreferredReplicaElections)
    info("starting the controller scheduler")
    kafkaScheduler.startup()
    if (config.autoLeaderRebalanceEnable) {
      // The first auto leader rebalance is scheduled with a 5-second delay.
      scheduleAutoLeaderRebalanceTask(delay = 5, unit = TimeUnit.SECONDS)
    }
  }

归纳

KafkaController负责处理集群中的一些事件,比如topic的新增和删除。

KafkaController在kafka集群中扮演着很重要的角色。因此为了保证controller的高可用,集群的每一个节点都会运行KafkaController服务,并通过zookeeper的选举,保证任何时刻只有一个主controller。

KafkaController在启动的时候,就会尝试竞选主controller。如果竞选成功,就会在zookeeper中注册各类监听事件。

相关文章
相关标签/搜索