Kafka详细原理

Kafka
Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是能够实时的处理大量数据以知足各类需求场景:好比基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源 项目。
 
1.前言
消息队列的性能好坏,其文件存储机制设计是衡量一个消息队列服务技术水平和最关键指标之一。下面将从Kafka文件存储机制和物理结构角度,分析Kafka是如何实现高效文件存储,及实际应用效果。
 
 1.1  Kafka的特性:
- 高吞吐量、低延迟:kafka每秒能够处理几十万条消息,它的延迟最低只有几毫秒,每一个topic能够分多个partition, consumer group 对partition进行consume操做。
- 可扩展性:kafka集群支持热扩展
- 持久性、可靠性:消息被持久化到本地磁盘,而且支持数据备份防止数据丢失
- 容错性:容许集群中节点失败(若副本数量为n,则容许n-1个节点失败)
- 高并发:支持数千个客户端同时读写
 
1.2   Kafka的使用场景:
- 日志收集:一个公司能够用Kafka能够收集各类服务的log,经过kafka以统一接口服务的方式开放给各类consumer,例如hadoop、Hbase、Solr等。
- 消息系统:解耦和生产者和消费者、缓存消息等。
- 用户活动跟踪:Kafka常常被用来记录web用户或者app用户的各类活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到kafka的topic中,而后订阅者经过订阅这些topic来作实时的监控分析,或者装载到hadoop、数据仓库中作离线分析和挖掘。
- 运营指标:Kafka也常常用来记录运营监控数据。包括收集各类分布式应用的数据,生产各类操做的集中反馈,好比报警和报告。
- 流式处理:好比spark streaming和storm
- 事件源
 
1.3  Kakfa的设计思想
- Kakfa Broker Leader的选举:Kakfa Broker集群受Zookeeper管理。全部的Kafka Broker节点一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其余的Kafka Broker的全部信息,若是这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时全部的kafka broker又会一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上全部的partition在zookeeper上的状态,并选取ISR列表中的一个replica做为partition leader(若是ISR列表中的replica全挂,选一个幸存的replica做为leader; 若是该partition的全部的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,而且选它做为Leader;或选择第一个“活”过来的Replica(不必定是ISR中的)做为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其余的kafka broker。
这里曾经发生过一个bug,TalkingData使用Kafka0.8.1的时候,kafka controller在Zookeeper上注册成功后,它和Zookeeper通讯的timeout时间是6s,也就是若是kafka controller若是有6s中没有和Zookeeper作心跳,那么Zookeeper就认为这个kafka controller已经死了,就会在Zookeeper上把这个临时节点删掉,那么其余Kafka就会认为controller已经没了,就会再次抢着注册临时节点,注册成功的那个kafka broker成为controller,而后,以前的那个kafka controller就须要各类shut down去关闭各类节点和事件的监听。可是当kafka的读写流量都很是巨大的时候,TalkingData的一个bug是,因为网络等缘由,kafka controller和Zookeeper有6s中没有通讯,因而从新选举出了一个新的kafka controller,可是原来的controller在shut down的时候老是不成功,这个时候producer进来的message因为Kafka集群中存在两个kafka controller而没法落地。致使数据淤积。
这里曾经还有一个bug,TalkingData使用Kafka0.8.1的时候,当ack=0的时候,表示producer发送出去message,只要对应的kafka broker topic partition leader接收到的这条message,producer就返回成功,无论partition leader 是否真的成功把message真正存到kafka。当ack=1的时候,表示producer发送出去message,同步的把message存到对应topic的partition的leader上,而后producer就返回成功,partition leader异步的把message同步到其余partition replica上。当ack=all或-1,表示producer发送出去message,同步的把message存到对应topic的partition的leader和对应的replica上以后,才返回成功。可是若是某个kafka controller 切换的时候,会致使partition leader的切换(老的 kafka controller上面的partition leader会选举到其余的kafka broker上),可是这样就会致使丢数据。
-  Consumergroup:各个consumer(consumer 线程)能够组成一个组(Consumer group ),partition中的每一个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,若是一个message能够被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不一样的组。Kafka不支持一个partition中的message由两个或两个以上的同一个consumer group下的consumer thread来处理,除非再启动一个新的consumer group。因此若是想同时对一个topic作消费的话,启动多个consumer group就能够了,可是要注意的是,这里的多个consumer的消费都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。它不能像AMQ那样能够多个BET做为consumer去互斥的(for update悲观锁)并发处理message,这是由于多个BET去消费一个Queue中的数据的时候,因为要保证不能多个线程拿同一条message,因此就须要行级别悲观所(for update),这就致使了consume的性能降低,吞吐量不够。而kafka为了保证吞吐量,只容许同一个consumer group下的一个consumer线程去访问一个partition。若是以为效率不高的时候,能够加partition的数量来横向扩展,那么再加新的consumer thread去消费。若是想多个不一样的业务都须要这个topic的数据,起多个consumer group就行了,你们都是顺序的读取message,offsite的值互不影响。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就造成了分布式消费的概念。
    当启动一个consumer group去消费一个topic的时候,不管topic里面有多个少个partition,不管咱们consumer group里面配置了多少个consumer thread,这个consumer group下面的全部consumer thread必定会消费所有的partition;即使这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费全部的partition。所以,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。
    同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不可以一个consumer group的多个consumer同时消费一个partition。
    一个consumer group下,不管有多少个consumer,这个consumer group必定回去把这个topic下全部的partition都消费了。当consumer group里面的consumer数量小于这个topic下的partition数量的时候,以下图groupA,groupB,就会出现一个conusmer thread消费多个partition的状况,总之是这个topic下的partition都会被消费。若是consumer group里面的consumer数量等于这个topic下的partition数量的时候,以下图groupC,此时效率是最高的,每一个partition都有一个consumer thread去消费。当consumer group里面的consumer数量大于这个topic下的partition数量的时候,以下图GroupD,就会有一个consumer thread空闲。所以,咱们在设定consumer group的时候,只须要指明里面有几个consumer数量便可,无需指定对应的消费partition序号,consumer会自动进行rebalance。
    多个Consumer Group下的consumer能够消费同一条message,可是这种消费也是以o(1)的方式顺序的读取message去消费,,因此必定会重复消费这批message的,不能向AMQ那样多个BET做为consumer消费(对message加锁,消费的时候不能重复消费message)
- Consumer Rebalance的触发条件:(1)Consumer增长或删除会触发 Consumer Group的Rebalance(2)Broker的增长或者减小都会触发 Consumer Rebalance
- Consumer: Consumer处理partition里面的message的时候是o(1)顺序读取的。因此必须维护着上一次读到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由本身维护。通常来讲都是使用high level api的。Consumer的delivery gurarantee,默认是读完message先commmit再处理message,autocommit默认是true,这时候先commit就会更新offsite+1,一旦处理失败,offsite已经+1,这个时候就会丢message;也能够配置成读完消息处理再commit,这种状况下consumer端的响应就会比较慢的,须要等处理完才行。
通常状况下,必定是一个consumer group处理一个topic的message。Best Practice是这个consumer group里面consumer的数量等于topic里面partition的数量,这样效率是最高的,一个consumer thread处理一个partition。若是这个consumer group里面consumer的数量小于topic里面partition的数量,就会有consumer thread同时处理多个partition(这个是kafka自动的机制,咱们不用指定),可是总之这个topic里面的全部partition都会被处理到的。。若是这个consumer group里面consumer的数量大于topic里面partition的数量,多出的consumer thread就会闲着啥也不干,剩下的是一个consumer thread处理一个partition,这就形成了资源的浪费,由于一个partition不可能被两个consumer thread去处理。因此咱们线上的分布式多个service服务,每一个service里面的kafka consumer数量都小于对应的topic的partition数量,可是全部服务的consumer数量只和等于partition的数量,这是由于分布式service服务的全部consumer都来自一个consumer group,若是来自不一样的consumer group就会处理重复的message了(同一个consumer group下的consumer不能处理同一个partition,不一样的consumer group能够处理同一个topic,那么都是顺序处理message,必定会处理重复的。通常这种状况都是两个不一样的业务逻辑,才会启动两个consumer group来处理一个topic)。
 
若是producer的流量增大,当前的topic的parition数量=consumer数量,这时候的应对方式就是很想扩展:增长topic下的partition,同时增长这个consumer group下的consumer。
                 
- Delivery Mode : Kafka producer 发送message不用维护message的offsite信息,由于这个时候,offsite就至关于一个自增id,producer就尽管发送message就行了。并且Kafka与AMQ不一样,AMQ大都用在处理业务逻辑上,而Kafka大都是日志,因此Kafka的producer通常都是大批量的batch发送message,向这个topic一次性发送一大批message,load balance到一个partition上,一块儿插进去,offsite做为自增id本身增长就好。可是Consumer端是须要维护这个partition当前消费到哪一个message的offsite信息的,这个offsite信息,high level api是维护在Zookeeper上,low level api是本身的程序维护。(Kafka管理界面上只能显示high level api的consumer部分,由于low level api的partition offsite信息是程序本身维护,kafka是不知道的,没法在管理界面上展现 )当使用high level api的时候,先拿message处理,再定时自动commit offsite+1(也能够改为手动), 而且kakfa处理message是没有锁操做的。所以若是处理message失败,此时尚未commit offsite+1,当consumer thread重启后会重复消费这个message。可是做为高吞吐量高并发的实时处理系统,at least once的状况下,至少一次会被处理到,是能够容忍的。若是没法容忍,就得使用low level api来本身程序维护这个offsite信息,那么想何时commit offsite+1就本身搞定了。
 
- Topic & Partition:Topic至关于传统消息系统MQ中的一个队列queue,producer端发送的message必须指定是发送到哪一个topic,可是不须要指定topic下的哪一个partition,由于kafka会把收到的message进行load balance,均匀的分布在这个topic下的不一样的partition上( hash(message) % [broker数量]  )。物理上存储上,这个topic会分红一个或多个partition,每一个partiton至关因而一个子queue。在物理结构上,每一个partition对应一个物理的目录(文件夹),文件夹命名是[topicname]_[partition]_[序号],一个topic能够有无数多的partition,根据业务需求和数据量来设置。在kafka配置文件中可随时更高num.partitions参数来配置更改topic的partition数量,在建立Topic时经过参数指定parittion数量。Topic建立以后经过Kafka提供的工具也能够修改partiton数量。
   通常来讲,(1)一个Topic的Partition数量大于等于Broker的数量,能够提升吞吐率。(2)同一个Partition的Replica尽可能分散到不一样的机器,高可用。
  当add a new partition的时候,partition里面的message不会从新进行分配,原来的partition里面的message数据不会变,新加的这个partition刚开始是空的,随后进入这个topic的message就会从新参与全部partition的load balance
- Partition Replica:每一个partition能够在其余的kafka broker节点上存副本,以便某个kafka broker节点宕机不会影响这个kafka集群。存replica副本的方式是按照kafka broker的顺序存。例若有5个kafka broker节点,某个topic有3个partition,每一个partition存2个副本,那么partition1存broker1,broker2,partition2存broker2,broker3。。。以此类推(replica副本数目不能大于kafka broker节点的数目,不然报错。这里的replica数其实就是partition的副本总数,其中包括一个leader,其余的就是copy副本)。这样若是某个broker宕机,其实整个kafka内数据依然是完整的。可是,replica副本数越高,系统虽然越稳定,可是回来带资源和性能上的降低;replica副本少的话,也会形成系统丢数据的风险。
  (1)怎样传送消息:producer先把message发送到partition leader,再由leader发送给其余partition follower。(若是让producer发送给每一个replica那就太慢了)
  (2)在向Producer发送ACK前须要保证有多少个Replica已经收到该消息:根据ack配的个数而定
  (3)怎样处理某个Replica不工做的状况:若是这个部工做的partition replica不在ack列表中,就是producer在发送消息到partition leader上,partition leader向partition follower发送message没有响应而已,这个不会影响整个系统,也不会有什么问题。若是这个不工做的partition replica在ack列表中的话,producer发送的message的时候会等待这个不工做的partition replca写message成功,可是会等到time out,而后返回失败由于某个ack列表中的partition replica没有响应,此时kafka会自动的把这个部工做的partition replica从ack列表中移除,之后的producer发送message的时候就不会有这个ack列表下的这个部工做的partition replica了。 
  (4)怎样处理Failed Replica恢复回来的状况:若是这个partition replica以前不在ack列表中,那么启动后从新受Zookeeper管理便可,以后producer发送message的时候,partition leader会继续发送message到这个partition follower上。若是这个partition replica以前在ack列表中,此时重启后,须要把这个partition replica再手动加到ack列表中。(ack列表是手动添加的,出现某个部工做的partition replica的时候自动从ack列表中移除的)
- Partition leader与follower:partition也有leader和follower之分。leader是主partition,producer写kafka的时候先写partition leader,再由partition leader push给其余的partition follower。partition leader与follower的信息受Zookeeper控制,一旦partition leader所在的broker节点宕机,zookeeper会冲其余的broker的partition follower上选择follower变为parition leader。
- Topic分配partition和partition replica的算法:(1)将Broker(size=n)和待分配的Partition排序。(2)将第i个Partition分配到第(i%n)个Broker上。(3)将第i个Partition的第j个Replica分配到第((i + j) % n)个Broker上
 
- 消息投递可靠性
一个消息如何算投递成功,Kafka提供了三种模式:
- 第一种是啥都无论,发送出去就看成成功,这种状况固然不能保证消息成功投递到broker;
- 第二种是Master-Slave模型,只有当Master和全部Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,可是损伤了性能;
- 第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数状况下都会中和可靠性和性能选择第三种模型
  消息在broker上的可靠性,由于消息会持久化到磁盘上,因此若是正常stop一个broker,其上的数据不会丢失;可是若是不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这能够经过配置flush页面缓存的周期、阈值缓解,可是一样会频繁的写磁盘会影响性能,又是一个选择题,根据实际状况配置。
  消息消费的可靠性,Kafka提供的是“At least once”模型,由于消息的读取进度由offset提供,offset能够由消费者本身维护也能够维护在zookeeper里,可是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的状况,这种状况一样能够经过调整commit offset周期、阈值缓解,甚至消费者本身把消费和commit offset作成一个事务解决,可是若是你的应用不在意重复消费,那就干脆不要解决,以换取最大的性能。
 
- Partition ack:当ack=1,表示producer写partition leader成功后,broker就返回成功,不管其余的partition follower是否写成功。当ack=2,表示producer写partition leader和其余一个follower成功的时候,broker就返回成功,不管其余的partition follower是否写成功。当ack=-1[parition的数量]的时候,表示只有producer所有写成功的时候,才算成功,kafka broker才返回成功信息。这里须要注意的是,若是ack=1的时候,一旦有个broker宕机致使partition的follower和leader切换,会致使丢数据。
  
- message状态:在Kafka中,消息的状态被保存在consumer中,broker不会关心哪一个消息被消费了被谁消费了,只记录一个offset值(指向partition中下一个要被消费的消息位置),这就意味着若是consumer处理很差的话,broker上的一个消息可能会被消费屡次。
- message持久化:Kafka中会把消息持久化到本地文件系统中,而且保持o(1)极高的效率。咱们众所周知IO读取是很是耗资源的性能也是最慢的,这就是为了数据库的瓶颈常常在IO上,须要换SSD硬盘的缘由。可是Kafka做为吞吐量极高的MQ,却能够很是高效的message持久化到文件。这是由于Kafka是顺序写入o(1)的时间复杂度,速度很是快。也是高吞吐量的缘由。因为message的写入持久化是顺序写入的,所以message在被消费的时候也是按顺序被消费的,保证partition的message是顺序消费的。通常的机器,单机每秒100k条数据。
- message有效期:Kafka会长久保留其中的消息,以便consumer能够屡次消费,固然其中不少细节是可配置的。
- Produer : Producer向Topic发送message,不须要指定partition,直接发送就行了。kafka经过partition ack来控制是否发送成功并把信息返回给producer,producer能够有任意多的thread,这些kafka服务器端是不care的。Producer端的delivery guarantee默认是At least once的。也能够设置Producer异步发送实现At most once。Producer能够用主键幂等性实现Exactly once
- Kafka高吞吐量: Kafka的高吞吐量体如今读写上,分布式并发的读和写都很是快,写的性能体如今以o(1)的时间复杂度进行顺序写入。读的性能体如今以o(1)的时间复杂度进行顺序读取, 对topic进行partition分区,consume group中的consume线程能够以很高能性能进行顺序读。
- Kafka delivery guarantee(message传送保证):(1)At most once消息可能会丢,绝对不会重复传输;(2)At least once 消息绝对不会丢,可是可能会重复传输;(3)Exactly once每条信息确定会被传输一次且仅传输一次,这是用户想要的。
- 批量发送:Kafka支持以消息集合为单位进行批量发送,以提升push效率。
- push-and-pull : Kafka中的Producer和consumer采用的是push-and-pull模式,即Producer只管向broker push消息,consumer只管从broker pull消息,二者对消息的生产和消费是异步的。
- Kafka集群中broker之间的关系:不是主从关系,各个broker在集群中地位同样,咱们能够随意的增长或删除任何一个broker节点。
- 负载均衡方面: Kafka提供了一个 metadata API来管理broker之间的负载(对Kafka0.8.x而言,对于0.7.x主要靠zookeeper来实现负载均衡)。
- 同步异步:Producer采用异步push方式,极大提升Kafka系统的吞吐率(能够经过参数控制是采用同步仍是异步方式)。
- 分区机制partition:Kafka的broker端支持消息分区partition,Producer能够决定把消息发到哪一个partition,在一个partition 中message的顺序就是Producer发送消息的顺序,一个topic中能够有多个partition,具体partition的数量是可配置的。partition的概念使得kafka做为MQ能够横向扩展,吞吐量巨大。partition能够设置replica副本,replica副本存在不一样的kafka broker节点上,第一个partition是leader,其余的是follower,message先写到partition leader上,再由partition leader push到parition follower上。因此说kafka能够水平扩展,也就是扩展partition。
- 离线数据装载:Kafka因为对可拓展的数据持久化的支持,它也很是适合向Hadoop或者数据仓库中进行数据装载。
- 实时数据与离线数据:kafka既支持离线数据也支持实时数据,由于kafka的message持久化到文件,并能够设置有效期,所以能够把kafka做为一个高效的存储来使用,能够做为离线数据供后面的分析。固然做为分布式实时消息系统,大多数状况下仍是用于实时的数据处理的,可是当cosumer消费能力降低的时候能够经过message的持久化在淤积数据在kafka。
- 插件支持:如今很多活跃的社区已经开发出很多插件来拓展Kafka的功能,如用来配合Storm、Hadoop、flume相关的插件。
- 解耦:  至关于一个MQ,使得Producer和Consumer之间异步的操做,系统之间解耦
- 冗余:  replica有多个副本,保证一个broker node宕机后不会影响整个服务
- 扩展性:  broker节点能够水平扩展,partition也能够水平增长,partition replica也能够水平增长
- 峰值:  在访问量剧增的状况下,kafka水平扩展, 应用仍然须要继续发挥做用
- 可恢复性:  系统的一部分组件失效时,因为有partition的replica副本,不会影响到整个系统。
- 顺序保证性:因为kafka的producer的写message与consumer去读message都是顺序的读写,保证了高效的性能。
- 缓冲:因为producer那面可能业务很简单,然后端consumer业务会很复杂并有数据库的操做,所以确定是producer会比consumer处理速度快,若是没有kafka,producer直接调用consumer,那么就会形成整个系统的处理速度慢,加一层kafka做为MQ,能够起到缓冲的做用。
- 异步通讯:做为MQ,Producer与Consumer异步通讯

2.Kafka文件存储机制

2.1 Kafka部分名词解释以下:
 
     Kafka中发布订阅的对象是topic。咱们能够为每类数据建立一个topic,把向topic发布消息的客户端称做producer,从topic订阅消息的客户端称做consumer。Producers和consumers能够同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
  • Broker:Kafka节点,一个Kafka节点就是一个broker,多个broker能够组成一个Kafka集群。
  • Topic:一类消息,消息存放的目录即主题,例如page view日志、click日志等均可以以topic的形式存在,Kafka集群可以同时负责多个topic的分发。
  • Partition:topic物理上的分组,一个topic能够分为多个partition,每一个partition是一个有序的队列
  • Segment:partition物理上由多个segment组成,每一个Segment存着message信息
  • Producer : 生产message发送到topic
  • Consumer : 订阅topic消费message, consumer做为一个线程来消费
  • Consumer Group:一个Consumer Group包含多个consumer, 这个是预先在配置文件中配置好的。各个consumer(consumer 线程)能够组成一个组(Consumer group ),partition中的每一个message只能被组(Consumer group ) 中的一个consumer(consumer 线程 )消费,若是一个message能够被多个consumer(consumer 线程 ) 消费的话,那么这些consumer必须在不一样的组。Kafka不支持一个partition中的message由两个或两个以上的consumer thread来处理,即使是来自不一样的consumer group的也不行。它不能像AMQ那样能够多个BET做为consumer去处理message,这是由于多个BET去消费一个Queue中的数据的时候,因为要保证不能多个线程拿同一条message,因此就须要行级别悲观所(for update),这就致使了consume的性能降低,吞吐量不够。而kafka为了保证吞吐量,只容许一个consumer线程去访问一个partition。若是以为效率不高的时候,能够加partition的数量来横向扩展,那么再加新的consumer thread去消费。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就造成了分布式消费的概念。

  • 2.2 kafka一些原理概念
1.持久化
kafka使用文件存储消息(append only log),这就直接决定kafka在性能上严重依赖文件系统的自己特性.且不管任何OS下,对文件系统自己的优化是很是艰难的.文件缓存/直接内存映射等是经常使用的手段.由于kafka是对日志文件进行append操做,所以磁盘检索的开支是较小的;同时为了减小磁盘写入的次数,broker会将消息暂时buffer起来,当消息的个数(或尺寸)达到必定阀值时,再flush到磁盘,这样减小了磁盘IO调用的次数.对于kafka而言,较高性能的磁盘,将会带来更加直接的性能提高.
 
2.性能
除磁盘IO以外,咱们还须要考虑网络IO,这直接关系到kafka的吞吐量问题.kafka并无提供太多高超的技巧;对于producer端,能够将消息buffer起来,当消息的条数达到必定阀值时,批量发送给broker;对于consumer端也是同样,批量fetch多条消息.不过消息量的大小能够经过配置文件来指定.对于kafka broker端,彷佛有个sendfile系统调用能够潜在的提高网络IO的性能:将文件的数据映射到系统内存中,socket直接读取相应的内存区域便可,而无需进程再次copy和交换(这里涉及到"磁盘IO数据"/"内核内存"/"进程内存"/"网络缓冲区",多者之间的数据copy).
其实对于producer/consumer/broker三者而言,CPU的开支应该都不大,所以启用消息压缩机制是一个良好的策略;压缩须要消耗少许的CPU资源,不过对于kafka而言,网络IO更应该须要考虑.能够将任何在网络上传输的消息都通过压缩.kafka支持gzip/snappy等多种压缩方式.
 
3.负载均衡
kafka集群中的任何一个broker,均可以向producer提供metadata信息,这些metadata中包含"集群中存活的servers列表"/"partitions leader列表"等信息(请参看zookeeper中的节点信息). 当producer获取到metadata信息以后, producer将会和Topic下全部partition leader保持socket链接;消息由producer直接经过socket发送到broker,中间不会通过任何"路由层".
异步发送,将多条消息暂且在客户端buffer起来,并将他们批量发送到broker;小数据IO太多,会拖慢总体的网络延迟,批量延迟发送事实上提高了网络效率;不过这也有必定的隐患,好比当producer失效时,那些还没有发送的消息将会丢失。
 
4.Topic模型
其余JMS实现,消息消费的位置是有prodiver保留,以便避免重复发送消息或者将没有消费成功的消息重发等,同时还要控制消息的状态.这就要求JMS broker须要太多额外的工做.在kafka中,partition中的消息只有一个consumer在消费,且不存在消息状态的控制,也没有复杂的消息确认机制,可见kafka broker端是至关轻量级的.当消息被consumer接收以后,consumer能够在本地保存最后消息的offset,并间歇性的向zookeeper注册offset.因而可知,consumer客户端也很轻量级。
kafka中consumer负责维护消息的消费记录,而broker则不关心这些,这种设计不只提升了consumer端的灵活性,也适度的减轻了broker端设计的复杂度;这是和众多JMS prodiver的区别.此外,kafka中消息ACK的设计也和JMS有很大不一样,kafka中的消息是批量(一般以消息的条数或者chunk的尺寸为单位)发送给consumer,当消息消费成功后,向zookeeper提交消息的offset,而不会向broker交付ACK.或许你已经意识到,这种"宽松"的设计,将会有"丢失"消息/"消息重发"的危险.
 
5.消息传输一致
Kafka提供3种消息传输一致性语义:最多1次,最少1次,刚好1次。
最少1次:可能会重传数据,有可能出现数据被重复处理的状况;
最多1次:可能会出现数据丢失状况;
刚好1次:并非指真正只传输1次,只不过有一个机制。确保不会出现“数据被重复处理”和“数据丢失”的状况。
 
at most once: 消费者fetch消息,而后保存offset,而后处理消息;当client保存offset以后,可是在消息处理过程当中consumer进程失效(crash),致使部分消息未能继续处理.那么此后可能其余consumer会接管,可是由于offset已经提早保存,那么新的consumer将不能fetch到offset以前的消息(尽管它们尚没有被处理),这就是"at most once".
at least once: 消费者fetch消息,而后处理消息,而后保存offset.若是消息处理成功以后,可是在保存offset阶段zookeeper异常或者consumer失效,致使保存offset操做未能执行成功,这就致使接下来再次fetch时可能得到上次已经处理过的消息,这就是"at least once".
"Kafka Cluster"到消费者的场景中能够采起如下方案来获得“刚好1次”的一致性语义:
最少1次+消费者的输出中额外增长已处理消息最大编号:因为已处理消息最大编号的存在,不会出现重复处理消息的状况。
 
6.副本
kafka中,replication策略是基于partition,而不是topic;kafka将每一个partition数据复制到多个server上,任何一个partition有一个leader和多个follower(能够没有);备份的个数能够经过broker配置文件来设定。leader处理全部的read-write请求,follower须要和leader保持同步.Follower就像一个"consumer",消费消息并保存在本地日志中;leader负责跟踪全部的follower状态,若是follower"落后"太多或者失效,leader将会把它从replicas同步列表中删除.当全部的follower都将一条消息保存成功,此消息才被认为是"committed",那么此时consumer才能消费它,这种同步策略,就要求follower和leader之间必须具备良好的网络环境.即便只有一个replicas实例存活,仍然能够保证消息的正常发送和接收,只要zookeeper集群存活便可.
选择follower时须要兼顾一个问题,就是新leader server上所已经承载的partition leader的个数,若是一个server上有过多的partition leader,意味着此server将承受着更多的IO压力.在选举新leader,须要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader.
 
7.log
每一个log entry格式为"4个字节的数字N表示消息的长度" + "N个字节的消息内容";每一个日志都有一个offset来惟一的标记一条消息,offset的值为8个字节的数字,表示此消息在此partition中所处的起始位置..每一个partition在物理存储层面,有多个log file组成(称为segment).segment file的命名为"最小offset".kafka.例如"00000000000.kafka";其中"最小offset"表示此segment中起始消息的offset.
获取消息时,须要指定offset和最大chunk尺寸,offset用来表示消息的起始位置,chunk size用来表示最大获取消息的总长度(间接的表示消息的条数).根据offset,能够找到此消息所在segment文件,而后根据segment的最小offset取差值,获得它在file中的相对位置,直接读取输出便可.
 
8.分布式
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变动并做出相应的动做(好比consumer失效,触发负载均衡等)
Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册本身的节点信息(临时znode),同时当broker和zookeeper断开链接时,此znode也会被删除.
Broker Topic Registry: 当一个broker启动时,会向zookeeper注册本身持有的topic和partitions信息,仍然是一个临时znode.
Consumer and Consumer group: 每一个consumer客户端被建立时,会向zookeeper注册本身的信息;此做用主要是为了"负载均衡".一个group中的多个consumer能够交错的消费一个topic的全部partitions;简而言之,保证此topic的全部partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每一个consumer上.
Consumer id Registry: 每一个consumer都有一个惟一的ID(host:uuid,能够经过配置文件指定,也能够由系统生成),此id用来标记消费者信息.
Consumer offset Tracking: 用来跟踪每一个consumer目前所消费的partition中最大的offset.此znode为持久节点,能够看出offset跟group_id有关,以代表当group中一个消费者失效,其余consumer能够继续消费.
Partition Owner registry: 用来标记partition正在被哪一个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)
当consumer启动时,所触发的操做:
A) 首先进行"Consumer id Registry";
B) 而后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其余consumer的"leave"和"join";只要此znode path下节点列表变动,都会触发此group下consumer的负载均衡.(好比一个consumer失效,那么其余consumer接管partitions).
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活状况;若是broker列表变动,将会触发全部的groups下的consumer从新balance.
 
总结:
1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每一个partition leader创建socket链接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息。
 
9.Leader的选择
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
若是leaders永远不会down的话咱们就不须要followers了!一旦leader down掉了,须要在followers中选择一个新的leader.可是followers自己有可能延时过久或者crash,因此必须选择高质量的follower做为leader.必须保证,一旦一个消息被提交了,可是leader down掉了,新选出的leader必须能够提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据全部副本节点的情况动态的选择最适合的做为leader.Kafka并非使用这种方法。
Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每一个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。所以这个集合中的任何一个节点随时均可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就能够容许在f个节点down掉的状况下不会丢失消息并正常提供服。ISR的成员是动态的,若是一个节点被淘汰了,当它从新达到“同步中”的状态时,他能够从新加入ISR.这种leader的选择方式是很是快速的,适合kafka的应用场景。
一个邪恶的想法:若是全部节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦全部节点都down了,这个就不能保证了。
实际应用中,当全部的副本都down掉时,必须及时做出反应。能够有如下两种选择:
1. 等待ISR中的任何一个节点恢复并担任leader。
2. 选择全部节点中(不仅是ISR)第一个恢复的节点做为leader.
这是一个在可用性和连续性之间的权衡。若是等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。若是等待ISR意外的节点恢复,这个节点的数据就会被做为线上数据,有可能和真实的数据有所出入,由于有些数据它可能还没同步到。Kafka目前选择了第二种策略,在将来的版本中将使这个策略的选择可配置,能够根据场景灵活的选择。
这种窘境不仅Kafka会遇到,几乎全部的分布式数据系统都会遇到。
 
10.副本管理
以上仅仅以一个topic一个分区为例子进行了讨论,但实际上一个Kafka将会管理成千上万的topic分区.Kafka尽可能的使全部分区均匀的分布到集群全部的节点上而不是集中在某些节点上,另外主从关系也尽可能均衡这样每一个几点都会担任必定比例的分区的leader.
优化leader的选择过程也是很重要的,它决定了系统发生故障时的空窗期有多久。Kafka选择一个节点做为“controller”,当发现有节点down掉的时候它负责在游泳分区的全部节点中选择新的leader,这使得Kafka能够批量的高效的管理全部分区节点的主从关系。若是controller down掉了,活着的节点中的一个会备切换为新的controller.
 
11.Leader与副本同步
对于某个分区来讲,保存正分区的"broker"为该分区的"leader",保存备份分区的"broker"为该分区的"follower"。备份分区会彻底复制正分区的消息,包括消息的编号等附加属性值。为了保持正分区和备份分区的内容一致,Kafka采起的方案是在保存备份分区的"broker"上开启一个消费者进程进行消费,从而使得正分区的内容与备份分区的内容保持一致。通常状况下,一个分区有一个“正分区”和零到多个“备份分区”。能够配置“正分区+备份分区”的总数量,关于这个配置,不一样主题能够有不一样的配置值。注意,生产者,消费者只与保存正分区的"leader"进行通讯。
 
Kafka容许topic的分区拥有若干副本,这个数量是能够配置的,你能够为每一个topic配置副本的数量。Kafka会自动在每一个副本上备份数据,因此当一个节点down掉时数据依然是可用的。
Kafka的副本功能不是必须的,你能够配置只有一个副本,这样其实就至关于只有一份数据。
建立副本的单位是topic的分区,每一个分区都有一个leader和零或多个followers.全部的读写操做都由leader处理,通常分区的数量都比broker的数量多的多,各分区的leader均匀的分布在brokers中。全部的followers都复制leader的日志,日志中的消息和顺序都和leader中的一致。followers向普通的consumer那样从leader那里拉取消息并保存在本身的日志文件中。
许多分布式的消息系统自动的处理失败的请求,它们对一个节点是否着(alive)”有着清晰的定义。Kafka判断一个节点是否活着有两个条件:
1. 节点必须能够维护和ZooKeeper的链接,Zookeeper经过心跳机制检查每一个节点的链接。
2. 若是节点是个follower,他必须能及时的同步leader的写操做,延时不能过久。
符合以上条件的节点准确的说应该是“同步中的(in sync)”,而不是模糊的说是“活着的”或是“失败的”。Leader会追踪全部“同步中”的节点,一旦一个down掉了,或是卡住了,或是延时过久,leader就会把它移除。至于延时多久算是“过久”,是由参数replica.lag.max.messages决定的,怎样算是卡住了,怎是由参数replica.lag.time.max.ms决定的。
只有当消息被全部的副本加入到日志中时,才算是“committed”,只有committed的消息才会发送给consumer,这样就不用担忧一旦leader down掉了消息会丢失。Producer也能够选择是否等待消息被提交的通知,这个是由参数acks决定的。
Kafka保证只要有一个“同步中”的节点,“committed”的消息就不会丢失。
 
 
  • 2.3  kafka拓扑结构

       一个典型的Kafka集群中包含若干Producer(能够是web前端FET,或者是服务器日志等),若干broker(Kafka支持水平扩展,通常broker数量越多,集群吞吐率越高),若干ConsumerGroup,以及一个Zookeeper集群。Kafka经过Zookeeper管理Kafka集群配置:选举Kafka broker的leader,以及在Consumer Group发生变化时进行rebalance,由于consumer消费kafka topic的partition的offsite信息是存在Zookeeper的。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
 

分析过程分为如下4个步骤:前端

  • topic中partition存储分布
  • partiton中文件存储方式 (partition在linux服务器上就是一个目录(文件夹))
  • partiton中segment文件存储结构
  • 在partition中如何经过offset查找message

经过上述4过程详细分析,咱们就能够清楚认识到kafka文件存储机制的奥秘。node

 
2.3 topic中partition存储分布

假设实验环境中Kafka集群只有一个broker,xxx/message-folder为数据文件存储根目录,在Kafka broker中server.properties文件配置(参数log.dirs=xxx/message-folder),例如建立2个topic名 称分别为report_push、launch_info, partitions数量都为partitions=4mysql

存储路径和目录规则为:linux

xxx/message-foldernginx

  |--report_push-0
  |--report_push-1
  |--report_push-2
  |--report_push-3
  |--launch_info-0
  |--launch_info-1
  |--launch_info-2
  |--launch_info-3
 
在Kafka文件存储中,同一个topic下有多个不一样partition,每一个partition为一个目录,partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
消息发送时都被发送到一个topic,其本质就是一个目录,而topic由是由一些Partition组成,其组织结构以下图所示:
 
咱们能够看到,Partition是一个Queue的结构,每一个Partition中的消息都是有序的,生产的消息被不断追加到Partition上,其中的每个消息都被赋予了一个惟一的offset值。
 
Kafka集群会保存全部的消息,无论消息有没有被消费;咱们能够设定消息的过时时间,只有过时的数据才会被自动清除以释放磁盘空间。好比咱们设置消息过时时间为2天,那么这2天内的全部消息都会被保存到集群中,数据只有超过了两天才会被清除。
 
Kafka只维护在Partition中的offset值,由于这个offsite标识着这个partition的message消费到哪条了。Consumer每消费一个消息,offset就会加1。其实消息的状态彻底是由Consumer控制的,Consumer能够跟踪和重设这个offset值,这样的话Consumer就能够读取任意位置的消息。
 
把消息日志以Partition的形式存放有多重考虑,第一,方便在集群中扩展,每一个Partition能够经过调整以适应它所在的机器,而一个topic又能够有多个Partition组成,所以整个集群就能够适应任意大小的数据了;第二就是能够提升并发,由于能够以Partition为单位读写了。
 
经过上面介绍的咱们能够知道,kafka中的数据是持久化的而且可以容错的。Kafka容许用户为每一个topic设置副本数量,副本数量决定了有几个broker来存放写入的数据。若是你的副本数量设置为3,那么一份数据就会被存放在3台不一样的机器上,那么就容许有2个机器失败。通常推荐副本数量至少为2,这样就能够保证增减、重启机器时不会影响到数据消费。若是对数据持久化有更高的要求,能够把副本数量设置为3或者更多。
 
Kafka中的topic是以partition的形式存放的,每个topic均可以设置它的partition数量,Partition的数量决定了组成topic的message的数量。Producer在生产数据时,会按照必定规则(这个规则是能够自定义的)把消息发布到topic的各个partition中。上面将的副本都是以partition为单位的,不过只有一个partition的副本会被选举成leader做为读写用。
 
关于如何设置partition值须要考虑的因素。一个partition只能被一个消费者消费(一个消费者能够同时消费多个partition),所以,若是设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。因此,推荐partition的数量必定要大于同时运行的consumer的数量。另一方面,建议partition的数量大于集群broker的数量,这样leader partition就能够均匀的分布在各个broker中,最终使得集群负载均衡。在Cloudera,每一个topic都有上百个partition。须要注意的是,kafka须要为每一个partition分配一些内存来缓存消息数据,若是partition数量越大,就要为kafka分配更大的heap space。
2.4 partiton中文件存储方式
 
  • 每一个partion(目录)至关于一个巨型文件被平均分配到多个大小相等segment(段)数据文件中。但每一个段segment file消息数量不必定相等,这种特性方便old segment file快速被删除。
  • 每一个partiton只须要支持顺序读写就好了,segment文件生命周期由服务端配置参数决定。

这样作的好处就是能快速删除无用文件,有效提升磁盘利用率。web

2.5 partiton中segment文件存储结构
producer发message到某个topic,message会被均匀的分布到多个partition上(随机或根据用户指定的回调函数进行分布),kafka broker收到message往对应partition的最后一个segment上添加该消息,当某个segment上的消息条数达到配置值或消息发布时间超过阈值时,segment上的消息会被flush到磁盘,只有flush到磁盘上的消息consumer才能消费,segment达到必定的大小后将不会再往该segment写数据,broker会建立新的segment。
 
每一个part在内存中对应一个index,记录每一个segment中的第一条消息偏移。
  • segment file组成:由2大部分组成,分别为index file和data file,此2个文件一一对应,成对出现,后缀".index"和“.log”分别表示为segment索引文件、数据文件.
  • segment文件命名规则:partion全局的第一个segment从0开始,后续每一个segment文件名为上一个全局partion的最大offset(偏移message数)。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
 
每一个segment中存储不少条消息,消息id由其逻辑位置决定,即从消息id可直接定位到消息的存储位置,避免id到位置的额外映射。

下面文件列表是笔者在Kafka broker上作的一个实验,建立一个topicXXX包含1 partition,设置每一个segment大小为500MB,并启动producer向Kafka broker写入大量数据,以下图2所示segment文件列表形象说明了上述2个规则:算法

以上述图2中一对segment file文件为例,说明segment中index<—->data file对应关系物理结构以下:sql

上述图3中索引文件存储大量元数据,数据文件存储大量消息,索引文件中元数据指向对应数据文件中message的物理偏移地址。其中以索引文件中 元数据3,497为例,依次在数据文件中表示第3个message(在全局partiton表示第368772个message)、以及该消息的物理偏移 地址为497。数据库

从上述图3了解到segment data file由许多message组成,下面详细说明message物理结构以下:后端

参数说明:

关键字 解释说明
8 byte offset 在parition(分区)内的每条消息都有一个有序的id号,这个id号被称为偏移(offset),它能够惟一肯定每条消息在parition(分区)内的位置。即offset表示partiion的第多少message
4 byte message size message大小
4 byte CRC32 用crc32校验message
1 byte “magic" 表示本次发布Kafka服务程序协议版本号
1 byte “attributes" 表示为独立版本、或标识压缩类型、或编码类型。
4 byte key length 表示key的长度,当key为-1时,K byte key字段不填
K byte key 可选
value bytes payload 表示实际消息数据。
 
2.6 在partition中如何经过offset查找message

例如读取offset=368776的message,须要经过下面2个步骤查找。

  • 第一步查找segment file

    上述图2为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件 00000000000000368769.index的消息量起始偏移量为368770 = 368769 + 1.一样,第三个文件00000000000000737337.index的起始偏移量为737338=737337 + 1,其余后续文件依次类推,以起始偏移量命名并排序这些文件,只要根据offset **二分查找**文件列表,就能够快速定位到具体文件。

    当offset=368776时定位到00000000000000368769.index|log

  • 第二步经过segment file查找message经过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和 00000000000000368769.log的物理偏移地址,而后再经过00000000000000368769.log顺序查找直到 offset=368776为止。

segment index file采起稀疏索引存储方式,它减小索引文件大小,经过mmap能够直接内存操做,稀疏索引为数据文件的每一个对应message设置一个元数据指针,它 比稠密索引节省了更多的存储空间,但查找起来须要消耗更多的时间。
 
kafka会记录offset到zk中。可是,zk client api对zk的频繁写入是一个低效的操做。0.8.2 kafka引入了native offset storage,将offset管理从zk移出,而且能够作到水平扩展。其原理就是利用了kafka的compacted topic,offset以consumer group,topic与partion的组合做为key直接提交到compacted topic中。同时Kafka又在内存中维护了的三元组来维护最新的offset信息,consumer来取最新offset信息的时候直接内存里拿便可。固然,kafka容许你快速的checkpoint最新的offset信息到磁盘上。
 
3.Partition Replication原则

Kafka高效文件存储设计特色

  • Kafka把topic中一个parition大文件分红多个小文件段,经过多个小文件段,就容易按期清除或删除已经消费完文件,减小磁盘占用。
  • 经过索引信息能够快速定位message和肯定response的最大大小。
  • 经过index元数据所有映射到memory,能够避免segment file的IO磁盘操做。
  • 经过索引文件稀疏存储,能够大幅下降index文件元数据占用空间大小。
 
 

1. Kafka集群partition replication默认自动分配分析

下面以一个Kafka集群中4个Broker举例,建立1个topic包含4个Partition,2 Replication;数据Producer流动如图所示:

(1)

 

 

(2)当集群中新增2节点,Partition增长到6个时分布状况以下:

 

副本分配逻辑规则以下:

  • 在Kafka集群中,每一个Broker都有均等分配Partition的Leader机会。
  • 上述图Broker Partition中,箭头指向为副本,以Partition-0为例:broker1中parition-0为Leader,Broker2中Partition-0为副本。
  • 上述图种每一个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。
 
副本分配算法以下:
  • 将全部N Broker和待分配的i个Partition排序.
  • 将第i个Partition分配到第(i mod n)个Broker上.
  • 将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上.
 
4.Kafka Broker一些特性
4.1 无状态的Kafka Broker :
1. Broker没有副本机制,一旦broker宕机,该broker的消息将都不可用。
2. Broker不保存订阅者的状态,由订阅者本身保存。
3. 无状态致使消息的删除成为难题(可能删除的消息正在被订阅),kafka采用基于时间的SLA(服务水平保证),消息保存必定时间(一般为7天)后会被删除。
4. 消息订阅者能够rewind back到任意位置从新进行消费,当订阅者故障时,能够选择最小的offset进行从新读取消费消息。
 
4.2 message的交付与生命周期 :
1. 不是严格的JMS, 所以kafka对消息的重复、丢失、错误以及顺序型没有严格的要求。(这是与AMQ最大的区别)
2. kafka提供at-least-once delivery,即当consumer宕机后,有些消息可能会被重复delivery。
3. 因每一个partition只会被consumer group内的一个consumer消费,故kafka保证每一个partition内的消息会被顺序的订阅。
4. Kafka为每条消息为每条消息计算CRC校验,用于错误检测,crc校验不经过的消息会直接被丢弃掉。
 
4.3 压缩
 
Kafka支持以集合(batch)为单位发送消息,在此基础上,Kafka还支持对消息集合进行压缩,Producer端能够经过GZIP或Snappy格式对消息集合进行压缩。Producer端进行压缩以后,在Consumer端需进行解压。压缩的好处就是减小传输的数据量,减轻对网络传输的压力,在对大数据处理上,瓶颈每每体如今网络上而不是CPU。
 
那么如何区分消息是压缩的仍是未压缩的呢,Kafka在消息头部添加了一个描述压缩属性字节,这个字节的后两位表示消息的压缩采用的编码,若是后两位为0,则表示消息未被压缩。
 
4.4 消息可靠性
 
在消息系统中,保证消息在生产和消费过程当中的可靠性是十分重要的,在实际消息传递过程当中,可能会出现以下三中状况:
 
- 一个消息发送失败
 
- 一个消息被发送屡次
 
- 最理想的状况:exactly-once ,一个消息发送成功且仅发送了一次
 
有许多系统声称它们实现了exactly-once,可是它们其实忽略了生产者或消费者在生产和消费过程当中有可能失败的状况。好比虽然一个Producer成功发送一个消息,可是消息在发送途中丢失,或者成功发送到broker,也被consumer成功取走,可是这个consumer在处理取过来的消息时失败了。
 
从Producer端看:Kafka是这么处理的,当一个消息被发送后,Producer会等待broker成功接收到消息的反馈(可经过参数控制等待时间),若是消息在途中丢失或是其中一个broker挂掉,Producer会从新发送(咱们知道Kafka有备份机制,能够经过参数控制是否等待全部备份节点都收到消息)。
 
从Consumer端看:前面讲到过partition,broker端记录了partition中的一个offset值,这个值指向Consumer下一个即将消费message。当Consumer收到了消息,但却在处理过程当中挂掉,此时Consumer能够经过这个offset值从新找到上一个消息再进行处理。Consumer还有权限控制这个offset值,对持久化到broker端的消息作任意处理。
 
4.5 备份机制
 
备份机制是Kafka0.8版本的新特性,备份机制的出现大大提升了Kafka集群的可靠性、稳定性。有了备份机制后,Kafka容许集群中的节点挂掉后而不影响整个集群工做。一个备份数量为n的集群容许n-1个节点失败。在全部备份节点中,有一个节点做为lead节点,这个节点保存了其它备份节点列表,并维持各个备份间的状体同步。下面这幅图解释了Kafka的备份机制:
 
 
4.6 Kafka高效性相关设计
 
4.6.1 消息的持久化
Kafka高度依赖文件系统来存储和缓存消息(AMQ的nessage是持久化到mysql数据库中的),由于通常的人认为磁盘是缓慢的,这致使人们对持久化结构具备竞争性持怀疑态度。其实,磁盘的快或者慢,这决定于咱们如何使用磁盘。由于磁盘线性写的速度远远大于随机写。线性读写在大多数应用场景下是能够预测的。
4.6.2 常数时间性能保证
每一个Topic的Partition的是一个大文件夹,里面有无数个小文件夹segment,但partition是一个队列,队列中的元素是segment,消费的时候先从第0个segment开始消费,新来message存在最后一个消息队列中。对于segment也是对队列,队列元素是message,有对应的offsite标识是哪一个message。消费的时候先从这个segment的第一个message开始消费,新来的message存在segment的最后。
 
消息系统的持久化队列能够构建在对一个文件的读和追加上,就像通常状况下的日志解决方案。它有一个优势,全部的操做都是常数时间,而且读写之间不会相互阻塞。这种设计具备极大的性能优点:最终系统性能和数据大小彻底无关,服务器能够充分利用廉价的硬盘来提供高效的消息服务。
 
事实上还有一点,磁盘空间的无限增大而不影响性能这点,意味着咱们能够提供通常消息系统没法提供的特性。好比说,消息被消费后不是立马被删除,咱们能够将这些消息保留一段相对比较长的时间(好比一个星期)。
 
5.Kafka 生产者-消费者
     消息系统一般都会由生产者,消费者,Broker三大部分组成,生产者会将消息写入到Broker,消费者会从Broker中读取出消息,不一样的MQ实现的Broker实现会有所不一样,不过Broker的本质都是要负责将消息落地到服务端的存储系统中。具体步骤以下:
  1. 生产者客户端应用程序产生消息:

    1. 客户端链接对象将消息包装到请求中发送到服务端
    2. 服务端的入口也有一个链接对象负责接收请求,并将消息以文件的形式存储起来
    3. 服务端返回响应结果给生产者客户端
  2. 消费者客户端应用程序消费消息:

    1. 客户端链接对象将消费信息也包装到请求中发送给服务端
    2. 服务端从文件存储系统中取出消息
    3. 服务端返回响应结果给消费者客户端
    4. 客户端将响应结果还原成消息并开始处理消息
 
                                                                              图4-1 客户端和服务端交互
 
5.1  Producers
 
Producers直接发送消息到broker上的leader partition,不须要通过任何中介或其余路由转发。为了实现这个特性,kafka集群中的每一个broker均可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是能够直接被访问的。
 
Producer客户端本身控制着消息被推送到哪些partition。实现的方式能够是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。Kafka提供了接口供用户实现自定义的partition,用户能够为每一个消息指定一个partitionKey,经过这个key来实现一些hash分区算法。好比,把userid做为partitionkey的话,相同userid的消息将会被推送到同一个partition。
 
以Batch的方式推送数据能够极大的提升处理效率,kafka Producer 能够将消息在内存中累计到必定数量后做为一个batch发送请求。Batch的数量大小能够经过Producer的参数控制,参数值能够设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。经过增长batch的大小,能够减小网络请求和磁盘IO的次数,固然具体参数设置须要在效率和时效性方面作一个权衡。
 
Producers能够异步的并行的向kafka发送消息,可是一般producer在发送完消息以后会获得一个future响应,返回的是offset值或者发送过程当中遇到的错误。这其中有个很是重要的参数“acks”,这个参数决定了producer要求leader partition 收到确认的副本个数,若是acks设置数量为0,表示producer不会等待broker的响应,因此,producer没法知道消息是否发送成功,这样有可能会致使数据丢失,但同时,acks值为0会获得最大的系统吞吐量。
 
若acks设置为1,表示producer会在leader partition收到消息时获得broker的一个确认,这样会有更好的可靠性,由于客户端会等待直到broker确认收到消息。若设置为-1,producer会在全部备份的partition收到消息时获得broker的确认,这个设置能够获得最高的可靠性保证。
 
Kafka 消息有一个定长的header和变长的字节数组组成。由于kafka消息支持字节数组,也就使得kafka能够支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小,但咱们推荐消息大小不要超过1MB,一般通常消息大小都在1~10kB以前。
 
发布消息时,kafka client先构造一条消息,将消息加入到消息集set中(kafka支持批量发布,能够往消息集合中添加多条消息,一次行发布),send消息时,producer client需指定消息所属的topic。
 
5.2  Consumers
Kafka提供了两套consumer api,分为high-level api和sample-api。Sample-api 是一个底层的API,它维持了一个和单一broker的链接,而且这个API是彻底无状态的,每次请求都须要指定offset值,所以,这套API也是最灵活的。
 
在kafka中,当前读到哪条消息的offset值是由consumer来维护的,所以,consumer能够本身决定如何读取kafka中的数据。好比,consumer能够经过重设offset值来从新消费已消费过的数据。无论有没有被消费,kafka会保存数据一段时间,这个时间周期是可配置的,只有到了过时时间,kafka才会删除这些数据。(这一点与AMQ不同,AMQ的message通常来讲都是持久化到mysql中的,消费完的message会被delete掉)
 
High-level API封装了对集群中一系列broker的访问,能够透明的消费一个topic。它本身维持了已消费消息的状态,即每次消费的都是下一个消息。
 
High-level API还支持以组的形式消费topic,若是consumers有同一个组名,那么kafka就至关于一个队列消息服务,而各个consumer均衡的消费相应partition中的数据。若consumers有不一样的组名,那么此时kafka就至关与一个广播服务,会把topic中的全部消息广播到每一个consumer。
 
High level api和Low level api是针对consumer而言的,和producer无关。
 
High level api是consumer读的partition的offsite是存在zookeeper上。High level api 会启动另一个线程去每隔一段时间,offsite自动同步到zookeeper上。换句话说,若是使用了High level api, 每一个message只能被读一次,一旦读了这条message以后,不管我consumer的处理是否ok。High level api的另一个线程会自动的把offiste+1同步到zookeeper上。若是consumer读取数据出了问题,offsite也会在zookeeper上同步。所以,若是consumer处理失败了,会继续执行下一条。这每每是不对的行为。所以,Best Practice是一旦consumer处理失败,直接让整个conusmer group抛Exception终止,可是最后读的这一条数据是丢失了,由于在zookeeper里面的offsite已经+1了。等再次启动conusmer group的时候,已经从下一条开始读取处理了。
 
Low level api是consumer读的partition的offsite在consumer本身的程序中维护。不会同步到zookeeper上。可是为了kafka manager可以方便的监控,通常也会手动的同步到zookeeper上。这样的好处是一旦读取某个message的consumer失败了,这条message的offsite咱们本身维护,咱们不会+1。下次再启动的时候,还会从这个offsite开始读。这样能够作到exactly once对于数据的准确性有保证。
 
 
对于Consumer group:
1. 容许consumer group(包含多个consumer,如一个集群同时消费)对一个topic进行消费,不一样的consumer group之间独立消费。
2. 为了对减少一个consumer group中不一样consumer之间的分布式协调开销,指定partition为最小的并行消费单位,即一个group内的consumer只能消费不一样的partition。
 
 
Consumer与Partition的关系:
- 若是consumer比partition多,是浪费,由于kafka的设计是在一个partition上是不容许并发的,因此consumer数不要大于partition数
- 若是consumer比partition少,一个consumer会对应于多个partitions,这里主要合理分配consumer数和partition数,不然会致使partition里面的数据被取的不均匀
- 若是consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不一样
- 增减consumer,broker,partition会致使rebalance,因此rebalance后consumer对应的partition会发生变化
- High-level接口中获取不到数据的时候是会block的
 
负载低的状况下能够每一个线程消费多个partition。但负载高的状况下,Consumer 线程数最好和Partition数量保持一致。若是仍是消费不过来,应该再开 Consumer 进程,进程内线程数一样和分区数一致。
 
消费消息时,kafka client需指定topic以及partition number(每一个partition对应一个逻辑日志流,如topic表明某个产品线,partition表明产品线的日志按天切分的结果),consumer client订阅后,就可迭代读取消息,若是没有消息,consumer client会阻塞直到有新的消息发布。consumer能够累积确认接收到的消息,当其确认了某个offset的消息,意味着以前的消息也都已成功接收到,此时broker会更新zookeeper上地offset registry。
 
5.3  高效的数据传输
1.  发布者每次可发布多条消息(将消息加到一个消息集合中发布), consumer每次迭代消费一条消息。

2.  不建立单独的cache,使用系统的page cache。发布者顺序发布,订阅者一般比发布者滞后一点点,直接使用Linuxpage cache效果也比较后,同时减小了cache管理及垃圾收集的开销。

3.  使用sendfile优化网络传输,减小一次内存拷贝。
 
6.Kafka 与 Zookeeper
 
6.1 Zookeeper 协调控制
1. 管理broker与consumer的动态加入与离开。(Producer不须要管理,随便一台计算机均可以做为Producer向Kakfa Broker发消息)
2. 触发负载均衡,当broker或consumer加入或离开时会触发负载均衡算法,使得一
   个 consumer group内的多个consumer的消费负载平衡。(由于一个comsumer消费一个或多个partition,一个partition只能被一个consumer消费)

3.  维护消费关系及每一个partition的消费信息。

 

6.2 Zookeeper上的细节:

1. 每一个broker启动后会在zookeeper上注册一个临时的broker registry,包含broker的ip地址和端口号,所存储的topics和partitions信息。

2. 每一个consumer启动后会在zookeeper上注册一个临时的consumer registry:包含consumer所属的consumer group以及订阅的topics。

3. 每一个consumer group关联一个临时的owner registry和一个持久的offset registry。对于被订阅的每一个partition包含一个owner registry,内容为订阅这个partition的consumer id;同时包含一个offset registry,内容为上一次订阅的offset。

相关文章
相关标签/搜索