提及kafka真的是极大地方便了个人工做,每次用起ta都心怀感激.好用不说,还至关稳定.网络
爱屋及乌,我决心一探kafka的究竟.dom
对我来讲最感兴趣的莫过于这几个个问题:ide
1.在建立topic的时候,kafka如何分配partition以及replica所在的位置.函数
2.要是一个broker down了,那它的replica该怎么从新分配.this
3.若是一个broker由于2成为了一个topic的新replica,那么他没有以前的那些message该怎么办?须要从其余broker拉过来吗,若是要拉,那么数据量太大的会不会对网络形成负载?orm
4.kafka的Ack机制.当producer发来一条消息,是经由leader转发,仍是能够直接放到replica所在的broker上.当一个broker存下了消息,就发ack给produce,r仍是等大多数/全部replica存下了消息再发ack给producer.rem
这篇博客先讨论第一个问题.kafka
当客户端发起create topic请求时,在broker上会有这样的调用栈.
源码
KafkaApis.handleCreateTopicsRequest()->adminManager.createTopics()->AdminUtils.assignReplicasToBrokers()博客
真正关于assignReplicas就在assignReplicasToBrokers这个函数中完成.
现来看AdminUtils这个类中,做者在源码上的注释
* There are 3 goals of replica assignment:
*
* 1. Spread the replicas evenly among brokers.
* 2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
* 3. If all brokers have rack information, assign the replicas for each partition to different racks if possible
*
* To achieve this goal for replica assignment without considering racks, we:
* 1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
* 2. Assign the remaining replicas of each partition with an increasing shift.
重点是下面两条!
1.从broker-list中选定一个随机的位置,从这个位置开始,将每个partition的第一个replica依次赋予brokerList中的broker.
好比如今有broker0~4,同时该topic有10个partition,随机选定的起始位置是broker0,那么就从broker0开始依次赋予partition,当partition超过了broker的数目时,再回到一开始选定的broker开始分配,就是以下效果.
* broker-0 broker-1 broker-2 broker-3 broker-4
* p0 p1 p2 p3 p4 (1st replica)
* p5 p6 p7 p8 p9 (1st replica)
2.当分配好了第一个replica以后,剩下的replica以第一个replica所在的broker为基准,依次赋予以后的broker
好比partition0的replica0给了broker2,那么partion0的replica1与replica2依次赋予broker3和broker4
* broker-0 broker-1 broker-2 broker-3 broker-4
* p0 p1 p2 p3 p4 (1st replica)
* p5 p6 p7 p8 p9 (1st replica)
* p4 p0 p1 p2 p3 (2nd replica)
* p8 p9 p5 p6 p7 (2nd replica)
* p3 p4 p0 p1 p2 (3nd replica)
* p7 p8 p9 p5 p6 (3nd replica)
那么brokerList从哪里来?
这就有关于kafka与zookeeper的协做了,kafka问zookeeper要本身所在kafka集群的brokerList.
谁问zookeeper要brokerList?
kafka集群会经过zookeeper选出一个集群中的leader,由这个leader与zookeeper交互.选举或者加入集群成为follower在一个broker初始化的时候完成.
下面上一些源码,均摘自AdminUtils类.在这篇文章中,我暂且不讨论考虑racker的状况,并去掉了一些错误检查的部分.
private def assignReplicasToBrokersRackUnaware(nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]] = {
val ret = mutable.Map[Int, Seq[Int]]()
val brokerArray = brokerList.toArray
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
var currentPartitionId = math.max(0, startPartitionId)
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerArray.length)
for (_ <- 0 until nPartitions) {
if (currentPartitionId > 0 && (currentPartitionId % brokerArray.length == 0))
nextReplicaShift += 1
val firstReplicaIndex = (currentPartitionId + startIndex) % brokerArray.length
val replicaBuffer = mutable.ArrayBuffer(brokerArray(firstReplicaIndex))
for (j <- 0 until replicationFactor - 1)
replicaBuffer += brokerArray(replicaIndex(firstReplicaIndex, nextReplicaShift, j, brokerArray.length))
ret.put(currentPartitionId, replicaBuffer)
currentPartitionId += 1
}
ret
}
首先关注一下这个方法的返回值,这是一个map的返回值.key是partition的ID,而value是这个partition所在的brokerList.
每一个partition的第一个replica分配方式同我上文所说的相同,而这个partiion剩下的replica分配方式与我上文所说的实现有稍微的不一样,就是加入了nextReplicashift.总之就是通过了某些的运算,replica并非在broker之间依次分配下去的.而是间隔了nextReplicaShift个broker分配的.
private def replicaIndex(firstReplicaIndex: Int, secondReplicaShift: Int, replicaIndex: Int, nBrokers: Int): Int = {
val shift = 1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)
(firstReplicaIndex + shift) % nBrokers
}
能够看到的,每两个replica之间所间隔的broker数目取决与1.nextReplicaShit的大小 2.该replica是该partition的第几个replica
最后的shif在不考虑shift超出了broker数目的状况下t为1+nextReplicaShift+replicaindex
至于为何要这么作,多是尽可能将备份放在间隔远的机器上.来提升容灾备份的能力吧.