ISR:一个Partition中的Leader的全部follower(replication)集合。node
AR:分配给此Partition的全部replication,统称为Assigned Replicas(AR)。ISR是AR的子集。算法
Controller:broker中的Leader。并发
HW:消费端能看到的broker上消息的位置,客户端只能消费到HW位置。负载均衡
LEO:消息在log文件中最新的位置。异步
segment:replication对应一个文件夹中,实际存消息的文件socket
多个broker组成了kafka集群。性能
1. Broker均可以向producer提供metadata信息,metadata包括:集群中存活的broker列表;partitions leader列表等信息。当producer获取到metadata信息以后, producer将会和Topic下全部partition leader保持socket链接;消息由producer直接经过socket发送到broker,中间不会通过任何"路由层"。spa
2. 使用zookeeper用来注册broker信息,监测partition leader存活性。线程
一个consumer group下,不管有多少个consumer,这个group必定是把这个topic下全部的partition都消费了。设计
1) 当group中的consumer数量小于topic的partition数量:一个consumer消费多个partition。
2)当group中的consumer数量等于topic的partition数量:一个consumer消费一个partition。这时的效率最高。
3)当group中的consumer数量大于topic的partition数量:这时就会有consumer空闲,形成资源的浪费。
在设置consumer group时,只须要指定里面consumer数量,consumer会自动进行rebalance。
Consumer rebalance触发条件:
1)Consumer增长或删除会触发 Consumer Group的Rebalance。
2)Broker的增长或者减小都会触发 Consumer Rebalance。Consumer消费partition中的message是顺序读取,因此必需要维护上次读取message的位置。
1、旧版本的(在0.8.2以前):
1)high level API:offset存在于zookeeper中: Consumer默认是读完message先commmit再处理message,autocommit默认是是true,这时候先commit就会更新offset+1,一旦处理失败,offset已经+1,这个时候就会丢message;若是还有offset+1。那么consumer重启后就会重复消费这个message。也能够配置成读完消息处理再commit,这种状况下consumer端的响应就会比较慢的,须要等处理完才行。
2) low level API:由应用本身维护offset。
2、0.8.2以后:
1) 新的Comsumer API再也不有high-level、low-level之分了,而是本身维护offset。这样作的好处是避免应用出现异常时,数据未消费成功,但Position已经提交,致使消息未消费的状况发生。默认将消费的offset迁入到kafka一个名为__consumer_offsets 的Topic中。原理:利用kafka自身的topic,以消费的group,topic以及partition作为组合key,全部的消费offset都提交写入到名为__consumer_offsets 的Topic中。
Consumer消费的message是在topic下的某个partition的leader上。
当Consumer启动时,触发的操做:
1)进行"Consumer id Registry"。
2)在"Consumer id Registry"节点下注册一个watch用来监听当前group中其余consumer的"leave"和"join";只要此znode path下节点列表变动,都会触发此group下consumer的负载均衡.(好比一个consumer失效,那么其余consumer接管失效consumer的partition)。
3)在"Broker id registry"节点下,注册一个watch用来监听broker的存活状况;若是broker列表变动,将会触发全部的groups下的consumer从新balance。
使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息。
1)将全部Broker(假设共n个Broker)和待分配的Partition排序。
2)将第i个Partition分配到第(i mod n)个Broker上。
3)将第i个Partition的第j个Replica分配到第((i + j) mod n)个Broker上。
一个Partition的一个replication对应一个文件夹,文件夹中包含索引文件(每一个segment的offset范围)和多个segment文件(以第一条消息的offset命名)。
Partition和replication的关系:
topic:
Kafka的高可用性(HA)体如今:replication和Leader election(replication之间)。
在没有replication的状况下,由于partition在某一台broker上,一旦某一个broker宕机,那么这台broker上的Partition的数据都不能被消费,同时producer也不能提交消息到此topic的该Partition下。这样就违背了kafka的高可用性。因此引入了Replication机制。特别是在生产环境中。
引入Replication以后,那么一个partition就会有多个Replication,由于不一样的Replication存在于不一样的broker中,这样在一台broker宕机后,在其余broker上的该partition的Replication还能够提供服务。这样就保持了kafka的高可用。
有了Replication就必须考虑数据的一致性,这样才能保证在一个broker宕机后其余的Replication提供服务的时候数据不会丢失。kafka多个Replication,引入Leader,Producer和Consumer只与这个Leader打交道,其余的Replication做为此leader的follower,而且从leader拉取数据。若是不存在Leaader的话,全部的Replication都是能够同时读/写的,就必须保证多个Replication之间的数据同步,它们直接就要互相的同步数据(N*N条路)。这样的设计就至关的复杂,因此引入Leader后,让Leader来负责读/写,其余的Replication做为follower就从leader同步数据,这样高效简单。
Broker是否还存活必须知足2个条件:第一:必须维护与ZK的心跳机制,第二:上面的Follower必须能快速的从它的Leader那边同步消息过来,不能落后Leader太多。Leader会跟踪与其保持同步的Replication列表(ISR:in-sync Replication)。若是Follower宕机或者拖后太多,Leader将会把它从ISR列表中移除。
消息复制机制若是使用同步机制的话,就要求Leader全部的Follower都完成复制(Follower从Leader中pull数据,Follower收到数据并写入到log后,向Leader发送ACK),这样很影响吞吐率。
消息复制机制若是采用异步复制机制,Follower异步从Leader复制数据,数据只要被Leader写入log就认为消息是commit的状态,这种状况下若是全部的Follower都落后与Leader,那么在Leader宕机后,消息就会出现丢失。
Producer发布消息到topic的时候(实际上是发送到Partition中),先经过zookeeper找到该Partition的Leader。Producer只把消息发送到该Partition的Leader中。Leader会将消息写入其本地的log,每一个Follower都从Leader pull数据,这样Follower存储信息的顺序就和Leader同样了,Follower在pull到消息后也会把消息存放到本身的log中,向Leader发送ACK。一旦Leader收到了全部ISR列表中的Replication的ACK,这条消息就是已经commit(能够被消费了)的了,这时Leader将增长HW并想Producer发送ACK。这样有个问题,就是Leader要等到全部ISR中Replication的ACK,那么在ISR列表不少或者其中一个的ACK回来的比较慢的时候,这样就是影响总体的吞吐率。为了提升性能,每一个Follower获得pull消息后就立马给Leader发送ACK,不会等到放入log中。因此commit的消息,只能保证它存在于多个Replication的内存中,不能保证它被持久化到磁盘中。就不能彻底保证在出现异常后,这条消息能被消费。可是这个问题刚好适用于“该问题刚好不解决”。比较很是的少见,因此在性能和可用性上作了一个平衡。
Consumer读取消息是从Leader中读取,而且必须是Commit的消息才能被消费。只能消费到HW位置。
Partition有了Leader后,由于Leader只会有一台,那么在这台机子宕机后,就须要在剩下的follower中从新选择出一个拥有最新数据的follower来变成Leader来对外提供服务。在election中,kafka没有使用“Majority Vote”(“少数服从多数”)的算法,它的劣势是能容忍的失败的follower个数比较少。在kafka的parttion的ISR模式下,保证在不丢失已经commit的消息的前提下,能容忍F个Replica失败(总个数是F+1),由于ISR列表中里面全部的Replica都有leader中的数据。因此只要有一个Replica还在,数据就是所有的commit数据。因此直接在ISR中选举Leader。
如何选举?在集群中全部的broker中选出一个controller(集群中只会有一个controller),集群中全部Partition的Leader选举都由该broker解决,这个controller主要负责1:parttion的Leader变化事件。2:新建立和删除一个topic。3:从新分配parttion。4:管理分区的状态机和副本的状态机。当controller决定一个Partition的Leader和ISR后,会将此决定持久化到ZK节点中,而且向全部受到影响的Broker经过RPC的形式直接发送新的决策。
在broker宕机,Controller注册在ZK的/brokers/ids的Watcher会触发调用onBrokerFailure,这台宕机的broker上的Replica多是某个parttion的Leader或者是某个parttion的follower,若是是parttion的Leader,那么该Controller要确保有其余的broker成为这个parttion的Leader,若是是parttion的follower,不须要从新选举该parttion的Leader,可是该parttion的ISR有可能会发生变化(由于这台follower的broker宕机了,要从ISR列表中移除)。因此Controller首先会读取ZK中该parttion的ISR/AR选举新的Leader和ISR,而后把这个信息先保存到ZK中,最后把包含了新Leader和ISR的LeaderAndISR指令发送给受到影响的Brokers。收到指令的Broker若是Controller命令它成为某个Partition的Leader,那么原来为Follower的Replica所在的Broker就成为了Partition的Leader。,收到指令的broker原先的Replica是Follower收到指令没有让它成为Leader那么它依然是的Replica,becomeFollower。
例如过程以下:
1:有三个broker:brokerA,brokerB,brokerC
2:Partition1有三个副本:Replica1,Replica2,Replica3。其中Replica1是Leader,ISR=[1,2,3]
3:Replica1所在的BrokerA挂掉了(Partition1没有了Leader),Controller注册的Watcher会触发调用onBrokerFailure
4:Controller会读取ZK中的leaderAndISR,选举出新的leader和ISR:Leader=Replica2,ISR=[2,3]
5:Controller将最新的leaderAndISR写到ZK中(leader=Replica2,ISR=[2,3])
6:Controller将最新的leaderAndISR构形成LeaderAndISRRequest命令发送给Broker2,Broker3
7:Replica2所在的BrokerB收到指令,由于最新的leader指示Replica2是Leader,因此Replica2成为Partition1的Leader
8: Replica3所在的BrokerC收到指令,Replica3仍然是follower,而且在ISR中,becomeFollower
Broker宕机后Controller端的处理步骤以下:
1:从ZK中读取现存的brokers
2:broker宕机,引发partition的Leader或ISR变化,获取在宕机的broker上的partitions:set_p。
3:循环set_p的每一个Partition P:
从ZK的leaderAndISR节点读取P的当前ISR
决定P的新Leader和新ISR(优先级分别是ISR中存活的broker,AR中任意存活的做为Leader)
将P最新的leader,ISR,回写到ZK的leaderAndISR节点
4:将set_p中每一个Partition的LeaderAndISR指令(里面包括最新的leaderAndISR数据)发送给受到影响的brokers
假设AR=[1,2,3,4,5],ISR=[1,2,3],可是存活Brokers=[2,3,5]。选择Leader的方式是ISR中目前存活的Brokers,好比目前存活的Broker是[2,3,5],因此ISR中的副本1是不能做为Leader的,也不会再做为ISR了。Leader的选举是选举目前还存活的[2,3]中的一个,ISR的肯定是选举在当前ISR中仍然存活的Broker=[2,3]。因此最后Leader=2,ISR=[2,3]。
假设AR=[1,2,3,4,5],ISR=[1,2,3],可是存活的Brokers=[4,6,7]。由于ISR中没有一个Broker在当前处于存活状态,因此只能退而求其次从AR中选择。幸运的是AR中的4目前是存活的,因此Leader=4,ISR=[4]。因为4再也不ISR中,因此这种状况有可能会形成数据丢失,由于只有选举处于ISR中的,才不会丢失数据,可是如今ISR中的没有一个存活,因此也只好选择有可能丢失的Broekr,总比找不到任何的Broker要好。
Partition有多个Replica,Replica是分布在Broker上的。因此一个Broker上受到影响的Replica的Partition确定还有其余的Replica分布在其余Broker上。因此含有宕机Broker的Partition的Replica的节点都是受到影响的broker。
假如Broker1上有三个Replica,第一个Replica是Partition1的Leader,第二个Replica是Partition2的Follower,第三个Replica是Partition3的Follower。若是是Leader,则受影响到的broker要被Controller负责选出这个Replica对应的Partition的新Leader。若是是follower,也有可能影响了Partition的ISR,因此Leader要负责更新ISR。
因为Controller是从Brokers中选举出来的,因此Controller所在的节点也会做为Partition的存储节点的。当Controller挂掉后,Controller自己做为Broker也会触发新的Controller调用on_broker_change。可是在尚未选举出新的Controller以前,挂掉的Broker的on_broker_change不会被新的Controller调用(由于根本就没有可用的Controller)。因此对于挂掉的Controller节点,最紧迫的任务是首先选举出新的Controller,而后再由新的Controller触发挂掉的那个Controller的on_broker_change。
Broker失败和Controller失败是不一样的:Broker的failover是在Controller端处理的,由于咱们知道Broker挂掉了,Controller负责在挂掉Broker和受影响的Broker之间更新数据(将新的leaderAndISR发送给受影响的Broker)。而Controller的failover则是在Broker处理的(成功建立Controller的那一个Broker)。
1:新建立一个topic,会同时指定Partition的个数,每一个Partition都有AR信息写到ZK中。
2:为新建立的每一个Partition初始化leader
选择AR中的一个存活的Broker做为新Leader,ISR=AR
将新Leader和ISR写到ZK的leaderAndISR节点
3:发送LeaderAndISRCommand给受到影响的brokers
4:若是是删除一个topic,则发送StopReplicaCommand给受影响的brokers。
Controller会在/brokers/topics上注册Watcher,因此有新topic建立/删除时,Controller会经过Watch获得新建立/删除的Topic的Partition/Replica分配。对于新建立的Topic,分配给Partition 的AR中全部的Replica都尚未数据,可认为它们都是同步的,也即都在ISR中(ISR=AR),任意一个Replica均可做为Leader。
建立或删除topic的过程和onBrokerFailure相似都要通过三个步骤:1) 选举Leader和ISR;2) 将leaderAndISR写到ZK中;3) 将最新leaderAndISR的LeaderAndISR指令发送给受到影响的Brokers。这是由于brokerChange致使Partition的Leader或者ISR发生变化,而新建立topic时,根本就没有Leader和ISR,因此二者都须要为Partition选择Leader和ISR。
存在这样一个状况:有多个broker同时声称是一个Partition的leader。好比brokerA是partition的初始Leader,partition的ISR是{A,B,C}。有一天brokerA由于某种缘由失去它在ZK中的注册信息。这时controller会认为brokerA当掉了,并将partition的leader分配给了brokerA,设置新的ISR为{B,C},并写到ZK中(每次partition的leader发生变化,epoch也会增长)。在brokerB成为leader的同时,brokerA从缘由中恢复过来,可是没有接收到controller发送的leadershipi变化指令。这样如今brokerA和brokerB都认为本身是partition的Leader,若是咱们容许broker A和B能同时提交消息那就很是不幸了,由于全部副本之间的数据同步就会不一致了。不过当前的设计中其实是不会容许出现这种状况的:当brokerB成为leader以后,brokerA是没法再提交任何新的消息的。
Kafka是怎么作的到?假设容许存在两个leader,生产者会同时往这两个leader写数据,可是能不能提交消息就相似于两阶段提交协议了:kafka的leader要可以提交一个消息的保证是ISR中的全部副本都复制成功。对于brokerA为了保证能提交消息m,它须要ISR中的每一个副本(A,B,C)都要接收到消息m,而这个时候broker A仍然认为ISR是{A,B,C},(这是broker A的一份本地拷贝,虽然这个时候ZK中的ISR已经被broker B改变了:ISR是{B,C}),可是ISR中的brokerB是不会再接收到消息m的。由于在brokerB成为Leader的时候,它会首先关掉到以前到旧的brokerA中抓取数据的线程(brokerB以前是follower,会向leader抓取数据,只有follower抓取数据,leader才能判断消息是否能提交),由于brokerB的抓取线程被关闭了,brokerA会认为B没法遇上Leader,既然由于B受到影响不能提交消息m,broker A干脆就想要把B从ISR中移除,这个时候broker A要将本身认为的最新的ISR写到ZK中。不过不幸的是broker A并不能完成这个操做:由于在写ZK的时候broker A会发现本身的epoch版本和ZK中的当前值并不匹配(broker B在选举为Leader以后会写到ZK中,并将epoch增长1,任何新的写操做的epoch都不能比当前epoch小),直到这个时刻,broker A才意识到它已经再也不是partition的leader了。这个时候broker A只能接受本身再也不是该partition的Leader的事实了。
controller首先将受影响的partitions的新leader写到zk的leaderAndISR节点,而后才会向brokers发送leader改变的命令,brokers收到命令后会对leader改变的事件做出响应。因为客户端请求会使用leaderAndISR的数据来链接leader所在的节点,客户端的请求被路由到新leader所在的broker节点,但若是那个broker尚未准备好成为leader,就存在一个时间窗口对于客户端而言是不可用的。kafka这里则是controller先更新元数据(写入leaderAndISR)后才发送命令给broker,所以可能元数据已经更新,可是broker还没收到命令,或者收到命令后还没准备好成为leader。因此可能你会认为不要让controller先更新leaderAndISR节点,而是发送指令给brokers,每一个broker在收到指令并处理完成后才,让每一个broker来更新这个节点的数据。不过这里让controller来更新leaderAndISR节点是有缘由的:咱们是依赖ZK的leaderAndISR节点来保持controller和partition的leader的同步。当controller选举出新的leader以后,它不但愿新的Leader的ISR被旧的Leader改变,不然的话(假设ISR能够被旧leader改变),新选举出来的leader在接管正式成为leader以前可能会被当前leader从ISR中剔除出去。经过当即将新leader发布到leaderAndISR节点,controller可以防止当前leader更新ISR(选举新的leader在写到ZK时,epoch增长,而若是当前leader想要更新ISR好比将新选举的leader从ISR中剔除掉,由于epoch不匹配,因此当前leader就再也不有机会更新ISR了)。
客户端能够利用自身的失败重连等机制来实现路由。
一般状况下,follower的HW老是落后于leader的HW。因此在leader变化时,消费者发送的offset中可能会落在新leader的HW和LEO之间(由于消费者的消费进度依赖于Leader的HW,旧Leader的HW比较高,而原先的follower做为新Leader,它的HW仍是落后于旧Leader,因此消费者的offset虽然比旧Leader的HW低,可是有可能比新Leader的HW要大)。若是没有发生leader变化,服务端会返回OffsetOutOfRangeException给客户端。若是请求的offset介于HW和LEO之间,服务端会返回空消息集给消费者。
LeaderAndISRCommand:
1:读取命令中的partition
2:处理每一个partition
若是P在本地不存在,调用startReplica()建立一个新的Replia。
指令要求这个Broker成为P的新Leader,调用becomeLeader。
指令要求这个Broker做为Leader l的follower,调用becomeFollower。
3:若是指令中有INIT标记,则删除不在set_p中的全部本地partitions
becomeLeader指的是当前接收LeaderAndISRCommand指令的Broker,原先是一个follower,如今要转变为Leader。因为做为Follower期间,它会从Leader抓取数据,而如今Leader不在了,因此首先要中止抓取数据线程。follower转变为Leader以后,要负责读写数据,因此要启动提交线程负责将消息存储到本地日志文件中。
becomeFollower对于要转变为Follower的replica,原先若是是Leader的话,则要中止提交线程,因为当前Replica的leader可能会发生变化,因此在开始时要中止抓取线程,在最后要新建立到Replica最新leader的抓取线程,这中间还要截断日志到Replica的HW位置。
注意有可能新leader的HW会比以前的leader的HW要落后,这是由于新leader有多是ISR,也有多是AR中的replica。而原先做为Follower的replica,它的HW只会在向Leader发送抓取请求时,Leader在抓取响应中除了返回消息也会附带本身的HW给follower,Follower收到消息和HW后,才会更新本身的replica的HW,这中间有必定的时间间隔会致使Follower的HW会比Leader的HW要低。所以Follower在转变为Leader以后,它的HW是有可能比老的Leader的HW要低的。若是在leader角色转变以后,一个消费者客户端请求的offset可能比新的Leader的HW要大(由于消费者最多只消费到Leader的HW位置,可是消费者并不关心Leader到底有没有变化,因此若是旧的Leader的HW=10,那么客户端就能够消费到offset=10这个位置,而Leader发生转变后,HW可能下降为9,而这个时候客户端继续发送offset=10,就有可能比Leader的HW要大了!)。这种状况下,若是消费者要消费Leader的HW到LEO之间的数据,Broker会返回空的集合,而若是消费者请求的offset比LEO还要大,就会抛出OffsetOutofRangeException(LEO表示的是日志的最新位置,HW比LEO要小,客户端只能消费到HW位置,更不可能消费到LEO了)。
startReplica表示启动一个Replica,若是不存在Partition目录,则建立。并启动Replica的HW checkpoint线程,咱们已经知道了Follower的HW是经过发送抓取请求,接收应答中包含了Leader的HW,设置为Follower Replica的HW(而Leader的HW又是由ISR提交来决定的,因此说ISR决定了HW可以增长,而Follower的HW则来自于Leader的HW)。
StopReplicaCommand
1:从指令中读取partitions集合
2:对每一个partition P的每一个Replica ,调用stopReplica
中止和r关联的抓取线程(固然针对的是Follower Replica)
中止r的HW checkpoint线程
删除partition目录