本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,若有任何学术交流,可随时联系。node
某一个broker被选举出来承担特殊的角色,就是控制器Controller。缓存
Leader会向zookeeper上注册Watcher,其余broker几乎不用监听zookeeper的状态变化。学习
Controller集群就是用来管理和协调Kafka集群的,具体就是管理集群中全部分区的状态和分区对应副本的状态。this
每个Kafka集群任意时刻都只能有一个controller,当集群启动的时候,全部的broker都会参与到controller的竞选,最终只能有一个broker胜出。spa
Controller维护的状态分为两类:1:管理每一台Broker上对应的分区副本。2:管理每个Topic分区的状态。线程
KafkaController 核心代码,其中包含副本状态机和分区状态机scala
class KafkaController(val config : KafkaConfig, zkClient: ZkClient,
val brokerState: BrokerState) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Controller " + config.brokerId + "]: "
private var isRunning = true
private val stateChangeLogger = KafkaController.stateChangeLogger
val controllerContext = new ControllerContext(zkClient, config.zkSessionTimeoutMs)
val partitionStateMachine = new PartitionStateMachine(this)
val replicaStateMachine = new ReplicaStateMachine(this)
private val controllerElector = new ZookeeperLeaderElector(controllerContext, ZkUtils.ControllerPath, onControllerFailover,
onControllerResignation, config.brokerId)
// have a separate scheduler for the controller to be able to start and stop independently of the
// kafka server
private val autoRebalanceScheduler = new KafkaScheduler(1)
var deleteTopicManager: TopicDeletionManager = null
val offlinePartitionSelector = new OfflinePartitionLeaderSelector(controllerContext, config)
private val reassignedPartitionLeaderSelector = new ReassignedPartitionLeaderSelector(controllerContext)
private val preferredReplicaPartitionLeaderSelector = new PreferredReplicaPartitionLeaderSelector(controllerContext)
private val controlledShutdownPartitionLeaderSelector = new ControlledShutdownLeaderSelector(controllerContext)
private val brokerRequestBatch = new ControllerBrokerRequestBatch(this)
private val partitionReassignedListener = new PartitionsReassignedListener(this)
private val preferredReplicaElectionListener = new PreferredReplicaElectionListener(this)
复制代码
KafkaController中共定义了五种selector选举器日志
一、ReassignedPartitionLeaderSelector
从可用的ISR中选取第一个做为leader,把当前的ISR做为新的ISR,将重分配的副本集合做为接收LeaderAndIsr请求的副本集合。
二、PreferredReplicaPartitionLeaderSelector
若是从assignedReplicas取出的第一个副本就是分区leader的话,则抛出异常,不然将第一个副本设置为分区leader。
三、ControlledShutdownLeaderSelector
将ISR中处于关闭状态的副本从集合中去除掉,返回一个新新的ISR集合,而后选取第一个副本做为leader,而后令当前AR做为接收LeaderAndIsr请求的副本。
四、NoOpLeaderSelector
原则上不作任何事情,返回当前的leader和isr。
五、OfflinePartitionLeaderSelector
从活着的ISR中选择一个broker做为leader,若是ISR中没有活着的副本,则从assignedReplicas中选择一个副本做为leader,leader选举成功后注册到Zookeeper中,并更新全部的缓存。
复制代码
kafka修改分区和副本数code
../bin/kafka-topics.sh --zookeeper 127.0.0.1:2181 --describe --topic test1
Topic:test1 PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test1 Partition: 0 Leader: 2 Replicas: 2,4 Isr: 2,4
Topic: test1 Partition: 1 Leader: 3 Replicas: 3,5 Isr: 3,5
Topic: test1 Partition: 2 Leader: 4 Replicas: 4,1 Isr: 4,1
复制代码
topic 分区扩容cdn
./kafka-topics.sh --zookeeper 127.0.0.1:2181 -alter --partitions 4 --topic test1
复制代码
Replica有7种状态:
1 NewReplica: 在partition reassignment期间KafkaController建立New replica
2 OnlineReplica: 当一个replica变为一个parition的assingned replicas时
其状态变为OnlineReplica, 即一个有效的OnlineReplica
3 Online状态的parition才能转变为leader或isr中的一员
4 OfflineReplica: 当一个broker down时, 上面的replica也随之die, 其状态转变为Onffline;
ReplicaDeletionStarted: 当一个replica的删除操做开始时,其状态转变为ReplicaDeletionStarted
5 ReplicaDeletionSuccessful: Replica成功删除后,其状态转变为ReplicaDeletionSuccessful
6 ReplicaDeletionIneligible: Replica成功失败后,其状态转变为ReplicaDeletionIneligible
7 NonExistentReplica: Replica成功删除后, 从ReplicaDeletionSuccessful状态转变为NonExistentReplica状态
复制代码
ReplicaStateMachine 所在文件: core/src/main/scala/kafka/controller/ReplicaStateMachine.scala
startup: 启动ReplicaStateMachine
initializeReplicaState: 初始化每一个replica的状态, 若是replica所在的broker是live状态,则此replica的状态为OnlineReplica。
处理能够转换到Online状态的Replica, handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica), 而且发送LeaderAndIsrRequest到各broker nodes: handleStateChanges(controllerContext.allLiveReplicas(), OnlineReplica)
当建立某个topic时,该topic下全部分区的全部副本都是NonExistent。
当controller加载Zookeeper中该topic每个分区的全部副本信息到内存中,同时将副本的状态变动为New。
以后controller选择该分区副本列表中的第一个副本做为分区的leader副本并设置全部副本进入ISR,而后在Zookeeper中持久化该决定。
一旦肯定了分区的Leader和ISR以后,controller会将这些消息以请求的方式发送给全部的副本。
同时将这些副本状态同步到集群的全部broker上以便让他们知晓。
最后controller 会把分区的全部副本状态设置为Online。
Partition有以下四种状态
NonExistentPartition: 这个partition尚未被建立或者是建立后又被删除了;
NewPartition: 这个parition已建立, replicas也已分配好,但leader/isr还未就绪;
OnlinePartition: 这个partition的leader选好;
OfflinePartition: 这个partition的leader挂了,这个parition状态为OfflinePartition;
复制代码
当建立Topic时,controller负责建立分区对象,它首先会短暂的将全部分区状态设置为NonExistent。
以后读取Zookeeper副本分配方案,而后令分区状态设置为NewPartion。
处于NewPartion状态的分区还没有有leader和ISR,所以Controller会初始化leader和ISR信息并设置分区状态为OnlinePartion,此时分区正常工做。
本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,若有任何学术交流,可随时联系。
本套技术专栏是做者(秦凯新)平时工做的总结和升华,经过从真实商业环境抽取案例进行总结和分享,并给出商业应用的调优建议和集群环境容量规划等内容,请持续关注本套博客。期待加入IOT时代最具战斗力的团队。QQ邮箱地址:1120746959@qq.com,若有任何学术交流,可随时联系。
kafka集群Controller主要干经过ZK持久化副本分配方案,根据副本分配方案建立分区,监听ZK znode状态变化作执行处理,维护分区和副本ISR机制稳定运行。感谢huxihx技术博客以及相关书籍,让我理解了Controller核心机制,写一篇学习笔记,做为总结,辛苦成文,实属不易,谢谢。
秦凯新 于深圳 201812021541