消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。
1.1 Kafka的特性:
- 高吞吐量、低延迟:kafka每秒能够处理几十万条消息,它的延迟最低只有几毫秒,每一个topic能够分多个partition, consumer group 对partition进行consume操做。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,而且支持数据备份防止数据丢失
- 容错性:容许集群中节点失败(若副本数量为n,则容许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
1.2 Kafka的使用场景:
- 日志收集:一个公司能够用Kafka能够收集各类服务的log,经过kafka以统一接口服务的方式开放给各类consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka常常被用来记录web用户或者app用户的各类活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,而后订阅者经过订阅这些topic来作实时的监控分析,或者装载到hadoop、数据仓库中作离线分析和挖掘。
- 运营指标:Kafka也常常用来记录运营监控数据。包括收集各类分布式应用的数据,生产各类操做的集中反馈,好比报警和报告。
- 流式处理:好比spark streaming和storm
- 事件源
1.3 Kakfa的设计思想
-
Kakfa Broker Leader的选举:Kakfa Broker集群受Zookeeper管理。全部的Kafka Broker节点一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其余的Kafka Broker的全部信息,若是这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时全部的kafka broker又会一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上全部的partition在zookeeper上的状态,并选取ISR列表中的一个replica做为partition leader(若是ISR列表中的replica全挂,选一个幸存的replica做为leader; 若是该partition的全部的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,而且选它做为Leader;或选择第一个“活”过来的Replica(不必定是ISR中的)做为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其余的kafka broker。
这里曾经发生过一个bug,TalkingData使用Kafka0.8.1的时候,kafka controller在Zookeeper上注册成功后,它和Zookeeper通讯的timeout时间是6s,也就是若是kafka controller若是有6s中没有和Zookeeper作心跳,那么Zookeeper就认为这个kafka controller已经死了,就会在Zookeeper上把这个临时节点删掉,那么其余Kafka就会认为controller已经没了,就会再次抢着注册临时节点,注册成功的那个kafka broker成为controller,而后,以前的那个kafka controller就须要各类shut down去关闭各类节点和事件的监听。可是当kafka的读写流量都很是巨大的时候,TalkingData的一个bug是,因为网络等缘由,kafka controller和Zookeeper有6s中没有通讯,因而从新选举出了一个新的kafka controller,可是原来的controller在shut down的时候老是不成功,这个时候producer进来的message因为Kafka集群中存在两个kafka controller而没法落地。致使数据淤积。
这里曾经还有一个bug,
TalkingData使用Kafka0.8.1的时候,当ack=0的时候,表示producer发送出去message,只要对应的kafka broker topic partition leader接收到的这条message,producer就返回成功,无论partition leader 是否真的成功把message真正存到kafka。当ack=1的时候,
表示producer发送出去message,同步的把message存到对应topic的partition的leader上,而后producer就返回成功,partition leader异步的把message同步到其余partition replica上。当ack=all或-1,
表示producer发送出去message,同步的把message存到对应topic的partition的leader和对应的replica上以后,才返回成功。可是若是某个
kafka controller
切换的时候,会致使partition leader的切换(老的
kafka controller上面的partition leader会选举到其余的kafka broker上
),可是这样就会致使丢数据。
-
Consumergroup:各个consumer(consumer 线程)能够组成一个组(Consumer group ),partition中的每一个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,若是一个message能够被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不一样的组。Kafka不支持一个partition中的message由两个或两个以上的同一个consumer group下的consumer thread来处理,除非再启动一个新的consumer group。因此若是想同时对一个topic作消费的话,启动多个consumer group就能够了,可是要注意的是,这里的多个consumer的消费都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。它不能像AMQ那样能够多个BET做为consumer去互斥的(for update悲观锁)并发处理message,这是由于多个BET去消费一个Queue中的数据的时候,因为要保证不能多个线程拿同一条message,因此就须要行级别悲观所(for update),这就致使了consume的性能降低,吞吐量不够。而kafka为了保证吞吐量,只容许同一个consumer group下的一个consumer线程去访问一个partition。若是以为效率不高的时候,能够加partition的数量来横向扩展,那么再加新的consumer thread去消费。若是想多个不一样的业务都须要这个topic的数据,起多个consumer group就行了,你们都是顺序的读取message,offsite的值互不影响。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就造成了分布式消费的概念。
当启动一个consumer group去消费一个topic的时候,不管topic里面有多个少个partition,不管咱们consumer group里面配置了多少个consumer thread,这个consumer group下面的全部consumer thread必定会消费所有的partition;即使这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费全部的partition。所以,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不可以一个consumer group的多个consumer同时消费一个partition。
一个consumer group下,不管有多少个consumer,这个consumer group必定回去把这个topic下全部的partition都消费了。当consumer group里面的consumer数量小于这个topic下的partition数量的时候,以下图groupA,groupB,就会出现一个conusmer thread消费多个partition的状况,总之是这个topic下的partition都会被消费。若是consumer group里面的consumer数量等于这个topic下的partition数量的时候,以下图groupC,此时效率是最高的,每一个partition都有一个consumer thread去消费。当consumer group里面的consumer数量大于这个topic下的partition数量的时候,以下图GroupD,就会有一个consumer thread空闲。所以,咱们在设定consumer group的时候,只须要指明里面有几个consumer数量便可,无需指定对应的消费partition序号,consumer会自动进行rebalance。
多个Consumer Group下的consumer能够消费同一条message,可是这种消费也是以o(1)的方式顺序的读取message去消费,,因此必定会重复消费这批message的,不能向AMQ那样多个BET做为consumer消费(对message加锁,消费的时候不能重复消费message)
-
Consumer Rebalance的触发条件:(1)Consumer增长或删除会触发 Consumer Group的Rebalance(2)Broker的增长或者减小都会触发 Consumer Rebalance
-
Consumer: Consumer处理partition里面的message的时候是o(1)顺序读取的。因此必须维护着上一次读到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由本身维护。通常来讲都是使用high level api的。Consumer的delivery gurarantee,默认是读完message先commmit再处理message,autocommit默认是true,这时候先commit就会更新offsite+1,一旦处理失败,offsite已经+1,这个时候就会丢message;也能够配置成读完消息处理再commit,这种状况下consumer端的响应就会比较慢的,须要等处理完才行。
通常状况下,必定是一个consumer group处理一个topic的message。Best Practice是这个consumer group里面consumer的数量等于topic里面partition的数量,这样效率是最高的,一个consumer thread处理一个partition。若是这个consumer group里面consumer的数量小于topic里面partition的数量,就会有consumer thread同时处理多个partition(这个是kafka自动的机制,咱们不用指定),可是总之这个topic里面的全部partition都会被处理到的。。若是这个consumer group里面consumer的数量大于topic里面partition的数量,多出的consumer thread就会闲着啥也不干,剩下的是一个consumer thread处理一个partition,这就形成了资源的浪费,由于一个partition不可能被两个consumer thread去处理。因此咱们线上的分布式多个service服务,每一个service里面的kafka consumer数量都小于对应的topic的partition数量,可是全部服务的consumer数量只和等于partition的数量,这是由于分布式service服务的全部consumer都来自一个consumer group,若是来自不一样的consumer group就会处理重复的message了(同一个consumer group下的consumer不能处理同一个partition,不一样的consumer group能够处理同一个topic,那么都是顺序处理message,必定会处理重复的。通常这种状况都是两个不一样的业务逻辑,才会启动两个consumer group来处理一个topic)。
若是producer的流量增大,当前的topic的parition数量=consumer数量,这时候的应对方式就是很想扩展:增长topic下的partition,同时增长这个consumer group下的consumer。
-
Delivery Mode : Kafka producer 发送message不用维护message的offsite信息,由于这个时候,offsite就至关于一个自增id,producer就尽管发送message就行了。并且Kafka与AMQ不一样,AMQ大都用在处理业务逻辑上,而Kafka大都是日志,因此Kafka的producer通常都是大批量的batch发送message,向这个topic一次性发送一大批message,load balance到一个partition上,一块儿插进去,offsite做为自增id本身增长就好。可是Consumer端是须要维护这个partition当前消费到哪一个message的offsite信息的,这个offsite信息,high level api是维护在Zookeeper上,low level api是本身的程序维护。(Kafka管理界面上只能显示high level api的consumer部分,由于low level api的partition offsite信息是程序本身维护,kafka是不知道的,没法在管理界面上展现 )当使用high level api的时候,先拿message处理,再定时自动commit offsite+1(也能够改为手动), 而且kakfa处理message是没有锁操做的。所以若是处理message失败,此时尚未commit offsite+1,当consumer thread重启后会重复消费这个message。可是做为高吞吐量高并发的实时处理系统,at least once的状况下,至少一次会被处理到,是能够容忍的。若是没法容忍,就得使用low level api来本身程序维护这个offsite信息,那么想何时commit offsite+1就本身搞定了。
-
Topic & Partition:Topic至关于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪一个topic,可是不须要指定topic下的哪一个partition,由于kafka会把收到的message进行load balance,均匀的分布在这个topic下的不一样的partition上( hash(message) % [broker数量] )。物理上存储上,这个topic会分红一个或多个partition,每一个partiton至关因而一个子queue。在物理结构上,每一个partition对应一个物理的目录(文件夹),文件夹命名是[topicname]_[partition]_[序号],一个topic能够有无数多的partition,根据业务需求和数据量来设置。在kafka配置文件中可随时更高num.partitions参数来配置更改topic的partition数量,在建立Topic时经过参数指定parittion数量。Topic建立以后经过Kafka提供的工具也能够修改partiton数量。
通常来讲,(1)一个Topic的Partition数量大于等于Broker的数量,能够提升吞吐率。(2)同一个Partition的Replica尽可能分散到不一样的机器,高可用。
当add a new partition的时候,partition里面的message不会从新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会从新参与全部partition的load balance
-
Partition Replica:每一个partition能够在其余的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。例若有5个kafka broker节点,某个topic有3个partition,每一个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推
(replica副本数目不能大于kafka broker节点的数目,不然报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其余的就是copy副本)。这样若是某个broker宕机,其实整个kafka内数据依然是完整的。可是,replica副本数越高,系统虽然越稳定,可是回来带资源和性能上的降低;replica副本少的话,也会形成系统丢数据的风险。
(1)怎样传送消息:producer先把message发送到partition leader,再由leader发送给其余partition follower。(若是让producer发送给每一个replica那就太慢了)
(2)在向Producer发送ACK前须要保证有多少个Replica已经收到该消息:根据ack配的个数而定
(3)怎样处理某个Replica不工做的状况:若是这个部工做的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。若是这个不工做的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工做的partition replca写message成功,可是会等到time out,而后返回失败由于某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工做的partition replica从ack列表中移除,之后的producer发送message的时候就不会有这个ack列表下的这个部工做的partition replica了。
(4)怎样处理Failed Replica恢复回来的状况:若是这个partition replica以前不在ack列表中,那么启动后从新受Zookeeper管理便可,以后producer发送message的时候,partition leader会继续发送message到这个partition follower上。若是这个partition replica以前在ack列表中,此时重启后,须要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工做的partition replica的时候自动从ack列表中移除的)
-
Partition leader与follower:partition也有leader和follower之分。leader是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其余的partition follower。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其余的broker的partition follower上选择follower变为parition leader。
-
Topic分配partition和partition replica的算法:(1)将Broker(size=n)和待分配的Partition排序。(2)将第i个Partition分配到第(i%n)个Broker上。(3)将第i个Partition的第j个Replica分配到第((i + j) % n)个Broker上
- 消息投递可靠性
一个消息如何算投递成功,Kafka提供了三种模式:
- 第一种是啥都无论,发送出去就看成成功,这种状况固然不能保证消息成功投递到broker;
- 第二种是Master-Slave模型,只有当Master和全部Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,可是损伤了性能;
- 第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数状况下都会中和可靠性和性能选择第三种模型
消息在broker上的可靠性,由于消息会持久化到磁盘上,因此若是正常stop一个broker,其上的数据不会丢失;可是若是不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这能够经过配置flush页面缓存的周期、阈值缓解,可是一样会频繁的写磁盘会影响性能,又是一个选择题,根据实际状况配置。
消息消费的可靠性,Kafka提供的是“At least once”模型,由于消息的读取进度由offset提供,offset能够由消费者本身维护也能够维护在zookeeper里,可是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的状况,这种状况一样能够经过调整commit offset周期、阈值缓解,甚至消费者本身把消费和commit offset作成一个事务解决,可是若是你的应用不在意重复消费,那就干脆不要解决,以换取最大的性能。
-
Partition ack:当ack=1,表示producer写partition leader成功后,broker就返回成功,不管其余的partition follower是否写成功。当ack=2,表示producer写partition leader和其余一个follower成功的时候,broker就返回成功,不管其余的partition follower是否写成功。当ack=-1[parition的数量]的时候,表示只有producer所有写成功的时候,才算成功,kafka broker才返回成功信息。
这里须要注意的是,若是ack=1的时候,一旦有个broker宕机致使partition的follower和leader切换,会致使丢数据。
-
message状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪一个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着若是consumer处理很差的话,broker上的一个消息可能会被消费屡次。
-
message持久化:Kafka中会把消息持久化到本地文件系统中,而且保持o(1)极高的效率。咱们众所周知IO读取是很是耗资源的性能也是最慢的,这就是为了数据库的瓶颈常常在IO上,须要换SSD硬盘的缘由。可是Kafka做为吞吐量极高的MQ,却能够很是高效的message持久化到文件。这是由于Kafka是顺序写入o(1)的时间复杂度,速度很是快。也是高吞吐量的缘由。因为message的写入持久化是顺序写入的,所以message在被消费的时候也是按顺序被消费的,保证partition的message是顺序消费的。通常的机器,单机每秒100k条数据。
-
message有效期:Kafka会长久保留其中的消息,以便consumer能够屡次消费,固然其中不少细节是可配置的。
-
Produer : Producer向Topic发送message,不须要指定partition,直接发送就行了。kafka经过partition ack来控制是否发送成功并把信息返回给producer,producer能够有任意多的thread,这些kafka服务器端是不care的。Producer端的delivery guarantee默认是At least once的。也能够设置Producer异步发送实现At most once。Producer能够用主键幂等性实现Exactly once
-
Kafka高吞吐量: Kafka的高吞吐量体如今读写上,分布式并发的读和写都很是快,写的性能体如今以o(1)的时间复杂度进行顺序写入。读的性能体如今以o(1)的时间复杂度进行顺序读取, 对topic进行partition分区,consume group中的consume线程能够以很高能性能进行顺序读。
- Kafka delivery guarantee(message传送保证):(1)At most once消息可能会丢,绝对不会重复传输;(2)At least once 消息绝对不会丢,可是可能会重复传输;(3)Exactly once每条信息确定会被传输一次且仅传输一次,这是用户想要的。
-
批量发送:Kafka支持以消息集合为单位进行批量发送,以提升push效率。
-
push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,二者对消息的生产和消费是异步的。
-
Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位同样,咱们能够随意的增长或删除任何一个broker节点。
-
负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
-
同步异步:Producer采用异步push方式,极大提升Kafka系统的吞吐率(能够经过参数控制是采用同步仍是异步方式)。
-
分区机制partition:Kafka的broker端支持消息分区partition,Producer能够决定把消息发到哪一个partition,在一个partition 中message的顺序就是Producer发送消息的顺序,一个topic中能够有多个partition,具体partition的数量是可配置的。partition的概念使得kafka做为MQ能够横向扩展,吞吐量巨大。partition能够设置replica副本,replica副本存在不一样的kafka broker节点上,第一个partition是leader,其余的是follower,message先写到partition leader上,再由partition leader push到parition follower上。因此说kafka能够水平扩展,也就是扩展partition。
-
离线数据装载:Kafka因为对可拓展的数据持久化的支持,它也很是适合向Hadoop或者数据仓库中进行数据装载。
-
实时数据与离线数据:kafka既支持离线数据也支持实时数据,由于kafka的message持久化到文件,并能够设置有效期,所以能够把kafka做为一个高效的存储来使用,能够做为离线数据供后面的分析。固然做为分布式实时消息系统,大多数状况下仍是用于实时的数据处理的,可是当cosumer消费能力降低的时候能够经过message的持久化在淤积数据在kafka。
-
插件支持:如今很多活跃的社区已经开发出很多插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。
-
解耦: 至关于一个MQ,使得Producer和Consumer之间异步的操做,系统之间解耦
-
冗余: replica有多个副本,保证一个broker node宕机后不会影响整个服务
-
扩展性: broker节点能够水平扩展,partition也能够水平增长,partition replica也能够水平增长
-
峰值: 在访问量剧增的状况下,kafka水平扩展, 应用仍然须要继续发挥做用
-
可恢复性: 系统的一部分组件失效时,因为有partition的replica副本,不会影响到整个系统。
-
顺序保证性:因为kafka的producer的写message与consumer去读message都是顺序的读写,保证了高效的性能。
-
缓冲:因为producer那面可能业务很简单,然后端consumer业务会很复杂并有数据库的操做,所以确定是producer会比consumer处理速度快,若是没有kafka,producer直接调用consumer,那么就会形成整个系统的处理速度慢,加一层kafka做为MQ,能够起到缓冲的做用。
-
异步通讯:做为MQ,Producer与Consumer异步通讯
1.持久化
kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的自己特性.且不管任何OS下,对文件系统自己的优化是很是艰难的.文件缓存/直接内存映射等是经常使用的手段.由于kafka是对日志文件进行append操做,所以磁盘检索的开支是较小的;同时为了减小磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到必定阀值时,再flush到磁盘,这样减小了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提高.
2.性能
除磁盘IO以外,咱们还须要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并无提供太多高超的技巧;对于producer端,能够将消息buffer起来,当消息的条数达到必定阀值时,批量发送给broker;对于consumer端也是同样,批量fetch多条消息.不过消息量的大小能够经过配置文件来指定.对于kafka broker端,彷佛有个sendfile系统调用能够潜在的提高网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域便可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).
其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,所以启用消息压缩机制是一个良好的策略;压缩须要消耗少许的CPU资源,不过对于kafka而言,网络IO更应该须要考虑.能够将任何在网络上传输的消息都通过压缩.kafka支持gzip/snappy等多种压缩方式.
3.负载均衡
kafka集群中的任何一个broker,均可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信息以后, producer将会和Topic下全部partition leader保持socket链接;消息由producer直接经过socket发送到broker,中间不会通过任何"路由层".
异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢总体的网络延迟,批量延迟发送事实上提高了网络效率;不过这也有必定的隐患,好比当producer失效时,那些还没有发送的消息将会丢失。
4.Topic模型
其余JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker须要太多额外的工做.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是至关轻量级的.当消息被consumer接收以后,consumer能够在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.因而可知,consumer客户端也很轻量级。
kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不只提升了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不一样,kafka中的消息是批量(一般以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.
5.消息传输一致
Kafka提供3种消息传输一致性语义:最多1次,最少1次,刚好1次。
最少1次:可能会重传数据,有可能出现数据被重复处理的状况;
最多1次:可能会出现数据丢失状况;
刚好1次:并非指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的状况。
at most once: 消费者fetch消息,而后保存offset,而后处理消息;当client保存offset以后,可是在消息处理过程当中consumer进程失效(crash),致使部分消息未能继续处理.那么此后可能其余consumer会接管,可是由于offset已经提早保存,那么新的consumer将不能fetch到offset以前的消息(尽管它们尚没有被处理),这就是"at most once".
at least once: 消费者fetch消息,而后处理消息,而后保存offset.若是消息处理成功以后,可是在保存offset阶段zookeeper异常或者consumer失效,致使保存offset操做未能执行成功,这就致使接下来再次fetch时可能得到上次已经处理过的消息,这就是"at least once".
"Kafka Cluster"到消费者的场景中能够采起如下方案来获得“刚好1次”的一致性语义:
最少1次+消费者的输出中额外增长已处理消息最大编号:因为已处理消息最大编号的存在,不会出现重复处理消息的状况。
6.副本
kafka中,replication策略是基于partition,而不是topic;kafka将每一个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(能够没有);备份的个数能够经过broker配置文件来设定。leader处理全部的read-write请求,follower须要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪全部的follower状态,若是follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当全部的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具备良好的网络环境.即便只有一个replicas实例存活,仍然能够保证消息的正常发送和接收,只要zookeeper集群存活便可.
选择follower时须要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,若是一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,须要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.
7.log
每一个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每一个日志都有一个offset来惟一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每一个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
获取消息时,须要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,能够找到此消息所在segment文件,而后根据segment的最小offset取差值,获得它在file中的相对位置,直接读取输出便可.
8.分布式
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变动并做出相应的动做(好比consumer失效,触发负载均衡等)
Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册本身的节点信息(临时znode),同时当broker和zookeeper断开链接时,此znode也会被删除.
Broker Topic Registry: 当一个broker启动时,会向zookeeper注册本身持有的topic和partitions信息,仍然是一个临时znode.
Consumer and Consumer group: 每一个consumer客户端被建立时,会向zookeeper注册本身的信息;此做用主要是为了"负载均衡".一个group中的多个consumer能够交错的消费一个topic的全部partitions;简而言之,保证此topic的全部partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每一个consumer上.
Consumer id Registry: 每一个consumer都有一个惟一的ID(host:uuid,能够经过配置文件指定,也能够由系统生成),此id用来标记消费者信息.
Consumer offset Tracking: 用来跟踪每一个consumer目前所消费的partition中最大的offset.此znode为持久节点,能够看出offset跟group_id有关,以代表当group中一个消费者失效,其余consumer能够继续消费.
Partition Owner registry: 用来标记partition正在被哪一个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)
当consumer启动时,所触发的操做:
A) 首先进行"Consumer id Registry";
B) 而后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其余consumer的"leave"和"join";只要此znode path下节点列表变动,都会触发此group下consumer的负载均衡.(好比一个consumer失效,那么其余consumer接管partitions).
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活状况;若是broker列表变动,将会触发全部的groups下的consumer从新balance.
总结:
1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每一个partition leader创建socket链接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息。
9.Leader的选择
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
若是leaders永远不会down的话咱们就不须要followers了!一旦leader down掉了,须要在followers中选择一个新的leader.可是followers自己有可能延时过久或者crash,因此必须选择高质量的follower做为leader.必须保证,一旦一个消息被提交了,可是leader down掉了,新选出的leader必须能够提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据全部副本节点的情况动态的选择最适合的做为leader.Kafka并非使用这种方法。
Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每一个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。所以这个集合中的任何一个节点随时均可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就能够容许在f个节点down掉的状况下不会丢失消息并正常提供服。ISR的成员是动态的,若是一个节点被淘汰了,当它从新达到“同步中”的状态时,他能够从新加入ISR.这种leader的选择方式是很是快速的,适合kafka的应用场景。
一个邪恶的想法:若是全部节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦全部节点都down了,这个就不能保证了。
实际应用中,当全部的副本都down掉时,必须及时做出反应。能够有如下两种选择:
1. 等待ISR中的任何一个节点恢复并担任leader。
2. 选择全部节点中(不仅是ISR)第一个恢复的节点做为leader.
这是一个在可用性和连续性之间的权衡。若是等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。若是等待ISR意外的节点恢复,这个节点的数据就会被做为线上数据,有可能和真实的数据有所出入,由于有些数据它可能还没同步到。Kafka目前选择了第二种策略,在将来的版本中将使这个策略的选择可配置,能够根据场景灵活的选择。
这种窘境不仅Kafka会遇到,几乎全部的分布式数据系统都会遇到。
10.副本管理
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽可能的使全部分区均匀的分布到集群全部的节点上而不是集中在某些节点上,另外主从关系也尽可能均衡这样每一个几点都会担任必定比例的分区的leader.
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点做为“controller”,当发现有节点down掉的时候它负责在游泳分区的全部节点中选择新的leader,这使得Kafka能够批量的高效的管理全部分区节点的主从关系。若是controller down掉了,活着的节点中的一个会备切换为新的controller.
11.Leader与副本同步
对于某个分区来讲,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会彻底复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采起的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。通常状况下,一个分区有一个“正分区”和零到多个“备份分区”。能够配置“正分区+备份分区”的总数量,关于这个配置,不一样主题能够有不一样的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通讯。
Kafka容许topic的分区拥有若干副本,这个数量是能够配置的,你能够为每一个topic配置副本的数量。Kafka会自动在每一个副本上备份数据,因此当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你能够配置只有一个副本,这样其实就至关于只有一份数据。
建立副本的单位是topic的分区,每一个分区都有一个leader和零或多个followers.全部的读写操做都由leader处理,通常分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。全部的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在本身的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
1. 节点必须能够维护和ZooKeeper的链接,Zookeeper经过心跳机制检查每一个节点的链接。
2. 若是节点是个follower,他必须能及时的同步leader的写操做,延时不能过久。
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪全部“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时过久,leader就会把它移除。至于延时多久算是“过久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
只有当消息被全部的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担忧一旦leader down掉了消息会丢失。Producer也能够选择是否等待消息被提交的通知,这个是由参数acks决定的。
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。