本文共分为三个部分:
- Kafka Topic建立方式
- Kafka Topic Partitions Assignment实现原理
- Kafka资源隔离方案
1. Kafka Topic建立方式
Kafka Topic建立方式有如下两种表现形式:
(1)建立Topic时直接指定Topic Partition Replica与Kafka Broker之间的存储映射关系
/usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeeperHost:ZooKeeperPort --create --topic TopicName --replica-assignment id0:id1:id2,id3:id4:id5,id6:id7:id8
其中,“id0:id1:id2,id3:id4:id5,id6:id7:id8”表示Topic TopicName一共有3个Partition(以“,”分隔),每一个Partition均有3个Replica(以“:”分隔),Topic Partition Replica与Kafka Broker之间的对应关系以下:
Partition0 Replica:Broker id0、Broker id一、Broker id2;
Partition1 Replica:Broker id三、Broker id四、Broker id5;
Partition2 Replica:Broker id六、Broker id七、Broker id8;
(2)建立Topic时由Kafka自动分配Topic Partition Replica与Kafka Broker之间的存储映射关系
/usr/lib/kafka_2.10-0.8.2.1/bin/kafka-topics.sh --zookeeper ZooKeeperHost:ZooKeeperPort --create --topic TopicName
第(1)种方式彻底依靠人为手工指定,这里仅仅探讨使用第(2)种方式建立Topic时,“自动分配”是如何实现的。
2. Kafka Topic Partition Replica Assignment实现原理
Replica Assignment的目标有两个:
(1)使Partition Replica可以均匀地分配至各个Kafka Broker(负载均衡);
(2)若是Partition的第一个Replica分配至某一个Kafka Broker,那么这个Partition的其它Replica则须要分配至其它的Kafka Brokers,即Partition Replica分配至不一样的Broker;
注意,这里有一个约束条件:Topic Partition Replicas Size <= Kafka Brokers Size。
“自动分配”的核心工做过程以下:
随机选取一个StartingBroker(Broker id0、Broker id一、Broker id二、...),随机选取IncreasingShift初始值([0,nBrokers - 1])
(1)从StartingBroker开始,使用轮询的方式依次将各个Partition的Replicas分配至各个Broker;
对于每个Partition,Replicas的分配过程以下:
(2)Partition的第一个Replica分配至StartingBroker;
(3)根据IncreasingShift计算第n(n>=2)个Replica的Shift(即与第1个Replica的间隔量),依据Shift将其分配至相应的Broker;
(4)StartingBroker移至下一个Broker;
(5)若是Brokers已经被轮询完一次,则IncreasingShift递增一;不然,继续(2)。
假设有5个Brokers(broker-0、broker-一、broker-二、broker-三、broker-4),Topic有10个Partition(p0、p一、p二、p三、p四、p五、p六、p七、p八、p9),每个Partition有3个Replica,依据上述工做过程,分配结果以下:
broker-0 broker-1 broker-2 broker-3 broker-4
p0 p1 p2 p3 p4 (1st replica)
p5 p6 p7 p8 p9 (1st replica)
p4 p0 p1 p2 p3 (2nd replica)
p8 p9 p5 p6 p7 (2nd replica)
p3 p4 p0 p1 p2 (3nd replica)
p7 p8 p9 p5 p6 (3nd replica)
详细步骤以下:
选取broker-0做为StartingBroker,IncreasingShift初始值为1,
对于p0,replica1分配至broker-0,IncreasingShift为1,因此replica2分配至broker-1,replica3分配至broker-2;
对于p1,replica1分配至broker-1,IncreasingShift为1,因此replica2分配至broker-2,replica3分配至broker-3;
对于p2,replica1分配至broker-2,IncreasingShift为1,因此replica2分配至broker-3,replica3分配至broker-4;
对于p3,replica1分配至broker-3,IncreasingShift为1,因此replica2分配至broker-4,replica3分配至broker-1;
对于p4,replica1分配至broker-4,IncreasingShift为1,因此replica2分配至broker-0,replica3分配至broker-1;
注:IncreasingShift用于计算Shift,Shift表示Partition的第n(n>=2)个Replica与第1个Replica之间的间隔量。若是IncreasingShift值为m,那么Partition的第2个Replica与第1个Replica的间隔量为m + 1,第3个Replica与第1个Replica的间隔量为m + 2,...,依次类推。Shift的取值范围:[1,brokerSize - 1]。
此时,broker-0、broker-一、broker-二、broker-三、broker-4分别做为StartingBroker被轮询分配一次,继续轮询;但IncreasingShift递增为2。
对于p5,replica1分配至broker-0,IncreasingShift为2,因此replica2分配至broker-2,replica3分配至broker-3;
对于p6,replica1分配至broker-1,IncreasingShift为2,因此replica2分配至broker-3,replica3分配至broker-4;
对于p7,replica1分配至broker-2,IncreasingShift为2,因此replica2分配至broker-4,replica3分配至broker-0;
对于p8,replica1分配至broker-3,IncreasingShift为2,因此replica2分配至broker-0,replica3分配至broker-1;
对于p9,replica1分配至broker-4,IncreasingShift为2,因此replica2分配至broker-1,replica3分配至broker-2;
此时,broker-0、broker-一、broker-二、broker-三、broker-4分别做为StartingBroker再次被轮询一次,若是还有其它Partition,则继续轮询,IncreasingShift递增为3,依次类推。
这里有几点须要注意:
(1)为何要随机选取StartingBroker,而不是每次都选取broker-0做为StartingBroker?
以broker-0、broker-一、broker-二、broker-三、broker-4为例,由于分配过程是以轮询方式进行的,若是每次都选取broker-0做为StartingBroker,那么Brokers列表中的前面部分将有可能被分配相对比较多的Partition Replicas,从而致使这部分Brokers负载较高,随机选取能够保证相对比较好的均匀效果。
(2)为何Brokers列表每次轮询一次,IncreasingShift值都须要递增1?
Kafka Topic Partition数目较多的状况下,Partition的第1个Replica与第n(n>=2)个Replica之间的间隔量随着IncreasingShift的变化面变化,可以更好的均匀分配Replica。
scala.kafka.admin.AdminUtils.assignReplicasToBrokers()实现上述Topic Partition Replica与Broker之间的分配过程,源码以下:
brokerList:Kafka Brokers列表;
nPartitions:Topic待分配的Partition数目;
replicationFactor:Topic Partition Replica数目;
fixedStartIndex:若是显示指定,默认值为0;它的值与两个变量值相关:startIndex和nextReplicaShift,详情见后;
startPartitionId:从Topic的哪个Partition开始分配,一般状况下是0,Topic增长Partition时该值不为0。
val ret = new mutable.HashMap[Int, List[Int]]()
分配结果保存至一个Map变量ret,key为Partition Id,value为分配的Brokers列表。
val startIndex = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
var currentPartitionId = if (startPartitionId >= 0) startPartitionId else 0
var nextReplicaShift = if (fixedStartIndex >= 0) fixedStartIndex else rand.nextInt(brokerList.size)
startIndex表示StartingBroker,currentPartitionId表示当前为哪一个Partition分配Brokers,nextReplicaShift表示当前的IncreasingShit值。
接下来就是一个循环,用于为每个Partition的Replicas分配Brokers,其中Partition的第1个Replica由“(currentPartitionId + startIndex) % brokerList.size”决定,其他的Replica由“replicaIndex()”决定。
shift表示着第n(n >= 2)个Replica与第一个Replica之间的间隔量,“1 + (secondReplicaShift + replicaIndex) % (nBrokers - 1)”的计算方式很是巧妙,它保证了shift的取值范围:[1,nBrokers](你们能够本身体会一下)。
3. Kafka资源隔离方案
实时数据处理场景中,若是数据量比较大,为了保证写入/消费的吞吐量,咱们建立Topic时一般会指定比较大的Partition数目,从而使得数据尽量地被分散至更多的Partition,Partition被尽量均匀的分配至Kafka集群中的各个Broker,从负载均衡的角度看,一切都很美好。从业务的角度看,会有资源竞争的问题,毕竟Kafka Broker机器的带宽资源是有限的,在带宽比较紧张的情形下,任何一个业务方的数据量波动(这里仅指数据量增长),全部的业务方都会受到影响;从运维的角度看,会有可用性的问题,任何一台Kafka Broker机器都负载着全部Topic的数据传输、存储,若是出现宕机的状况,将会波及到全部的Topic。针对这种状况,咱们提出了划分资源池的资源隔离方案:
Kafka集群有9台Brokers组成:broker-一、broker-二、...、broker-9,建立9个Topic:t一、t二、...、t9,每一个Topic有9个Partition(假设Replica为1),如上图所示,咱们将9台Brokers切分红3个资源池:Pool1(broker-一、broker-二、broker-3)、Pool2(broker-四、broker-五、broker-6)、Pool3(broker-七、broker-八、broker-9),Topic的分配状况以下:
Pool1:t一、t二、t3
Pool2:t四、t五、t6
Pool3:t七、t八、t9
能够看出,这三个资源池的物理资源是彻底独立的,三个资源池实际上至关于三个小集群。
这种资源池的划分方式不但能够作到物理资源的隔离,还能够必定程度上解决异构机型(MEM、DISK)带来的问题,能够把机型类似的机器组成一个资源池。实际实施时须要综合考虑业务状况、机器状况,合理划分资源池,并根据具体的Topic状况将其分配至合适的资源池内。
Kafka Topic的建立也变为两步:
(1)使用kafka-topics.sh建立Topic;
(2)使用kafka-reassign-partitions.sh移动Topic Partition Replicas至指定的资源池(具体的Brokers列表)。