建立topic——kafka源码探究之一

kafka是一款优秀的分布式消息队列,以吞吐量大,性能优秀著名。算法

一.相关的名词解释

broker:存储消息的物理实体,无状态,可多个扩展部署
Topic:一类消息的集合
Partition:一类消息的某个消息分区,一个topic有多个partition,可部署在多个broker上
Replicas:备份,partition级别,单个partition可有多个replicas,保证高可用
Controller:主broker,负责partition的leader选举
Zookeeper:存储broker、controller,consumer group等的信息,提供分布式锁等
ISR:in sync replica,同步副本列表,能保持fetch leader消息的replica,列表包括leader和follow
HW:high watermark,消费者能消费到的最新消息位置
LEO:log end offset,每一个partition的log最后一条message的位置。segmentfault

二.建立topic流程

kafka的topic建立逻辑分为两部分,命令行部分和controller的后台逻辑部分。命令行部分是脚本直接调用的主函数,负责校验命令以及分配replica,以后把分配结果写入zookeeper节点,再由controller监听zookeeper节点的变化,完成topic建立的后台逻辑部分。最好禁用auto-create,统一建立topic。缓存

流程:脚本命令实际是运行了TopicCommand的main函数,调用TopicCommand识别命令后判断是create命令后则调用createTopic方法。再判断是否有手动指定副本,即--replica-assignment参数,若是没有,则由系统自动分配,算法以下:dom

a.将全部Broker(假设共n个Broker)和待分配的Partition排序,随机选取一个broker做为开始分配的节点
b.假设选取第一个开始分配的broker为b1,将partition(p11)分配到b1,p12分配到b2,以此类推…
c.第一份replica(p1)分配完后,分配第二份replica(p2)。随机选取一个shift(步长),该shift做为p2j与p1j的间隔。假设随机出的shift为s,p21分配到b(1+s),p22分配到b(2+s).

如下是kafka0.8.2.2版本中关于分配算法的注释:
There are 3 goals of replica assignment:分布式

  1. Spread the replicas evenly among brokers.
  2. For partitions assigned to a particular broker, their other replicas are spread over the other brokers.
  3. If all brokers have rack information, assign the replicas for each partition to different racks if possible

To achieve this goal for replica assignment without considering racks, we:ide

  1. Assign the first replica of each partition by round-robin, starting from a random position in the broker list.
  2. Assign the remaining replicas of each partition with an increasing shift.

Here is an example of assigning
broker-0 broker-1 broker-2 broker-3 broker-4
p0 p1 p2 p3 p4 (1st replica)
p5 p6 p7 p8 p9 (1st replica)
p4 p0 p1 p2 p3 (2nd replica)
p8 p9 p5 p6 p7 (2nd replica)
p3 p4 p0 p1 p2 (3nd replica)
p7 p8 p9 p5 p6 (3nd replica)函数

简单来讲,分配的目标就是保证同一个partition的多个replica不能在同一个broker上且尽可能在broker间均匀分配。

若是有—replica-assignment参数,参数值遵储以下格式:id0:id1:id2,id3:id4:id5,id6:id7:id8.表示Topic一共有3个Partition(以“,”分隔),每一个Partition均有3个Replica(以“:”分隔)。

如下详细叙述无指定参数时建立topic的具体流程:
在TopicCommand的createTopic方法中,首先从zookeeper中得到当前brokers list,以后调用assignReplicasToBrokers方法得到partition和broker的分配关系,最后把该关系写入zk节点。性能

Kafka的controller内部的TopicChangeListener会监听/brokers/topics目录下子节点变化。一旦该目录子节点数发生变化会调用相应监听器的处理方法。先更新controller的缓存信息,以后是建立对应的分区机器副本对象,并为每一个分区肯定leader副本及ISR。fetch

对controller而言,内部包含两个状态机,分区状态机(PartitionStateMachine)和副本状态机(ReplicaStateMachine),两个状态机的状态变化如图1,图2所示。this

clipboard.png
图1 分区状态流转

clipboard.png
图2 副本状态流转

分区状态机监听到zk节点变动以后,主要进行了如下几个步骤操做。

1.得到zk下新的topic列表,与当前缓存的列表相比对,肯定新增的topic,更新controller缓存的topic列表
2.从/brokers/topics/${topic}节点中取出topic全部分区的副本分配方案,再更新controller对应的部分信息
3.调用onNewTopicCreation开始建立topic,注册分区变动监听器(PartitionModificationListener),再调用onNewPartitionCreation建立分区
4.建立分区对象,并设置成NewPartition状态;
5.为每一个分区建立对应的副本对象,从controller缓存中找出分区的分配方案,而后把分区下的全部副本设置成NewReplica状态
6.把分区状态转换为OnlinePartition,首先进行leader选举,方法是选取副本集合中的第一个副本做为leader副本,并把整个副本集合做为ISR,同时把这些信息更新到zk的brokers/topics/test/partitions/0/state节点,以后更新controller的leader缓存。最后将全部信息(UpdateMetadataRequest)发送到集群中其余的broker。
7.最后设置副本对象状态为OnlineReplica。

如下是详细的流程图:

图片描述

broker的高可用及高伸缩——kafka源码探究之二
https://segmentfault.com/a/11...

消息生产与消息存储——kafka源码探究之三
https://segmentfault.com/a/11...

相关文章
相关标签/搜索