漫谈使用Kafka做为MQ中间件

哪些场景适合使用Kafka
线上系统会实时产生数以万计的日志信息,服务器运行状态,用户行为记录,业务消息 等信息,这些信息须要用于多个不一样的目的,好比审计、安全、数据挖掘等,所以须要以分类的方式将这些信息发送到某个地方,以方便后台处理service实时的去获取数据。MQ用于解决数据生成速率与数据消费速率不一致的场景,业务接口解耦,数据缓存冗余,海量数据处理弹性,异步通讯。node

Kafka是LinkedIn开源出来的分布式消息发布-订阅系统,主要特色是基于Pull模式来处理消息,O(1)常数时间级别的消息持久化和读取时间复杂度,基于at least once的处理原则,追求高吞吐量(单台机器吞吐量可达10w/s),主要用于非敏感信息如日志的收集和传输,不支持事务,对消息的重复、丢失和错误没有严格要求 。算法

 

简单的Kafka消息生产/消费模型
最简单的Kafka拓扑网络里只有一个broker,producer建立不一样的topics,并将msg经过push的方式发送到broker,broker使用append log的方式顺序性持久化msg,而后不一样的consumers根据本身的消费速率按需从broker处pull本身须要的topics对应的msg;对于一个broker上的一个topic的msg而言,kafka总会保证其被consumer消费的前后顺序;采用pull的消息处理模式可让consumer按需处理。api

Broker从本地文件系统里添加消息和获取消息都是按照队列的入列和出列模式操做,所以时间复杂度都是O(1);Kafka不会主动删除被消费过的消息,而是经过server.properties中的配置按照过时时间或者文件总大小来进行文件删除。缓存

构建Kafka Cluster,并使用zookeeper cluster做为协调服务
为了提供更高的msg吞吐量以及HA,Kafka支持以cluster的方式建立多个broker对外提供服务,因为集群环境引入的不肯定性,kafka使用Zookeeper做为协调性服务(0.7.+引入),功能以下:
#1监听broker的活跃状态及其存储的topic和partition状态,协调broker leader和partition leader的选举;
#2 为producer提供broker的访问地址,并记录每一个topic下对应的partition leader分布地址,以帮助实现负载均衡;
#3 为consumer提供broker的访问地址,并记录每一个partition正在被哪些CG内的哪个consumer消费,CG中成员的变化,以帮助实现负载均衡,同时记录每一个CG的offset。安全

Zookeeper上典型的存储kafka信息的格式以下,Kafka Cluster中每一个broker均可以获取关于cluster的metadata,包含active broker list,topic’s partition leaders等,所以对于producer而言每一个broker都是对等的;服务器

 1 leo-chen-zookeeper
 2 -> broker
 3    -> ids
 4       -> [broker_id] ## temp znode, value is host:port
 5    -> topics
 6       -> [topic_id] 
 7           -> [partition_id] ## temp znode, value is partition refer
 8 -> consumers
 9    -> [group_id]
10        -> ids
11           -> [consumer_id] ## temp znode, value is partition_id list
12        -> offsets
13           -> [topic_id]
14               -> [broker_id-partition_id] ## persist znode, value is offset
15        -> owners
16           -> [topic_id]
17               -> [broker_id-partition_id] ## temp znode, value is consumer_id

Zookeeper实现的功能以下:
#1 broker node注册:新上线的broker会在zookeeper上建立一个temp znode以维护本身的活跃状态,znode value为broker的访问地址;broker下线或者session失效都会致使temp znode被删除。网络

#2 broker topic注册:新上线的broker还会根据自身存储的topic-partition建立对应的temp znode,znode value为partition的索引,用于失效转移的时候进行状态对比。session

#3 consumer node注册:新上线的consumer会在zookeeper上建立一个temp znode以维护本身的活跃状态,znode value为该consumer正在访问的topic-partition列表;consumer的上线下线都会触发kafka的rebalance动做。并发

#4 consumer-partition offset注册:一个CG中新上线的consumer会根据本身正在访问的partition对应的offset在zookeeper上建立一个persist znode,znode value为offset的值;当consumer下线以后,同一个CG内的其余consumer会继续消费这个offset对应的partition msg;app

#5 partition owner注册:新上线的consumer会根据本身正在消费的topic-partition在zookeeper上建立一个temp znode,表示当前CG内全部正在被消费的partition都有哪些consumer在消费。

一个新的consumer上线以后会触发以下操做:
#1 进行consumer node注册;
#2 在consumers/[group_id]/ids路径下注册一个watcher用于监听当前group中其余consumer临时节点的变更,若是有变更则触发负载均衡,通知当前consumer node从新计算可消费的topic-partition;
#3 在broker/ids路径下注册一个watcher用于监听全部broker临时节点的变更,若是有变更则触发负载均衡,通知当前consumer node从新计算可消费的topic-partition;

维护topic下partition的数量,同步和过时策略
kafka将每个topics拆分红多个partition(0.8.+引入)以便于负载均衡到多个broker上,因为一个topics的msg被分拆到了多个partition,则 kafka只能保证按一个partition中的msg按顺序让consumer进行消费(除partition所在的broker下线的状况),并不保证一个topic内多个partition间的msg的消费顺序。一个topics的msg划分到哪一个partition的策略有两种,一是采用Key Hash算法,一是采用Round Robin算法。

Kafka经过partition log文件在文件系统上存储msg,msg的写入和读取均可以是批量线性的,同时基于read-ahead,write-behind,线性读写,系统页缓存的操做方式使得kafka对partition log文件的操做很是快,而且优于JVM的内存操做效率;传统的RPC文件读取流程会经历四个步骤:磁盘到内核页缓存,内核页缓存到用户空间缓存,用户空间缓存到内核socket缓存,内核socket缓存到网卡缓存,最终发送给用户;而利用sendfile和zero-copy技术能够将内核页缓存的数据直接复制到网卡缓存,从而可让kafka实现近似缓存级别的数据操做速度。

Broker上典型的存储msg的文件格式以下,~/leo-chen/kafka-msg表示log.dirs指向的根目录,而后是按照topic以及partition划分的子目录,[topic-name] - [partition-num],数字表示partition的编号,同一个topic下的partition尽可能不要分布在一个broker下;

 1 leo-chen-broker
 2 -> kafka-msg
 3    -> topic_report-0
 4 -> 34477849968.index
 5 -> 34477849968.log
 6 -> 35551592052.index
 7       -> 35551592052.log
 8    -> topic_report-3
 9    -> topic_launch_info-0
10    -> topic_api_call-0
11    -> topic_api_call-1

producer向指定的topic发送msg也就是寻找对应的topic目录,并将一条msg entry添加到文件末尾的过程,一个日志文件由*.index和*.log组成,前者为msg的位置索引,后者是msg自己,这样的存储设计有以下优点:
#1 segment file的分段存储方式方便独立加载,检索和删除数据;
#2 独立存储索引信息*.index的方式能够避免冗余IO操做,快速定位数据;

 

下图是一个topic下一个partition log的逻辑抽象图,全部的partition log files都以topic name做为根目录,该broker上存储的关于该topic的日志文件都位于此目录下;每一条日志由三个部分组成,8个字节的offset用于惟一标记该msg,4个字节的num表示该msg的总长度,n个字节的content表示消费内容,其余就是一些版本和校验字段;每个segment file由一个offset区间段的msg组成,文件名是该区间最小的offset,所以获取消息时指定一个起始offset和maximum chunk size就能够定位目标msg;将msg分文件存储的一个好处就是若是指定了过时时间,则删除过时的msg就只须要简单删除独立的文件。

 

另外kafka容许给每一个partition设置一个或多个replication,但只有一个partition会做为leader对外同时提供读写服务,其余的replication仅做为备份,他们之间的关系由zookeeper进行维护;当当前的partition leader下线后,zookeeper维护的临时节点会由于session失效而自动删除,所以其余的follower能够竞争成为新的leader,实现故障转移;kafka会为每一个partition维护一个a set of in-sync replicas的列表(ISR,总数为n的集群里只要有1个节点存活就能正常工做,不一样于zookeeper的majority vote策略,须要总数为2n+1的集群里至少有n+1个节点存活才能正常工做,优点在于系统的latency取决于吞吐量最快的node),存储全部replication中与以前的partition leader同步状态保持一致的节点,而新的leader也将从这个列表里诞生,而不须要通过投票过程产生新的leader。

Replication与partition leader同步的过程相似于consumer消费msg的过程,也是顺序批量的将partition leader上的消息pull到本地;而kafka会经过两个状态值断定一个replication的健康状态,一个是replication与zookeeper的heartbeat,另外一个是partition与replication上最大offset的差值,只有知足两个条件的replication才会被加入ISR;一个msg只有在全部的replication上都进行同步以后,msg的状态才设置为committed,表示这条msg可被consumer消费。

producer使用batch或者async的方式向broker发送消息
producer须要将msg发送到broker以前,会先为msg指定一个partitionKey,并经过可自定义的hash算法获取一个partition refer,而后向zookeeper获取对应partition的host:port;经过这样的方式保证在一个topic下,全部标记为partitionKey的msg都会被发送到同一个partition上;为了提高性能,producer能够当msg累积到必定量以后统一将一批消息发送到broker,能够是累积消息数量,时间间隔,或者累积msg数据大小;Kafka支持gzip,snappy等多种数据压缩方式。

另外因为kafka会为每一个leader partition提供一个或者多个replication以保证容错性,所以leader partition在收到msg以后会将数据同步到其余的replication上, producer能够经过设置acks参数(0|1|-1)要求同步\异步等待成功被同步了msg的replication的数量。一个msg只有在对应的全部replication上都sync以后才会在partition leader上被标注为committed状态,表示能够被producer消费。

consumer基于consumer group和offset从broker取出msg
kafka定义一个 topic能够被多个CG(Consumer Group)监听消费,CG内的consumer能够消费多个不一样的partitions,但对于一个topic内的partitions,一个CG下的consumers只能消费不一样的partitions,也就是对于一个partition而言禁止同一个CG内的consumer进行并发访问,这样能够最小的代价保证一个CG内的msg消费顺序。

所以若是想实现topic内的消息广播(一个msg被全部consumer消费)则为每个consumer都建立一个CG,若是想实现topic内的消息单播(一个msg只被一个consumer消费)则全部的consumer都放到一个CG里。通常状况下必定是一个CG处理一个topic下全部的partition的全部msg,为了达到最优效率,一个topic下全部partition的数量须要大于等于一个CG内全部consumer的数量(尽可能让全部consumer都处理工做状态),同时也要大于全部broker的数量以便于均衡分配到不一样的broker上。

一个msg是否被消费是经过partition上msg队列中的位置(offset)决定,所以consumer能够经过修改offset的值从而读取partition上任意位置的msg,因为这个offset是交由consumers进行维护,若是是多个CG消费同一个partition的msg的话,须要各自维护本身的offset,所以不存在锁的竞争,而且经过简单增长broker的数量就能够提高访问并发量,经过配置producer.property能够将offset存储于zookeeper,或者producer本身维护 。

kafka提供三种级别的msg消费一致性语义
#1 at most once:fetch msg, update offset, consume msg,因为更新offset是在处理msg以前,因此可能出现msg丢失的场景。
#2 at least once:fetch msg, consume msg, update offset,因为更新offset是在处理msg以后,因此可能出现msg被重复消费的场景,kafka推荐配置。
#3 exactly once:在at least once的基础上,在处理msg以前添加一个接口幂等性断定,或者基于2阶段提交。

Kafka如何保证消息机制的可靠性
消息系统中因为参与方较多以及网络延迟等问题,须要保证几个点,
#1 保证msg发送成功:producer会同步等待broker的返回,并确认replication同步的结果,以保证消息成功被多个broker保存;但若是设置为等待全部replication都同步才返回的话会极大下降producer的吞吐量。

#2 保证msg的消费顺序与发送顺序一致:kafka能够保证一个broker下一个partition接受到的msg能够依次发送到consumer,但须要处理几个常见的问题:
一个问题是因为网络缘由致使先发送的msg晚于后发送的msg到达broker/consumer,这样的问题能够经过producer在msg上添加version,并在consumer方按照version的前后顺序进行消费。
另一个问题就是当一个broker下线以后,即便对应的partition在其余broker上有replication能够支持故障转移,但因为partition leader被新的replication替代,CG针对原来partition锁记录的offset再也不可用,也就是再也不能保证当前msg的消费顺序。

#3 保证msg被成功消费后再也不重复消费:在at most once/at least once/exactly once中,kafka使用的是at least once,所以msg有可能被重复消费,而exactly once能够保证一条消息有且只有一次消费过程,能够在at least once的基础上在producer端添加幂等性断定,因为不一样msgId可能表示同一个业务消息,所以还须要从业务层面定制一个全局惟一性的标识 。

相关文章
相关标签/搜索