Kafka(三)Kafka的高可用与生产消费过程解析

一  Kafka HA设计解析

1.1 为什么须要Replication

  在Kafka在0.8之前的版本中,是没有Replication的,一旦某一个Broker宕机,则其上全部的Partition数据都不可被消费,这与Kafka数据持久性及Delivery Guarantee的设计目标相悖。同时Producer都不能再将数据存于这些Partition中。node

  若是Producer使用同步模式则Producer会在尝试从新发送message.send.max.retries(默认值为3)次后抛出Exception,用户能够选择中止发送后续数据也可选择继续选择发送。而前者会形成数据的阻塞,后者会形成本应发往该Broker的数据的丢失。算法

  若是Producer使用异步模式,则Producer会尝试从新发送message.send.max.retries(默认值为3)次后记录该异常并继续发送后续数据,这会形成数据丢失而且用户只能经过日志发现该问题。同时,Kafka的Producer并未对异步模式提供callback接口。服务器

  因而可知,在没有Replication的状况下,一旦某机器宕机或者某个Broker中止工做则会形成整个系统的可用性下降。随着集群规模的增长,整个集群中出现该类异常的概率大大增长,所以对于生产系统而言Replication机制的引入很是重要。网络

1.2 Leader Election

  引入Replication以后,同一个Partition可能会有多个Replica,而这时须要在这些Replication之间选出一个Leader,Producer和Consumer只与这个Leader交互,其它Replica做为Follower从Leader中复制数据。session

  由于须要保证同一个Partition的多个Replica之间的数据一致性(其中一个宕机后其它Replica必需要能继续服务而且即不能形成数据重复也不能形成数据丢失)。若是没有一个Leader,全部Replica均可同时读/写数据,那就须要保证多个Replica之间互相(N×N条通路)同步数据,数据的一致性和有序性很是难保证,大大增长了Replication实现的复杂性,同时也增长了出现异常的概率。而引入Leader后,只有Leader负责数据读写,Follower只向Leader顺序Fetch数据(N条通路),系统更加简单且高效。并发

1.3 如何将全部Replica均匀分布到整个集群

为了更好的作负载均衡,Kafka尽可能将全部的Partition均匀分配到整个集群上。一个典型的部署方式是一个Topic的Partition数量大于Broker的数量。同时为了提升Kafka的容错能力,也须要将同一个Partition的Replica尽可能分散到不一样的机器。实际上,若是全部的Replica都在同一个Broker上,那一旦该Broker宕机,该Partition的全部Replica都没法工做,也就达不到HA的效果。同时,若是某个Broker宕机了,须要保证它上面的负载能够被均匀的分配到其它幸存的全部Broker上。app

Kafka分配Replica的算法以下:负载均衡

1.将全部Broker(假设共n个Broker)和待分配的Partition排序异步

2.将第i个Partition分配到第(i mod n)个Broker上分布式

3.将第i个Partition的第j个Replica分配到第((i + j) mode n)个Broker上

1.4 Data Replication(副本策略)

Kafka的高可靠性的保障来源于其健壮的副本(replication)策略。

1.4.1 消息传递同步策略

Producer在发布消息到某个Partition时,先经过ZooKeeper找到该Partition的Leader,而后不管该Topic的Replication Factor为多少,Producer只将该消息发送到该Partition的Leader。Leader会将该消息写入其本地Log。每一个Follower都从Leader pull数据。这种方式上,Follower存储的数据顺序与Leader保持一致。Follower在收到该消息并写入其Log后,向Leader发送ACK。一旦Leader收到了ISR中的全部Replica的ACK,该消息就被认为已经commit了,Leader将增长HW而且向Producer发送ACK。

为了提升性能,每一个Follower在接收到数据后就立马向Leader发送ACK,而非等到数据写入Log中。所以,对于已经commit的消息,Kafka只能保证它被存于多个Replica的内存中,而不能保证它们被持久化到磁盘中,也就不能彻底保证异常发生后该条消息必定能被Consumer消费。

Consumer读消息也是从Leader读取,只有被commit过的消息才会暴露给Consumer。

Kafka Replication的数据流以下图所示:

1.4.2 ACK前须要保证有多少个备份

对于Kafka而言,定义一个Broker是否“活着”包含两个条件:

  • 一是它必须维护与ZooKeeper的session(这个经过ZooKeeper的Heartbeat机制来实现)。
  • 二是Follower必须可以及时将Leader的消息复制过来,不能“落后太多”。

Leader会跟踪与其保持同步的Replica列表,该列表称为ISR(即in-sync Replica)。若是一个Follower宕机,或者落后太多,Leader将把它从ISR中移除。这里所描述的“落后太多”指Follower复制的消息落后于Leader后的条数超过预约值(该值可在$KAFKA_HOME/config/server.properties中经过replica.lag.max.messages配置,其默认值是4000)或者Follower超过必定时间(该值可在$KAFKA_HOME/config/server.properties中经过replica.lag.time.max.ms来配置,其默认值是10000)未向Leader发送fetch请求。

Kafka的复制机制既不是彻底的同步复制,也不是单纯的异步复制。事实上,彻底同步复制要求全部能工做的Follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka很是重要的一个特性)。而异步复制方式下,Follower异步的从Leader复制数据,数据只要被Leader写入log就被认为已经commit,这种状况下若是Follower都复制完都落后于Leader,而若是Leader忽然宕机,则会丢失数据。而Kafka的这种使用ISR的方式则很好的均衡了确保数据不丢失以及吞吐率。Follower能够批量的从Leader复制数据,这样极大的提升复制性能(批量写磁盘),极大减小了Follower与Leader的差距。

须要说明的是,Kafka只解决fail/recover,不处理“Byzantine”(“拜占庭”)问题。一条消息只有被ISR里的全部Follower都从Leader复制过去才会被认为已提交。这样就避免了部分数据被写进了Leader,还没来得及被任何Follower复制就宕机了,而形成数据丢失(Consumer没法消费这些数据)。而对于Producer而言,它能够选择是否等待消息commit,这能够经过request.required.acks来设置。这种机制确保了只要ISR有一个或以上的Follower,一条被commit的消息就不会丢失。

1.4.3 Leader Election算法

Leader选举本质上是一个分布式锁,有两种方式实现基于ZooKeeper的分布式锁:

  • 节点名称惟一性:多个客户端建立一个节点,只有成功建立节点的客户端才能得到锁
  • 临时顺序节点:全部客户端在某个目录下建立本身的临时顺序节点,只有序号最小的才得到锁

一种很是经常使用的选举leader的方式是“Majority Vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,若是咱们有2f+1个Replica(包含Leader和Follower),那在commit以前必须保证有f+1个Replica复制完消息,为了保证正确选出新的Leader,fail的Replica不能超过f个。由于在剩下的任意f+1个Replica里,至少有一个Replica包含有最新的全部消息。这种方式有个很大的优点,系统的latency只取决于最快的几个Broker,而非最慢那个。Majority Vote也有一些劣势,为了保证Leader Election的正常进行,它所能容忍的fail的follower个数比较少。若是要容忍1个follower挂掉,必需要有3个以上的Replica,若是要容忍2个Follower挂掉,必需要有5个以上的Replica。也就是说,在生产环境下为了保证较高的容错程度,必需要有大量的Replica,而大量的Replica又会在大数据量下致使性能的急剧降低。这就是这种算法更多用在ZooKeeper这种共享集群配置的系统中而不多在须要存储大量数据的系统中使用的缘由。例如HDFS的HA Feature是基于majority-vote-based journal,可是它的数据存储并无使用这种方式。

Kafka在ZooKeeper中动态维护了一个ISR(in-sync replicas),这个ISR里的全部Replica都跟上了leader,只有ISR里的成员才有被选为Leader的可能。在这种模式下,对于f+1个Replica,一个Partition能在保证不丢失已经commit的消息的前提下容忍f个Replica的失败。在大多数使用场景中,这种模式是很是有利的。事实上,为了容忍f个Replica的失败,Majority Vote和ISR在commit前须要等待的Replica数量是同样的,可是ISR须要的总的Replica的个数几乎是Majority Vote的一半。

虽然Majority Vote与ISR相比有不需等待最慢的Broker这一优点,可是Kafka做者认为Kafka能够经过Producer选择是否被commit阻塞来改善这一问题,而且节省下来的Replica和磁盘使得ISR模式仍然值得。

1.4.4 如何处理全部Replica都不工做

在ISR中至少有一个follower时,Kafka能够确保已经commit的数据不丢失,但若是某个Partition的全部Replica都宕机了,就没法保证数据不丢失了。这种状况下有两种可行的方案:

1.等待ISR中的任一个Replica“活”过来,而且选它做为Leader

2.选择第一个“活”过来的Replica(不必定是ISR中的)做为Leader

这就须要在可用性和一致性当中做出一个简单的折衷。若是必定要等待ISR中的Replica“活”过来,那不可用的时间就可能会相对较长。并且若是ISR中的全部Replica都没法“活”过来了,或者数据都丢失了,这个Partition将永远不可用。选择第一个“活”过来的Replica做为Leader,而这个Replica不是ISR中的Replica,那即便它并不保证已经包含了全部已commit的消息,它也会成为Leader而做为consumer的数据源(前文有说明,全部读写都由Leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在之后的版本中,Kafka支持用户经过配置选择这两种方式中的一种,从而根据不一样的使用场景选择高可用性仍是强一致性。

1.4.5 选举Leader

最简单最直观的方案是,全部Follower都在ZooKeeper上设置一个Watch,一旦Leader宕机,其对应的ephemeral znode会自动删除,此时全部Follower都尝试建立该节点,而建立成功者(ZooKeeper保证只有一个能建立成功)便是新的Leader,其它Replica即为Follower。

可是该方法会有3个问题:

1.split-brain 这是由ZooKeeper的特性引发的,虽然ZooKeeper能保证全部Watch按顺序触发,但并不能保证同一时刻全部Replica“看”到的状态是同样的,这就可能形成不一样Replica的响应不一致

2.herd effect 若是宕机的那个Broker上的Partition比较多,会形成多个Watch被触发,形成集群内大量的调整

3.ZooKeeper负载太重 每一个Replica都要为此在ZooKeeper上注册一个Watch,当集群规模增长到几千个Partition时ZooKeeper负载会太重。

Kafka 0.8.*的Leader Election方案解决了上述问题,它在全部broker中选出一个controller,全部Partition的Leader选举都由controller决定。controller会将Leader的改变直接经过RPC的方式(比ZooKeeper Queue的方式更高效)通知需为为此做为响应的Broker。同时controller也负责增删Topic以及Replica的从新分配。

二 Kafka生产过程分析

2.1 写入方式

producer采用推(push)模式将消息发布到broker,每条消息都被追加(append)到分区(patition)中,属于顺序写磁盘(顺序写磁盘效率比随机写内存要高,保障kafka吞吐率)。

2.2 分区(Partition)

Kafka集群有多个消息代理服务器(broker-server)组成,发布到Kafka集群的每条消息都有一个类别,用主题(topic)来表示。一般,不一样应用产生不一样类型的数据,能够设置不一样的主题。一个主题通常会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者均可以接收到生成者写入的新消息。

Kafka集群为每一个主题维护了分布式的分区(partition)日志文件,物理意义上能够把主题(topic)看做进行了分区的日志文件(partition log)。主题的每一个分区都是一个有序的、不可变的记录序列,新的消息会不断追加到日志中。分区中的每条消息都会按照时间顺序分配到一个单调递增的顺序编号,叫作偏移量(offset),这个偏移量可以惟一地定位当前分区中的每一条消息。

消息发送时都被发送到一个topic,其本质就是一个目录,而topic是由一些Partition Logs(分区日志)组成,其组织结构以下图所示:

下图中的topic有3个分区,每一个分区的偏移量都从0开始,不一样分区之间的偏移量都是独立的,不会相互影响。 

咱们能够看到,每一个Partition中的消息都是有序的,生产的消息被不断追加到Partition log上,其中的每个消息都被赋予了一个惟一的offset值。

发布到Kafka主题的每条消息包括键值和时间戳。消息到达服务器端的指定分区后,都会分配到一个自增的偏移量。原始的消息内容和分配的偏移量以及其余一些元数据信息最后都会存储到分区日志文件中。消息的键也能够不用设置,这种状况下消息会均衡地分布到不一样的分区。

1) 分区的缘由

(1)方便在集群中扩展,每一个Partition能够经过调整以适应它所在的机器,而一个topic又能够有多个Partition组成,所以整个集群就能够适应任意大小的数据了;

(2)能够提升并发,由于能够以Partition为单位读写了。

传统消息系统在服务端保持消息的顺序,若是有多个消费者消费同一个消息队列,服务端会以消费存储的顺序依次发送给消费者。但因为消息是异步发送给消费者的,消息到达消费者的顺序多是无序的,这就意味着在并行消费时,传统消息系统没法很好地保证消息被顺序处理。虽然咱们能够设置一个专用的消费者只消费一个队列,以此来解决消息顺序的问题,可是这就使得消费处理没法真正执行。

Kafka比传统消息系统有更强的顺序性保证,它使用主题的分区做为消息处理的并行单元。Kafka以分区做为最小的粒度,将每一个分区分配给消费者组中不一样的并且是惟一的消费者,并确保一个分区只属于一个消费者,即这个消费者就是这个分区的惟一读取线程。那么,只要分区的消息是有序的,消费者处理的消息顺序就有保证。每一个主题有多个分区,不一样的消费者处理不一样的分区,因此Kafka不只保证了消息的有序性,也作到了消费者的负载均衡。

2)分区的原则

(1)指定了patition,则直接使用;

(2)未指定patition但指定key,经过对key的value进行hash出一个patition

(3)patition和key都未指定,使用轮询选出一个patition。

DefaultPartitioner类

public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();
        if (keyBytes == null) {
            int nextValue = nextValue(topic);
            List<PartitionInfo> availablePartitions = cluster.availablePartitionsForTopic(topic);
            if (availablePartitions.size() > 0) {
                int part = Utils.toPositive(nextValue) % availablePartitions.size();
                return availablePartitions.get(part).partition();
            } else {
                // no partitions are available, give a non-available partition
                return Utils.toPositive(nextValue) % numPartitions;
            }
        } else {
            // hash the keyBytes to choose a partition
            return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
        }
    }

2.3 副本(Replication)

同一个partition可能会有多个replication(对应 server.properties 配置中的 default.replication.factor=N)。没有replication的状况下,一旦broker 宕机,其上全部 patition 的数据都不可被消费,同时producer也不能再将数据存于其上的patition。引入replication以后,同一个partition可能会有多个replication,而这时须要在这些replication之间选出一个leader,producer和consumer只与这个leader交互,其它replication做为follower从leader 中复制数据。

2.4  写入流程

 producer写入消息流程以下:

 

1)producer先从zookeeper的 "/brokers/.../state"节点找到该partition的leader

2)producer将消息发送给该leader

3)leader将消息写入本地log

4)followers从leader pull消息,写入本地log后向leader发送ACK

5)leader收到全部ISR中的replication的ACK后,增长HW(high watermark,最后commit 的offset)并向producer发送ACK

三 broker保存消息

3.1 存储方式

物理上把 topic 分红一个或多个 patition(对应 server.properties 中的 num.partitions=3 配置),每一个 patition 物理上对应一个文件夹(该文件夹存储该 patition 的全部消息和索引文件),以下:

3.2 存储策略

不管消息是否被消费,kafka 都会保留全部消息。有两种策略能够删除旧数据:

一、 基于时间:log.retention.hours=168 
二、 基于大小:log.retention.bytes=1073741824

须要注意的是,由于Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,因此这里删除过时文件与提升 Kafka 性能无关。

3.3Zookeeper存储结构

admin

该目录下znode只有在有相关操做时才会存在,操做结束时会将其删除

/admin/reassign_partitions用于将一些Partition分配到不一样的broker集合上。对于每一个待从新分配的Partition,Kafka会在该znode上存储其全部的Replica和相应的Broker id。该znode由管理进程建立而且一旦从新分配成功它将会被自动移除。

broker

即/brokers/ids/[brokerId])存储“活着”的broker信息。

topic注册信息(/brokers/topics/[topic]),存储该topic的全部partition的全部replica所在的broker id,第一个replica即为preferred replica,对一个给定的partition,它在同一个broker上最多只有一个replica,所以broker id可做为replica id。

controller

/controller -> int (broker id of the controller)存储当前controller的信息

/controller_epoch -> int (epoch)直接以整数形式存储controller epoch,而非像其它znode同样以JSON字符串形式存储。

四 Kafka消费过程分析

kafka提供了两套consumer API:高级Consumer API和低级API。

4.1消费模型

消息由生产者发布到Kafka集群后,会被消费者消费。消息的消费模型有两种:推送模型(push)和拉取模型(pull)。

基于推送模型(push)的消息系统,由消息代理记录消费者的消费状态。消息代理在将消息推送到消费者后,标记这条消息为已消费,但这种方式没法很好地保证消息被处理。好比,消息代理把消息发送出去后,当消费进程挂掉或者因为网络缘由没有收到这条消息时,就有可能形成消息丢失(由于消息代理已经把这条消息标记为已消费了,但实际上这条消息并无被实际处理)。若是要保证消息被处理,消息代理发送完消息后,要设置状态为“已发送”,只有收到消费者的确认请求后才更新为“已消费”,这就须要消息代理中记录全部的消费状态,这种作法显然是不可取的。

Kafka采用拉取模型,由消费者本身记录消费状态,每一个消费者互相独立地顺序读取每一个分区的消息。以下图所示,有两个消费者(不一样消费者组)拉取同一个主题的消息,消费者A的消费进度是3,消费者B的消费进度是6。消费者拉取的最大上限经过最高水位(watermark)控制,生产者最新写入的消息若是尚未达到备份数量,对消费者是不可见的。这种由消费者控制偏移量的优势是:消费者能够按照任意的顺序消费消息。好比,消费者能够重置到旧的偏移量,从新处理以前已经消费过的消息;或者直接跳到最近的位置,从当前的时刻开始消费。

在一些消息系统中,消息代理会在消息被消费以后当即删除消息。若是有不一样类型的消费者订阅同一个主题,消息代理可能须要冗余地存储同一消息;或者等全部消费者都消费完才删除,这就须要消息代理跟踪每一个消费者的消费状态,这种设计很大程度上限制了消息系统的总体吞吐量和处理延迟。Kafka的作法是生产者发布的全部消息会一致保存在Kafka集群中,无论消息有没有被消费。用户能够经过设置保留时间来清理过时的数据,好比,设置保留策略为两天。那么,在消息发布以后,它能够被不一样的消费者消费,在两天以后,过时的消息就会自动清理掉。

4.2高级API

1)高级API优势

高级API 写起来简单

不须要自行去管理offset,系统经过zookeeper自行管理。

不须要管理分区,副本等状况,.系统自动管理。

消费者断线会自动根据上一次记录在zookeeper中的offset去接着获取数据(默认设置1分钟更新一下zookeeper中存的offset)

可使用group来区分对同一个topic 的不一样程序访问分离开来(不一样的group记录不一样的offset,这样不一样程序读取同一个topic才不会由于offset互相影响)

2)高级API缺点

不能自行控制offset(对于某些特殊需求来讲)

不能细化控制如分区、副本、zk等

4.3低级API

1)低级 API 优势

可以让开发者本身控制offset,想从哪里读取就从哪里读取。

自行控制链接分区,对分区自定义进行负载均衡

对zookeeper的依赖性下降(如:offset不必定非要靠zk存储,自行存储offset便可,好比存在文件或者内存中)

2)低级API缺点

太过复杂,须要自行控制offset,链接哪一个分区,找到分区leader 等。

4.4消费者组

消费者是以consumer group消费者组的方式工做,由一个或者多个消费者组成一个组,共同消费一个topic。每一个分区在同一时间只能由group中的一个消费者读取,可是多个group能够同时消费这个partition。在图中,有一个由三个消费者组成的group,有一个消费者读取主题中的两个分区,另外两个分别读取一个分区。某个消费者读取某个分区,也能够叫作某个消费者是某个分区的拥有者。

在这种状况下,消费者能够经过水平扩展的方式同时读取大量的消息。另外,若是一个消费者失败了,那么其余的group成员会自动负载均衡读取以前失败的消费者读取的分区。

4.5 消费方式

consumer采用pull(拉)模式从broker中读取数据。

push(推)模式很难适应消费速率不一样的消费者,由于消息发送速率是由broker决定的。它的目标是尽量以最快速度传递消息,可是这样很容易形成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则能够根据consumer的消费能力以适当的速率消费消息。

对于Kafka而言,pull模式更合适,它可简化broker的设计,consumer可自主控制消费消息的速率,同时consumer能够本身控制消费方式——便可批量消费也可逐条消费,同时还能选择不一样的提交方式从而实现不一样的传输语义。

pull模式不足之处是,若是kafka没有数据,消费者可能会陷入循环中,一直等待数据到达。为了不这种状况,咱们在咱们的拉请求中有参数,容许消费者请求在等待数据到达的“长轮询”中进行阻塞(而且可选地等待到给定的字节数,以确保大的传输大小)。

4.6消费者组案例

1)需求:测试同一个消费者组中的消费者,同一时刻只能有一个消费者消费。

2)案例实操

(1)在node2一、node22上修改/opt/module/kafka/config/consumer.properties配置文件中的group.id属性为任意组名。

[root@node22 config]$ vi consumer.properties

group.id=admin

(2)在node2一、node22上分别启动消费者

[root@node21 kafka]$ bin/kafka-console-consumer.sh --zookeeper node21:2181,node22:2181,node23:2181  --topic first --consumer.config config/consumer.properties

[root@node22 kafka]$ bin/kafka-console-consumer.sh --zookeeper node21:2181,node22:2181,node23:2181  --topic first --consumer.config config/consumer.properties

(3)在node23上启动生产者

[root@node23 kafka]$ bin/kafka-console-producer.sh --broker-list node21:9092 --topic first

>hello world

(4)查看node21和node22的接收者。

同一时刻只有一个消费者接收到消息。

五 Topic的建立和删除

5.1 建立topic

建立 topic 的序列图以下所示:

流程说明:

一、 controller 在 ZooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被建立,则 controller 会经过 watch 获得该 topic 的 partition/replica 分配。
二、 controller从 /brokers/ids 读取当前全部可用的 broker 列表,对于 set_p 中的每个 partition:
     2.一、 从分配给该 partition 的全部 replica(称为AR)中任选一个可用的 broker 做为新的 leader,并将AR设置为新的 ISR 
     2.二、 将新的 leader 和 ISR 写入 /brokers/topics/[topic]/partitions/[partition]/state 
三、 controller 经过 RPC 向相关的 broker 发送 LeaderAndISRRequest。

5.2 删除topic

删除 topic 的序列图以下所示:

流程说明:

一、 controller 在 zooKeeper 的 /brokers/topics 节点上注册 watcher,当 topic 被删除,则 controller 会经过 watch 获得该 topic 的 partition/replica 分配。 
二、 若 delete.topic.enable=false,结束;不然 controller 注册在 /admin/delete_topics 上的 watch 被 fire,controller 经过回调向对应的 broker 发送 StopReplicaRequest。

六 broker failover

kafka broker failover 序列图以下所示:

流程说明:

一、 controller 在 zookeeper 的 /brokers/ids/[brokerId] 节点注册 Watcher,当 broker 宕机时 zookeeper 会 fire watch
二、 controller 从 /brokers/ids 节点读取可用broker 
三、 controller决定set_p,该集合包含宕机 broker 上的全部 partition 
四、 对 set_p 中的每个 partition 
    4.一、 从/brokers/topics/[topic]/partitions/[partition]/state 节点读取 ISR 
    4.二、 决定新 leader 
    4.三、 将新 leader、ISR、controller_epoch 和 leader_epoch 等信息写入 state 节点
五、 经过 RPC 向相关 broker 发送 leaderAndISRRequest 命令

七 controller failover

当 controller 宕机时会触发 controller failover。每一个 broker 都会在 zookeeper 的 "/controller" 节点注册 watcher,当 controller 宕机时 zookeeper 中的临时节点消失,全部存活的 broker 收到 fire 的通知,每一个 broker 都尝试建立新的 controller path,只有一个竞选成功并当选为 controller。

当新的 controller 当选时,会触发 KafkaController.onControllerFailover 方法,在该方法中完成以下操做:

一、 读取并增长 Controller Epoch。 
二、 在 reassignedPartitions Patch(/admin/reassign_partitions) 上注册 watcher。 
三、 在 preferredReplicaElection Path(/admin/preferred_replica_election) 上注册 watcher。 
四、 经过 partitionStateMachine 在 broker Topics Patch(/brokers/topics) 上注册 watcher。 
五、 若 delete.topic.enable=true(默认值是 false),则 partitionStateMachine 在 Delete Topic Patch(/admin/delete_topics) 上注册 watcher。 
六、 经过 replicaStateMachine在 Broker Ids Patch(/brokers/ids)上注册Watch。 
七、 初始化 ControllerContext 对象,设置当前全部 topic,“活”着的 broker 列表,全部 partition 的 leader 及 ISR等。 
八、 启动 replicaStateMachine 和 partitionStateMachine。 
九、 将 brokerState 状态设置为 RunningAsController。 
十、 将每一个 partition 的 Leadership 信息发送给全部“活”着的 broker。 
十一、 若 auto.leader.rebalance.enable=true(默认值是true),则启动 partition-rebalance 线程。 
十二、 若 delete.topic.enable=true 且Delete Topic Patch(/admin/delete_topics)中有值,则删除相应的Topic。
相关文章
相关标签/搜索