Kafka's Replica Assignment Strategy

I have to say Kafka has made my work much easier, and every time I use it I'm grateful. It's not only pleasant to use but also remarkably stable.


Out of that fondness, I decided to look into how Kafka actually works.


The questions that interest me most are these:


1. When a topic is created, how does Kafka decide where each partition and its replicas are placed?


2. If a broker goes down, how are its replicas reassigned?


3. If, because of question 2, a broker becomes a new replica holder for a topic, what happens to the messages it doesn't yet have? Does it need to pull them from other brokers, and if so, will the volume of data put too much load on the network?


4. Kafka's ack mechanism. When a producer sends a message, is it forwarded through the leader, or can it be written directly to a broker holding a replica? And is the ack sent to the producer as soon as one broker has persisted the message, or only after a majority of, or all, replicas have?




This post covers the first question.


When a client issues a create-topic request, the call stack on the broker looks like this:


KafkaApis.handleCreateTopicsRequest() -> adminManager.createTopics() -> AdminUtils.assignReplicasToBrokers()

The actual replica assignment is done inside assignReplicasToBrokers().


First, look at the author's comment in the AdminUtils source:


* There are 3 goals of replica assignment:

*

* 1. Spread the replicas evenly among brokers.

* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.

* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible

*

* To achieve this goal for replica assignment without considering racks, we:

* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.

* 2. Assign the remaining replicas of each partition with an increasing shift.


The important part is the two rules below!


1. Pick a random position in the broker list; starting from that position, assign the first replica of each partition to the brokers in the list in turn.


For example, say we have broker0 through broker4 and the topic has 10 partitions, and the randomly chosen starting position is broker0. Partitions are then handed out one by one starting at broker0; once the number of partitions exceeds the number of brokers, assignment wraps back around to the starting broker, like this:


* broker-0  broker-1  broker-2  broker-3  broker-4

* p0        p1        p2        p3        p4       (1st replica)

* p5        p6        p7        p8        p9       (1st replica)
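Rule 1 boils down to a single modulo. Here is a minimal Python sketch (`first_replica_broker` is a hypothetical helper of mine, not a function from the Kafka source):

```python
# Sketch of rule 1: round-robin placement of each partition's first replica.
def first_replica_broker(partition, start_index, n_brokers):
    return (start_index + partition) % n_brokers

# 5 brokers, 10 partitions, random start position happens to be broker0:
placement = [first_replica_broker(p, 0, 5) for p in range(10)]
print(placement)  # [0, 1, 2, 3, 4, 0, 1, 2, 3, 4] -- wraps after partition 4
```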

2. Once the first replica is placed, each remaining replica is placed on the brokers that follow the first replica's broker, in order.

For example, if partition0's replica0 goes to broker2, then partition0's replica1 and replica2 go to broker3 and broker4.


* broker-0  broker-1  broker-2  broker-3  broker-4

* p0        p1        p2        p3        p4       (1st replica)

* p5        p6        p7        p8        p9       (1st replica)

* p4        p0        p1        p2        p3       (2nd replica)

* p8        p9        p5        p6        p7       (2nd replica)

* p3        p4        p0        p1        p2       (3rd replica)

* p7        p8        p9        p5        p6       (3rd replica)
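The two rules as described so far can be sketched in Python (`assign_simplified` is my own name for this simplified model). Note that it matches the table only for p0 through p4: for p5 through p9 the real implementation adds a shift, which the source walkthrough below explains.

```python
# Simplified scheme from the two rules above: first replica round-robin,
# remaining replicas on the immediately following brokers.
# (The real Kafka code spaces the later replicas out with a shift.)
def assign_simplified(n_partitions, replication_factor, n_brokers, start_index):
    assignment = {}
    for p in range(n_partitions):
        first = (start_index + p) % n_brokers
        assignment[p] = [(first + r) % n_brokers for r in range(replication_factor)]
    return assignment

# partition0's first replica on broker2 -> replicas land on brokers 2, 3, 4
print(assign_simplified(10, 3, 5, 2)[0])  # [2, 3, 4]
```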


So where does the broker list come from?

This is where Kafka's cooperation with ZooKeeper comes in: Kafka asks ZooKeeper for the broker list of the cluster it belongs to.

Who asks ZooKeeper for the broker list?

The Kafka cluster elects a leader (the controller) through ZooKeeper, and that leader is the one that talks to ZooKeeper. The election, or joining the cluster as a follower, happens when a broker initializes.




Below is some source code, all taken from the AdminUtils class. In this post I'll leave aside the rack-aware case, and I've stripped out some of the error checking.


private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
                                               replicationFactor: Int,
                                               brokerList: Seq[Int],
                                               fixedStartIndex: Int,
                                               startPartitionId: Int): Map[Int, Seq[Int]] = {
  val ret = mutable.Map[Int, Seq[Int]]()
  val brokerArray = brokerList.toArray
  // pick a random start position unless the caller fixed one
  val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  var currentPartitionId = math.max(0, startPartitionId)
  var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
  for (_ <- 0 until nPartitions) {
    // bump the shift each time assignment has gone once around the broker list
    if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
      nextReplicaShift += 1
    val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
    val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
    for (j <- 0 until replicationFactor - 1)
      replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
    ret.put(currentPartitionId, replicaBuffer)
    currentPartitionId += 1
  }
  ret
}

First, look at the return value: it's a Map whose key is the partition ID and whose value is the list of brokers that hold that partition's replicas.

Each partition's first replica is assigned exactly as I described above. The remaining replicas, however, are assigned slightly differently from my simplified description: a nextReplicaShift comes into play. In short, after this computation the replicas are not handed to consecutive brokers one after another; instead they are spaced out by nextReplicaShift brokers.


private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
  val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
  (firstReplicaIndex + shift) % nBrokers
}
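To see exactly what the shift does, here is a Python port of the two Scala methods above (my own translation, not Kafka code; I fix the start index and shift to 0 so the output is deterministic and reproduces the earlier table):

```python
import random

def replica_index(first_replica_index, second_replica_shift, nth_replica, n_brokers):
    # Same formula as the Scala replicaIndex() above.
    shift = 1 + (second_replica_shift + nth_replica) % (n_brokers - 1)
    return (first_replica_index + shift) % n_brokers

def assign_replicas_rack_unaware(n_partitions, replication_factor, broker_list,
                                 fixed_start_index=-1, start_partition_id=-1):
    ret = {}
    brokers = list(broker_list)
    start_index = fixed_start_index if fixed_start_index >= 0 else random.randrange(len(brokers))
    current = max(0, start_partition_id)
    next_replica_shift = fixed_start_index if fixed_start_index >= 0 else random.randrange(len(brokers))
    for _ in range(n_partitions):
        # bump the shift after each full round over the broker list
        if current > 0 and current % len(brokers) == 0:
            next_replica_shift += 1
        first = (current + start_index) % len(brokers)
        replicas = [brokers[first]]
        for j in range(replication_factor - 1):
            replicas.append(brokers[replica_index(first, next_replica_shift, j, len(brokers))])
        ret[current] = replicas
        current += 1
    return ret

# 5 brokers, 10 partitions, 3 replicas, start index fixed at 0 -- the table's setup.
assignment = assign_replicas_rack_unaware(10, 3, [0, 1, 2, 3, 4], fixed_start_index=0)
print(assignment[0])  # [0, 1, 2]
print(assignment[5])  # [0, 2, 3] -- shift was bumped after the first full round
```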

As you can see, the number of brokers between two replicas depends on 1. the size of nextReplicaShift and 2. which replica of the partition this is.

Ignoring the case where the shift wraps past the number of brokers, the final shift is 1 + nextReplicaShift + replicaIndex.
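Plugging in concrete numbers makes this visible. Taking 5 brokers, nextReplicaShift = 1, and the first replica on broker0 (my example values; this is the p5 case from the table), the formula gives:

```python
# Gap between the first replica and each later replica, per the formula above.
n_brokers, next_replica_shift, first = 5, 1, 0
results = []
for j in range(2):  # j = 0, 1 -> the 2nd and 3rd replicas
    shift = 1 + (next_replica_shift + j) % (n_brokers - 1)
    results.append((shift, (first + shift) % n_brokers))
print(results)  # [(2, 2), (3, 3)]: the 2nd replica lands on broker2, the 3rd on broker3
```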


As for why it's done this way: probably to put the copies on machines that are spaced far apart, to improve resilience to failures.
