MetadataCache Updates

When is the MetadataCache updated?

The updateCache method is what actually updates the cache.

Initiating thread: controller-event-thread

When the controller is elected

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/KafkaController onControllerFailover 288
kafka/controller/KafkaController elect 1658
kafka/controller/KafkaController$Startup$ process 1581
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

Election happens at startup, and the startup action itself is modeled as a controller event:

// KafkaController.scala
  case object Startup extends ControllerEvent {

    def state = ControllerState.ControllerChange

    override def process(): Unit = {
      // watch zk session expiration and /controller changes, then attempt election
      registerSessionExpirationListener()
      registerControllerChangeListener()
      elect()
    }

  }
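
All controller work funnels through a single event queue that the controller-event-thread drains. The following is a minimal sketch of that pattern, using hypothetical Event/EventManager names rather than the real ControllerEventManager code: callers only enqueue events, and one dedicated worker thread takes them off the queue and calls process() serially.

// Sketch: simplified controller event-queue pattern (hypothetical, not Kafka source)
import java.util.concurrent.LinkedBlockingQueue

trait Event {
  def process(): Unit
}

class EventManager {
  private val queue = new LinkedBlockingQueue[Event]()

  // Callers (zk listeners, broker/topic changes, ...) only enqueue; they never process.
  def put(event: Event): Unit = queue.put(event)

  // A single dedicated thread drains the queue, so events are processed serially.
  private val thread = new Thread("controller-event-thread") {
    override def run(): Unit =
      try {
        while (true) queue.take().process() // blocks until an event is available
      } catch {
        case _: InterruptedException => () // shutdown() interrupts the loop
      }
  }
  thread.setDaemon(true)

  def startup(): Unit = thread.start()
  def shutdown(): Unit = thread.interrupt()
}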

When a broker starts up

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/KafkaController onBrokerStartup 387
kafka/controller/KafkaController$BrokerChange process 1208
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

When a topic is deleted

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/KafkaController sendUpdateMetadataRequest 1043
kafka/controller/TopicDeletionManager kafka$controller$TopicDeletionManager$$onTopicDeletion 268
kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 apply 333
kafka/controller/TopicDeletionManager$$anonfun$resumeDeletions$2 apply 333
scala/collection/immutable/Set$Set1 foreach 94
kafka/controller/TopicDeletionManager resumeDeletions 333
kafka/controller/TopicDeletionManager enqueueTopicsForDeletion 110
kafka/controller/KafkaController$TopicDeletion process 1280
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

When a topic is created or modified

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/ControllerBrokerRequestBatch updateMetadataRequestBrokerSet 291
kafka/controller/ControllerBrokerRequestBatch newBatch 294
kafka/controller/PartitionStateMachine handleStateChanges 105
kafka/controller/KafkaController onNewPartitionCreation 499
kafka/controller/KafkaController onNewTopicCreation 485
kafka/controller/KafkaController$TopicChange process 1237
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

Topic creation is handled by taking the event off a queue and then processing it. The queue is kafka.controller.ControllerEventManager.queue. The enqueue path is shown below; at bottom it is still a watch on child changes of a ZooKeeper path:

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/ControllerEventManager put 44
kafka/controller/TopicChangeListener handleChildChange 1712
org/I0Itec/zkclient/ZkClient$10 run 848
org/I0Itec/zkclient/ZkEventThread run 85

The listener registration code is as follows:

// class KafkaController
  private def registerTopicChangeListener() = {
    // watch child changes under /brokers/topics (BrokerTopicsPath)
    zkUtils.subscribeChildChanges(BrokerTopicsPath, topicChangeListener)
  }
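
For illustration, here is a hedged sketch of how such a listener can feed the event queue, reusing the hypothetical Event/EventManager types from the sketch above (this is not the real TopicChangeListener): the zk callback only enqueues an event, and the actual processing happens later on the controller-event-thread.

// Sketch: child-change listener that enqueues an event (hypothetical names)
import org.I0Itec.zkclient.{IZkChildListener, ZkClient}

class TopicChangeWatcher(zkClient: ZkClient, eventManager: EventManager) {
  private val BrokerTopicsPath = "/brokers/topics"

  def register(): Unit =
    zkClient.subscribeChildChanges(BrokerTopicsPath, new IZkChildListener {
      // Runs on the ZkEventThread; keep it cheap and just enqueue.
      override def handleChildChange(parentPath: String,
                                     currentChilds: java.util.List[String]): Unit =
        eventManager.put(new Event {
          override def process(): Unit =
            println(s"topics under $parentPath changed: $currentChilds")
        })
    })
}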

As an aside, there are six places that subscribe to zk child-node changes:

  • DynamicConfigManager.startup
  • registerTopicChangeListener
  • registerIsrChangeNotificationListener
  • registerTopicDeletionListener
  • registerBrokerChangeListener
  • registerLogDirEventNotificationListener

Handling the topic-creation event:

// ControllerChannelManager.scala  class ControllerBrokerRequestBatch
  def sendRequestsToBrokers(controllerEpoch: Int) {
  // .......
      val updateMetadataRequest = {
        val liveBrokers = if (updateMetadataRequestVersion == 0) {
          // .......
        } else {
          controllerContext.liveOrShuttingDownBrokers.map { broker =>
            val endPoints = broker.endPoints.map { endPoint =>
              new UpdateMetadataRequest.EndPoint(endPoint.host, endPoint.port, endPoint.securityProtocol, endPoint.listenerName)
            }
            new UpdateMetadataRequest.Broker(broker.id, endPoints.asJava, broker.rack.orNull)
          }
        }
        new UpdateMetadataRequest.Builder(updateMetadataRequestVersion, controllerId, controllerEpoch, partitionStates.asJava,
          liveBrokers.asJava)
      }
      updateMetadataRequestBrokerSet.foreach { broker =>
        controller.sendRequest(broker, ApiKeys.UPDATE_METADATA, updateMetadataRequest, null)
      }
      // .......
    }
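
Note the design visible in this excerpt: the controller does not send incremental diffs. Each UpdateMetadataRequest carries the full live (or shutting-down) broker list plus the partition states collected in the current batch, and it goes to every broker in updateMetadataRequestBrokerSet, so every broker can answer client metadata requests from its local cache.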

Going one step further into the metadata update during topic creation: a send-request item is built and put on a per-broker send queue, where it waits for the send thread to pick it up. The code that builds and enqueues the request is as follows:

// ControllerChannelManager
  def sendRequest(brokerId: Int, apiKey: ApiKeys, request: AbstractRequest.Builder[_ <: AbstractRequest],
                  callback: AbstractResponse => Unit = null) {
    brokerLock synchronized {
      val stateInfoOpt = brokerStateInfo.get(brokerId)
      stateInfoOpt match {
        case Some(stateInfo) =>
          stateInfo.messageQueue.put(QueueItem(apiKey, request, callback))
        case None =>
          warn("Not sending request %s to broker %d, since it is offline.".format(request, brokerId))
      }
    }
  }
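
The surrounding structure is one message queue plus one send thread per broker. Below is a minimal sketch of that shape, with hypothetical types standing in for QueueItem and the per-broker state kept by ControllerChannelManager:

// Sketch: per-broker send queue + sender thread (hypothetical, simplified)
import java.util.concurrent.LinkedBlockingQueue

case class OutboundRequest(apiKey: String, payload: AnyRef, callback: AnyRef => Unit)

class BrokerSendState(brokerId: Int, send: OutboundRequest => Unit) {
  val messageQueue = new LinkedBlockingQueue[OutboundRequest]()

  // One sender thread per broker: requests to the same broker stay ordered,
  // and a slow broker does not block sends to the others.
  private val sender = new Thread(s"controller-to-broker-$brokerId-send-thread") {
    override def run(): Unit =
      try {
        while (true) send(messageQueue.take())
      } catch {
        case _: InterruptedException => () // shutdown
      }
  }
  sender.setDaemon(true)

  def startup(): Unit = sender.start()
  def shutdown(): Unit = sender.interrupt()
}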

The call stack:

CLASS_NAME METHOD_NAME LINE_NUM
kafka/controller/ControllerChannelManager sendRequest 81
kafka/controller/KafkaController sendRequest 662
kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2 apply 405
kafka/controller/ControllerBrokerRequestBatch$$anonfun$sendRequestsToBrokers$2 apply 405
scala/collection/mutable/HashMap$$anonfun$foreach$1 apply 130
scala/collection/mutable/HashMap$$anonfun$foreach$1 apply 130
scala/collection/mutable/HashTable$class foreachEntry 241
scala/collection/mutable/HashMap foreachEntry 40
scala/collection/mutable/HashMap foreach 130
kafka/controller/ControllerBrokerRequestBatch sendRequestsToBrokers 502
kafka/controller/PartitionStateMachine handleStateChanges 105
kafka/controller/KafkaController onNewPartitionCreation 499
kafka/controller/KafkaController onNewTopicCreation 485
kafka/controller/KafkaController$TopicChange process 1237
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply$mcV$sp 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/controller/ControllerEventManager$ControllerEventThread$$anonfun$doWork$1 apply 53
kafka/metrics/KafkaTimer time 32
kafka/controller/ControllerEventManager$ControllerEventThread doWork 64
kafka/utils/ShutdownableThread run 70

The send thread then sends the request. The code is as follows:

// ControllerChannelManager.scala class RequestSendThread
  override def doWork(): Unit = {

    def backoff(): Unit = CoreUtils.swallowTrace(Thread.sleep(100))

    val QueueItem(apiKey, requestBuilder, callback) = queue.take()
    //...
    while (isRunning.get() && !isSendSuccessful) {
        // if a broker goes down for a long time, then at some point the controller's zookeeper listener will trigger a
        // removeBroker which will invoke shutdown() on this thread. At that point, we will stop retrying.
        try {
          if (!brokerReady()) {
            isSendSuccessful = false
            backoff()
          }
          else {
            val clientRequest = networkClient.newClientRequest(brokerNode.idString, requestBuilder,
              time.milliseconds(), true)
            clientResponse = NetworkClientUtils.sendAndReceive(networkClient, clientRequest, time)
            isSendSuccessful = true
          }
        } catch {
          case e: Throwable => // if the send was not successful, reconnect to broker and resend the message
            warn(("Controller %d epoch %d fails to send request %s to broker %s. " +
              "Reconnecting to broker.").format(controllerId, controllerContext.epoch,
                requestBuilder.toString, brokerNode.toString), e)
            networkClient.close(brokerNode.idString)
            isSendSuccessful = false
            backoff()
        }
      }
      // ......
  }
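
Two points worth noting from this loop: NetworkClientUtils.sendAndReceive blocks until the response arrives, so each RequestSendThread has at most one in-flight request to its broker; and on failure the thread simply closes the connection, backs off 100 ms, and retries until the broker is removed from the cluster, which shuts the thread down.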

The handler thread on the receiving broker

CLASS_NAME METHOD_NAME LINE_NUM
kafka/server/MetadataCache kafka$server$MetadataCache$$addOrUpdatePartitionInfo 150
kafka/utils/CoreUtils$ inLock 219
kafka/utils/CoreUtils$ inWriteLock 225
kafka/server/MetadataCache updateCache 184
kafka/server/ReplicaManager maybeUpdateMetadataCache 988
kafka/server/KafkaApis handleUpdateMetadataRequest 212
kafka/server/KafkaApis handle 142
kafka/server/KafkaRequestHandler run 72

Thread: kafka-request-handler-5
The partitionMetadataLock read-write lock makes reads and writes of the cache thread-safe. The metadata itself was already fully constructed in the controller's request; this step also updates the live broker list, among other things.
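
As a rough sketch of that locking pattern (hypothetical cache type, not the real MetadataCache): writes triggered by UPDATE_METADATA take the write lock, reads that serve metadata requests take the read lock, mirroring the CoreUtils.inReadLock/inWriteLock helpers visible in the stack trace.

// Sketch: read-write-lock-protected metadata cache (hypothetical types)
import java.util.concurrent.locks.{Lock, ReentrantReadWriteLock}
import scala.collection.mutable

case class PartitionState(leader: Int, isr: Seq[Int])

class SimpleMetadataCache {
  private val lock = new ReentrantReadWriteLock()
  private val cache = mutable.Map.empty[(String, Int), PartitionState]
  private var aliveBrokers = Set.empty[Int]

  private def inLock[T](l: Lock)(body: => T): T = {
    l.lock()
    try body finally l.unlock()
  }

  // Write path: applied by a kafka-request-handler thread for UPDATE_METADATA.
  def updateCache(brokers: Set[Int],
                  partitionStates: Map[(String, Int), PartitionState]): Unit =
    inLock(lock.writeLock()) {
      aliveBrokers = brokers // refresh live brokers
      partitionStates.foreach { case (tp, state) => cache(tp) = state }
    }

  // Read paths: used when answering client metadata requests.
  def getPartitionState(topic: String, partition: Int): Option[PartitionState] =
    inLock(lock.readLock()) {
      cache.get((topic, partition))
    }

  def getAliveBrokers: Set[Int] = inLock(lock.readLock())(aliveBrokers)
}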

Still to be covered: leader changes, ISR changes, and other triggers.
