Zookeeper Listener分析

KafkaController会通过zookeeper监控整个Kafka集群的运行状态,响应管理员的指令,通过在指定znode添加listener, 监听该znode的数据或者子节点等变化


一 TopicChangeListener

def handleChildChange(parentPath : String, children : java.util.List[String]) {
  inLock(controllerContext.controllerLock) {
    // 判断状态机是否启动     if (hasStarted.get) {
      try {
        // 获取'/brokers/topics' znode下的子节点集合         val currentChildren = {
          import JavaConversions._
          debug("Topic change listener fired for path %s with children %s".format(parentPath, children.mkString(",")))
          (children: Buffer[String]).toSet         }
        // 获取新加入的topic(过滤ControllerContext的所有topic,然后剩余的就是新的),         val newTopics = currentChildren -- controllerContext.allTopics         // 获取删除的topics(因为只要删除了,currentChildren肯定是没有这个topic,但是缓存里有,所以可以获取到)         val deletedTopics = controllerContext.allTopics -- currentChildren         // 将当前topic更新到controllerContext.allTopics         controllerContext.allTopics = currentChildren         // 获取新的topic的所有分区信息以及分区的AR副本集         val addedPartitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(newTopics.toSeq)
        // ControllerContextAR副本集过滤掉deletedTopics队列里的topic         controllerContext.partitionReplicaAssignment = controllerContext.partitionReplicaAssignment.filter(p =>
          !deletedTopics.contains(p._1.topic))
        // 并将新的topics更新到controllerContextAR副本集         controllerContext.partitionReplicaAssignment.++=(addedPartitionReplicaAssignment)
        info("New topics: [%s], deleted topics: [%s], new partition replica assignment [%s]".format(newTopics,
          deletedTopics, addedPartitionReplicaAssignment))
        // 新的topics不为空,处理新增topic         if(newTopics.nonEmpty)
          controller.onNewTopicCreation(newTopics, addedPartitionReplicaAssignment.keySet.toSet)
      } catch {
        case e: Throwable => error("Error while handling new topic", e )
      }
    }
  }
}

 

def onNewTopicCreation(topics: Set[String], newPartitions: Set[TopicAndPartition]) {
  info("New topic creation callback for %s".format(newPartitions.mkString(",")))
  // 为每一个新增的topic注册PartitionModificationChangeListener   topics.foreach(topic => partitionStateMachine.registerPartitionChangeListener(topic))
  onNewPartitionCreation(newPartitions)
}

 

 

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  info("New partition creation callback for %s".format(newPartitions.mkString(",")))
  // 将所有指定新增的分区的状态转换为NewPartition状态
 
partitionStateMachine
.handleStateChanges(newPartitions, NewPartition)
  // 将指定新增的分区的所有副本都转换为NewReplica状态
 
replicaStateMachine
.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  // 将指定新增的分区转化为OnlinePartition状态
 
partitionStateMachine
.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  // 将指定新增的分区的所有副本都转换为OnlineReplica状态
 
replicaStateMachine
.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}

二DeleteTopicListener

Topic 满足下列三种情况不能被删

# 如果topic任意一个分区正在重新分配副本

# 如果topic任意一个分区正在进行优先副本选举

# 如果topic的任意分区任意的一个副本所在broker宕机,则topic不能被删除

 

def handleChildChange(parentPath : String, children : java.util.List[String]) {
  inLock(controllerContext.controllerLock) {
    // 获取将被删除的topics集合
   
var topicsToBeDeleted = {
      import JavaConversions._
      (children: Buffer[String]).toSet
   
}
    debug("Delete topics listener fired for topics %s to be deleted".format(topicsToBeDeleted.mkString(",")))
    // 查看这些topics是在controllerContext.allTopics不存在的
   
val nonExistentTopics = topicsToBeDeleted-- controllerContext.allTopics
   
// 如果nonExistentTopics不为空,则删除它在zookeeper的路径
   
if(nonExistentTopics.nonEmpty) {
      warn("Ignoring request to delete non-existing topics " + nonExistentTopics.mkString(","))
      nonExistentTopics.foreach(topic => zkUtils.deletePathRecursive(getDeleteTopicPath(topic)))
    }
    // 过滤掉不存在的待删除topic
   
topicsToBeDeleted--= nonExistentTopics
   
if(topicsToBeDeleted.nonEmpty) {
      info("Starting topic deletion for topics " + topicsToBeDeleted.mkString(","))
      // 检查topic中是否处于不可删除状态
     
topicsToBeDeleted.foreach { topic =>
        // 检测topic是否有'优先副本选举
       
val preferredReplicaElectionInProgress =
          controllerContext.partitionsUndergoingPreferredReplicaElection.map(_.topic).contains(topic)
        // 检测是否有分区正在进行副本的重新分配
       
val partitionReassignmentInProgress =
          controllerContext.partitionsBeingReassigned.keySet.map(_.topic).contains(topic)
        // 如果有一个为true,则标记该topic删除不成功
       
if(preferredReplicaElectionInProgress || partitionReassignmentInProgress)
          controller.deleteTopicManager.markTopicIneligibleForDeletion(Set(topic))
      }
      // 将可用删除的topic添加要被删除的topics到删除队列
     
controller.deleteTopicManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
  }
}

 

三PartitionModificationsListener

我们知道在topic的删除过程中,涉及到PartitionModificationsListener

的注册和取消。在新增topic时会为每一个topic注册一个PartitionModificationsListener,在成功删除topic之后会将注册Partition

ModifcationListener删除。PartitionModificationsListener会监听"/brokers/topics/[topic_name]"数据的变化,主要就是监听一个topic分区的变化

def handleDataChange(dataPath : String, data: Object) {
  inLock(controllerContext.controllerLock) {
    try {
      info(s"Partition modification triggered $data for path $dataPath")
      // zookeeper获取topic分区记录
     
val partitionReplicaAssignment = zkUtils.getReplicaAssignmentForTopics(List(topic))
      // 过滤出新增加的分区
     
val partitionsToBeAdded = partitionReplicaAssignment.filter(p =>
        !controllerContext.partitionReplicaAssignment.contains(p._1))
      if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(topic))
        error("Skipping adding partitions %s for topic %s since it is currentlybeing deleted"
             
.format(partitionsToBeAdded.map(_._1.partition).mkString(","), topic))
      else {
        // 如果新增分区
       
if (partitionsToBeAdded.nonEmpty) {
          info("New partitions to be added %s".format(partitionsToBeAdded))
          // 更新新增的分区到ControllerContext
         
controllerContext
.partitionReplicaAssignment.++=(partitionsToBeAdded)
          // 切换新增分区及其副本状态,最终使其上线对外提供服务
         
controller.onNewPartitionCreation(partitionsToBeAdded.keySet.toSet)
        }
      }
    } catch {
      case e: Throwable => error("Error while handling add partitions for data path " + dataPath, e )
    }
  }
}

 

def onNewPartitionCreation(newPartitions: Set[TopicAndPartition]) {
  info("New partition creation callback for %s".format(newPartitions.mkString(",")))
  // 将所有指定新增的分区的状态转换为NewPartition状态
 
partitionStateMachine
.handleStateChanges(newPartitions, NewPartition)
  // 将指定新增的分区的所有副本都转换为NewReplica状态
 
replicaStateMachine
.handleStateChanges(controllerContext.replicasForPartition(newPartitions), NewReplica)
  // 将指定新增的分区转化为OnlinePartition状态
 
partitionStateMachine
.handleStateChanges(newPartitions, OnlinePartition, offlinePartitionSelector)
  // 将指定新增的分区的所有副本都转换为OnlineReplica状态
 
replicaStateMachine
.handleStateChanges(controllerContext.replicasForPartition(newPartitions), OnlineReplica)
}

 

四 BrokerChangeListener

会监听"/brokers/ids"节点下子节点变化,主要负责处理broker的上线和故障下线

def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
  info("Broker change listener fired for path %s with children %s".format(parentPath, currentBrokerList.sorted.mkString(",")))
  inLock(controllerContext.controllerLock) {
    if (hasStarted.get) {
      ControllerStats.leaderElectionTimer.time {
        try {
          // zookeeper获取broker列表
         
val curBrokers = currentBrokerList.map(_.toInt).toSet.flatMap(zkUtils.getBrokerInfo)
          // 获取broker id列表
         
val curBrokerIds = curBrokers.map(_.id)
          val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
         
// 过滤出新的brokerId
         
val newBrokerIds = curBrokerIds -- liveOrShuttingDownBrokerIds
         
// 过滤出挂掉的broker id
         
val deadBrokerIds = liveOrShuttingDownBrokerIds -- curBrokerIds
          
val newBrokers = curBrokers.filter(broker => newBrokerIds(broker.id))
          controllerContext.liveBrokers = curBrokers
         
val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
         
val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
         
val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
          info
("Newly added brokers: %s, deleted brokers: %s, all live brokers:%s"
           
.format(newBrokerIdsSorted.mkString(","), deadBrokerIdsSorted.mkString(","), liveBrokerIdsSorted.mkString(",")))
          // 创建controller与新增的broker的网络连接
         
newBrokers.foreach(controllerContext.controllerChannelManager.addBroker)
          // 关闭controller与故障broker的网络连接
         
deadBrokerIds.foreach(controllerContext.controllerChannelManager.removeBroker)
          // 处理新增broker的上线
         
if(newBrokerIds.nonEmpty)
            controller.onBrokerStartup(newBrokerIdsSorted)
          // 处理新增broker的下线
         
if(deadBrokerIds.nonEmpty)
            controller.onBrokerFailure(deadBrokerIdsSorted)
        } catch {
          case e: Throwable => error("Error while handling broker changes", e)
        }
      }
    }
  }
}

 

def onBrokerStartup(newBrokers: Seq[Int]) {
  info("New broker startup callback for %s".format(newBrokers.mkString(",")))
  val newBrokersSet = newBrokers.toSet
 
// 发送sendUpdateMetadataRequest请求
 
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  // 获取在新的broker上的所有副本
 
val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)
  // 将新增broker的副本状态都转化为OnlinePartition
 
replicaStateMachine
.handleStateChanges(allReplicasOnNewBrokers, OnlineReplica)
  partitionStateMachine.triggerOnlinePartitionStateChange()
  // 检测是否需要进行副本的重新分配
 
val partitionsWithReplicasOnNewBrokers = controllerContext.partitionsBeingReassigned.filter {
    case (topicAndPartition, reassignmentContext) => reassignmentContext.newReplicas.exists(newBrokersSet.contains(_))
  }
  partitionsWithReplicasOnNewBrokers.foreach(p => onPartitionReassignment(p._1, p._2))
  // 如果新增broker上有待删除的topic副本,则唤醒DeleteTopicsThread线程,进行删除
 
val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if(replicasForTopicsToBeDeleted.nonEmpty) {
    info(("Some replicas %s for topics scheduled for deletion %s are on thenewly restarted brokers %s. " +
      "Signalingrestart of topic deletion for these topics").format(replicasForTopicsToBeDeleted.mkString(","),
      deleteTopicManager.topicsToBeDeleted.mkString(","), newBrokers.mkString(",")))
    deleteTopicManager.resumeDeletionForTopics(replicasForTopicsToBeDeleted.map(_.topic))
  }
}

 

def onBrokerFailure(deadBrokers: Seq[Int]) {
  info("Broker failure callback for %s".format(deadBrokers.mkString(",")))
  // 将正常关闭的brokerdead broker移除
 
val deadBrokersThatWereShuttingDown =
    deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  info("Removed %s from list of shutting down brokers.".format(deadBrokersThatWereShuttingDown))
  val deadBrokersSet = deadBrokers.toSet
 
// 过滤得到leader副本在故障broker上的分区,并将其分区转化为offline partition状态
 
val partitionsWithoutLeader = controllerContext.partitionLeadershipInfo.filter(partitionAndLeader =>
    deadBrokersSet.contains(partitionAndLeader._2.leaderAndIsr.leader) &&
      !deleteTopicManager.isTopicQueuedUpForDeletion(partitionAndLeader._1.topic)).keySet
 
partitionStateMachine.handleStateChanges(partitionsWithoutLeader, OfflinePartition)
  // Offline Partition状态的分区转化为OnlinePartition状态
 
partitionStateMachine
.triggerOnlinePartitionStateChange()
  // 过滤得到在故障broker上的副本,将这些副本转化为OfflineReplica
 
var allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokersSet)
  val activeReplicasOnDeadBrokers = allReplicasOnDeadBrokers.filterNot(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  // handle deadreplicas
 
replicaStateMachine
.handleStateChanges(activeReplicasOnDeadBrokers, OfflineReplica)
  // 检查故障broker上是否持有待删除的topic的副本,如果存在,则将其转化为ReplicaDeletionIneligible状态,并标记topic不可删除
 
val replicasForTopicsToBeDeleted = allReplicasOnDeadBrokers.filter(p => deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
  if(replicasForTopicsToBeDeleted.nonEmpty) {
    deleteTopicManager.failReplicaDeletion(replicasForTopicsToBeDeleted)
  }
  // 发送UpdateMetadataCacheRequest更新所有可用broker信息
 
if (partitionsWithoutLeader.isEmpty) {
    sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq)
  }
}

 

五IsrChangeNotificationListener

我们知道follower副本会与leader副本进行消息同步,当follower副本追上leader副本就会被添加到ISR列表;当follower副本与leader副本差距太大时会被剔除ISR集合。

Leader副本不仅会在ISR集合变化时将其记录到zookeeper,还会调用ReplicaManager.recordIsrChange方法记录到isrChangeSet集合,之后通过isr-change-propogation定时任务将该集合中的数据周期性写入zookeeper的"/isr-change-notification"路径下

IsrChangeNotificationListener就是用于监听此路径下子节点的变化:

override def handleChildChange(parentPath: String, currentChildren: util.List[String]): Unit = {
  import scala.collection.JavaConverters._
  inLock(controller.controllerContext.controllerLock) {
    debug("[IsrChangeNotificationListener] Fired!!!")
    // 获取'/isr-change-notification';路径下的子节点
   
val childrenAsScala: mutable.Buffer[String] = currentChildren.asScala
   
try {
      // 获取分区信息
     
val topicAndPartitions: immutable.Set[TopicAndPartition] = childrenAsScala.flatMap(x => getTopicAndPartition(x)).toSet
     
if (topicAndPartitions.nonEmpty) {
        // 更新ControllerContextpartitionLeadershipInfo(分区的leader信息)
       
controller.updateLeaderAndIsrCache(topicAndPartitions)
        // 然后向那些可用的broker发送UpdateMetadataCacheRequest请求
       
processUpdateNotifications(topicAndPartitions)
      }
    } finally {
      // 删除'isr-change-notification/partitions'下已经处理的信息
     
childrenAsScala.map(x => controller.controllerContext.zkUtils.deletePath(
        ZkUtils.IsrChangeNotificationPath + "/" + x))
    }
  }
}

 

 

六PreferredReplicaElectionListener

负责监听"/admin/preferred_replica_election"节点,当我们通过优先副本选举的命令指定某些分区需要进行优先副本选举时会将指定的分区信息写入该节点,从而触发PreferredReplicaElectionListener进行处理。优先副本选举目的是让分区的优先副本重新成为leader副本,这是为了让Leader副本在整个集群中分布的更加均衡

def handleDataChange(dataPath: String, data: Object) {
  debug("Preferred replica election listener fired for path %s. Recordpartitions to undergo preferred replica election %s"
         
.format(dataPath, data.toString))
  inLock(controllerContext.controllerLock) {
    // 获取参加优先副本选举的分区
   
val partitionsForPreferredReplicaElection = PreferredReplicaLeaderElectionCommand.parsePreferredReplicaElectionData(data.toString)
    if(controllerContext.partitionsUndergoingPreferredReplicaElection.nonEmpty)
      info("These partitions are already undergoing preferred replica election:%s"
       
.format(controllerContext.partitionsUndergoingPreferredReplicaElection.mkString(",")))
    // 过滤正在进行优先副本选举的分区
   
val partitions = partitionsForPreferredReplicaElection-- controllerContext.partitionsUndergoingPreferredReplicaElection
   
// 过滤掉正在进行删除状态的topic的分区
   
val partitionsForTopicsToBeDeleted = partitions.filter(p => controller.deleteTopicManager.isTopicQueuedUpForDeletion(p.topic))
    if(partitionsForTopicsToBeDeleted.nonEmpty) {
      error("Skipping preferred replica election for partitions %s since therespective topics are being deleted"
       
.format(partitionsForTopicsToBeDeleted))
    }
    // 对剩余的分区调用onPreferredReplicaElection方法进行优先副本的选举
   
controller.onPreferredReplicaElection(partitions -- partitionsForTopicsToBeDeleted)
  }
}

 

def onPreferredReplicaElection(partitions: Set[TopicAndPartition], isTriggeredByAutoRebalance: Boolean = false) {
  info("Starting preferred replica leader election for partitions %s".format(partitions.mkString(",")))
  try {
    // 将参加优先副本选举的分区添加到partitionsUndergoingPreferredReplicaElection
   
controllerContext
.partitionsUndergoingPreferredReplicaElection ++= partitions
   
// 将对应的topic标记为不可删除
   
deleteTopicManager
.markTopicIneligibleForDeletion(partitions.map(_.topic))
    // Partition转化成OnlinePartition,除了重新选举leader,还会更新zookeeper中的数据,并且
    //
发送LeaderAndIsrRequestUpdateMetadataRequest
   
partitionStateMachine
.handleStateChanges(partitions, OnlinePartition, preferredReplicaPartitionLeaderSelector)
  } catch {
    case e: Throwable => error("Error completing preferred replica leader election for partitions%s".format(partitions.mkString(",")), e)
  } finally {
    // 清理partitionsUndergoingPreferredReplicaElectionzookeeper相关的数据
   
removePartitionsFromPreferredReplicaElection(partitions, isTriggeredByAutoRebalance)
    deleteTopicManager.resumeDeletionForTopics(partitions.map(_.topic))
  }
}

 

七 副本重新分配相关的Listener

PartitionReassignedListener监听zookeeper路径"/admin/reassign_

partitions",当接收到ReassignPartitions命令指定某些分区需要重新分配副本,会将指定的分区的信息写入该节点,从而触发PartitionReassignedListener进行处理

下面是副本重新分配的步骤:

第一步:先从zookeeper的"/admin/reassign_partitions"读取分区重分配信息

第二步:过滤掉正在进行重新分配的分区

第三步:检测其topic是否为待删除的topic.如果是,则调用controller的removePartitionFromReassignedPartitions:

# 取消此分区注册的ReassignedPartitionsIsrChangeListener

# 删除zookeeper的"/admin/reassign_partitions"节点中与当前分区相关的数据

# 从partitionsBeingReassigned集合中删除分区相关数据

第四步:否则创建ReassignedPartitionsContext,调用initiateReassign

ReplicasForTopicPartition方法开始为重新分配副本做准备

# 获取旧的AR副本集合指定的新的副本集

# 比较新旧AR副本集,完全一样抛出异常

# 判断新AR副本集涉及的broker是否都可用,如不是则抛出异常

# 为分区添加注册ReassignedPartitionsIsrChangeListener

# 将分区添加到partitionsBeingReassigned集合中,并标志该topic不能被删除

# 调用onPartitionReassignment方法,开始执行副本重新分配

第五步: 判断AR集合中所有副本是否已经进入ISR集合,如果没有,进行如下步骤:

# 将分区在ContextController和zookeeper中AR集合更新成(新AR+旧AR)并集

# 向该AR并集发送LeaderAndIsrRequest,增加zookeeper中记录的leader_epoch值

# 将(新AR-旧AR)中差集副本更新成NewReplica,因为表示新增的副本,此步骤会向这些副本发送LeaderAndIsrRequest,使其成为follower副本,并发送UpdateMetadataSRequest请求

 

第六步:如果新的AR集合的副本都已经进入ISR列表,则执行一下步骤:

# 将新AR集合中所有副本都转化为OnlineReplica状态

# 更新 ControllerContext的AR记录为新的AR

# 如果当前leader副本在新的AR集合,则递增zookeeper和Controller

Context的leader_epoc值,并且发送LeaderAndIsrRequest和Update

MetadataSRequest请求

# 如果当前leader副本不在新的AR集合,或者leader副本 不可用,则将分区状态转化为OnlinePartition,主要目的是使用ReassignedPartitionLeaderSelector选举新的Leader副本,使得新的AR集合中一个副本成为新的leader,然后会发发送LeaderAndIsrRequest和UpdateMetadataSRequest请求

# 将(旧AR-新AR)中的副本转换为OfflineReplica,此步骤走会发送StopReplicaRequest,清理ISR列表中相关的副本,并且发送LeaderAndIsrRequest和UpdateMetadataSRequest请求

# 接着将将(旧AR-新AR)中的副本转换为ReplicaDeletionStart状态,此步骤发送StopReplicaRequest,完成删除后,将副本状态转化为状态ReplicaDeletionSuccessful

# 更新zookeeper的AR信息

# 将此分区相关信息从"/admin/reassign_partitions"移除

# 向所有broker发送UpdateMetadataRequest

# 尝试取消相关的topic不可删除状态,唤醒DeleteTopicsThread线程

 

def handleDataChange(dataPath: String, data: Object) {
  debug("Partitions reassigned listener fired for path %s. Record partitionsto be reassigned %s"
   
.format(dataPath, data))
  // zookeeper获取要重新分配副本的分区
 
val partitionsReassignmentData = ZkUtils.parsePartitionReassignmentData(data.toString)
  // 过滤掉正在进行重新分配的分区
 
val partitionsToBeReassigned = inLock(controllerContext.controllerLock) {
    partitionsReassignmentData.filterNot(p => controllerContext.partitionsBeingReassigned.contains(p._1))
  }
  partitionsToBeReassigned.foreach { partitionToBeReassigned =>
    inLock(controllerContext.controllerLock) {
      // 检测其topic是否为待删除的topic
     
if(controller.deleteTopicManager.isTopicQueuedUpForDeletion(partitionToBeReassigned._1.topic)) {
        error("Skipping reassignment of partition %s for topic %s since it iscurrently being deleted"
         
.format(partitionToBeReassigned._1, partitionToBeReassigned._1.topic))
        controller.removePartitionFromReassignedPartitions(partitionToBeReassigned._1)
      } else {
        // 开始进行分区的重新分配
       
val context = new ReassignedPartitionsContext(partitionToBeReassigned._2)
        controller.initiateReassignReplicasForTopicPartition(partitionToBeReassigned._1, context)
      }
    }
  }
}

 

 

def removePartitionFromReassignedPartitions(topicAndPartition: TopicAndPartition) {
  if(controllerContext.partitionsBeingReassigned.get(topicAndPartition).isDefined) {
    // 取消此分区注册的ReassignedPartitionsIsrChangeListener
   
zkUtils.zkClient.unsubscribeDataChanges(getTopicPartitionLeaderAndIsrPath(topicAndPartition.topic, topicAndPartition.partition),
      controllerContext.partitionsBeingReassigned(topicAndPartition).isrChangeListener)
  }
  // 获取将要重新分配的分区
 
val partitionsBeingReassigned = zkUtils.getPartitionsBeingReassigned()
  // 获取将要重新分配的分区删除与当前分区相关的数据
 
val updatedPartitionsBeingReassigned = partitionsBeingReassigned - topicAndPartition
 
// 将新的将要重新分配的分区信息写入zookeeper
 
zkUtils.updatePartitionReassignmentData(updatedPartitionsBeingReassigned.mapValues(_.newReplicas))
  // 更新ControllerContextpartitionsBeingReassigned,也是把正在删除topci的分区移除
 
controllerContext
.partitionsBeingReassigned.remove(topicAndPartition)
}

 

def initiateReassignReplicasForTopicPartition(topicAndPartition: TopicAndPartition,
                                      reassignedPartitionContext: ReassignedPartitionsContext) {
  // 获取需要reassign的分区的副本
 
val newReplicas = reassignedPartitionContext.newReplicas
 
val topic = topicAndPartition.topic
 
val partition = topicAndPartition.partition
 
// 从新副本过滤出可用的副本
 
val aliveNewReplicas = newReplicas.filter(r => controllerContext.liveBrokerIds.contains(r))
  try {
    val assignedReplicasOpt = controllerContext.partitionReplicaAssignment.get(topicAndPartition)
    assignedReplicasOptmatch {
      case Some(assignedReplicas) =>
        // 如果需要reassign的副本和之前的AR副本相同,则不用
       
if(assignedReplicas == newReplicas) {
          throw new KafkaException("Partition %s to bereassigned is already assigned to replicas".format(topicAndPartition) +
            " %s.Ignoring request for partition reassignment".format(newReplicas.mkString(",")))
        } else {
          info("Handling reassignment of partition %s to new replicas %s".format(topicAndPartition, newReplicas.mkString(",")))
          // 注册一个ISR改变监听器
         
watchIsrChangesForReassignedPartition(topic, partition, reassignedPartitionContext)
          controllerContext.partitionsBeingReassigned.put(topicAndPartition, reassignedPartitionContext)
          // reassign的时候,标记删除topic是不成功的
         
deleteTopicManager
.markTopicIneligibleForDeletion(Set(topic))
          // 真正执行partition reassign操作
         
onPartitionReassignment(topicAndPartition, reassignedPartitionContext)
        }
      case None => throw new KafkaException("Attempt to reassign partition %s that doesn't exist"
       
.format(topicAndPartition))
    }
  } catch {
    case e: Throwable => error("Error completing reassignment of partition %s".format(topicAndPartition), e)
    removePartitionFromReassignedPartitions(topicAndPartition)
  }
}

 

def onPartitionReassignment(topicAndPartition: TopicAndPartition, reassignedPartitionContext: ReassignedPartitionsContext) {
  val reassignedReplicas = reassignedPartitionContext.newReplicas
 
if (!areReplicasInIsr(topicAndPartition.topic, topicAndPartition.partition, reassignedReplicas)) {
    info("New replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassignednot yet caught up with the leader")
    val newReplicasNotInOldReplicaList = reassignedReplicas.toSet -- controllerContext.partitionReplicaAssignment(topicAndPartition).toSet
   
val newAndOldReplicas = (reassignedPartitionContext.newReplicas ++ controllerContext.partitionReplicaAssignment(topicAndPartition)).toSet
   
//1. Update ARin ZK with OAR + RAR.
   
updateAssignedReplicasForPartition(topicAndPartition, newAndOldReplicas.toSeq)
    //2. SendLeaderAndIsr request to every replica in OAR + RAR (with AR as OAR + RAR).
   
updateLeaderEpochAndSendRequest(topicAndPartition, controllerContext.partitionReplicaAssignment(topicAndPartition),
      newAndOldReplicas.toSeq)
    //3. replicasin RAR - OAR -> NewReplica
   
startNewReplicasForReassignedPartition(topicAndPartition, reassignedPartitionContext, newReplicasNotInOldReplicaList)
    info("Waiting for new replicas %s for partition %s being ".format(reassignedReplicas.mkString(","), topicAndPartition) +
      "reassignedto catch up with the leader")
  } else {
    //4. Waituntil all replicas in RAR are in sync with the leader.
   
val oldReplicas = controllerContext.partitionReplicaAssignment(topicAndPartition).toSet -- reassignedReplicas.toSet
   
//5. replicasin RAR -> OnlineReplica
   
reassignedReplicas.foreach { replica =>
      replicaStateMachine.handleStateChanges(Set(new PartitionAndReplica(topicAndPartition.topic, topicAndPartition.partition,
        replica)), OnlineReplica)
    }
    //6. Set AR toRAR in memory.
    //7. Send LeaderAndIsr request with apotential new leader (if current leader not in RAR) and
    //  a new AR (using RAR) and same isr to every broker in RAR
   
moveReassignedPartitionLeaderIfRequired(topicAndPartition, reassignedPartitionContext)
    //8. replicasin OAR - RAR -> Offline (force those replicas out of isr)
    //9. replicas in OAR - RAR ->NonExistentReplica (force those replicas to be deleted)
   
stopOldReplicasOfReassignedPartition(topicAndPartition, reassignedPartitionContext, oldReplicas)
    //10. UpdateAR in ZK with RAR.
   
updateAssignedReplicasForPartition(topicAndPartition, reassignedReplicas)
    //11. Updatethe /admin/reassign_partitions path in ZK to remove this partition.
   
removePartitionFromReassignedPartitions(topicAndPartition)
    info("Removed partition %s from the list of reassigned partitions inzookeeper".format(topicAndPartition))
    controllerContext.partitionsBeingReassigned.remove(topicAndPartition)
    //12. Afterelecting leader, the replicas and isr information changes, so resend the updatemetadata request to every broker
   
sendUpdateMetadataRequest(controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set(topicAndPartition))
    // signaldelete topic thread if reassignment for some partitions belonging to topicsbeing deleted just completed
   
deleteTopicManager
.resumeDeletionForTopics(Set(topicAndPartition.topic))   } }