Kafka是最初由Linkedin公司开发,是一个分布式、支持分区的(partition)、多副本的(replica),基于zookeeper协调的分布式消息系统,它的最大的特性就是能够实时的处理大量数据以知足各类需求场景:好比基于hadoop的批处理系统、低延迟的实时系统、storm/Spark流式处理引擎,web/nginx日志、访问日志,消息服务等等,用scala语言编写,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。前端
Redis |
|
RabbitMQ |
|
ZeroMQ |
|
ActiveMQ |
|
Kafka/Jafka |
|
Kafka主要用途是数据集成,或者说是流数据集成,以Pub/Sub形式的消息总线形式提供。可是,Kafka不只仅是一套传统的消息总线,本质上Kafka是分布式的流数据平台,由于如下特性而著名:node
经常使用场景:mysql
Message(消息):传递的数据对象,主要由四部分构成:offset(偏移量)、key、value、timestamp(插入时间); 其中offset和timestamp在kafka集群中产生,key/value在producer发送数据的时候产生Broker(代理者):Kafka集群中的机器/服务被成为broker, 是一个物理概念。linux
Topic(主题):维护Kafka上的消息类型被称为Topic,是一个逻辑概念。nginx
Partition(分区):具体维护Kafka上的消息数据的最小单位,一个Topic能够包含多个分区;Partition特性:web
ordered & immutable。(在数据的产生和消费过程当中,不须要关注数据具体存储的Partition在那个Broker上,只须要指定Topic便可,由Kafka负责将数据和对应的Partition关联上)算法
Producer(生产者):负责将数据发送到Kafka对应Topic的进程sql
Consumergroup:各个consumer(consumer 线程)能够组成一个组(Consumer group ),partition中的每一个message只能被组(Consumer group )中的一个consumer(consumer 线程)消费,若是一个message能够被多个consumer(consumer 线程)消费的话,那么这些consumer必须在不一样的组。Kafka不支持一个partition中的message由两个或两个以上的同一个consumer group下的consumer thread来处理,除非再启动一个新的consumer group。因此若是想同时对一个topic作消费的话,启动多个consumer group就能够了,可是要注意的是,这里的多个consumer的消费都必须是顺序读取partition里面的message,新启动的consumer默认从partition队列最头端最新的地方开始阻塞的读message。它不能像AMQ那样能够多个BET做为consumer去互斥的(for update悲观锁)并发处理message,这是由于多个BET去消费一个Queue中的数据的时候,因为要保证不能多个线程拿同一条message,因此就须要行级别悲观所(for update),这就致使了consume的性能降低,吞吐量不够。而kafka为了保证吞吐量,只容许同一个consumer group下的一个consumer线程去访问一个partition。若是以为效率不高的时候,能够加partition的数量来横向扩展,那么再加新的consumer thread去消费。若是想多个不一样的业务都须要这个topic的数据,起多个consumer group就行了,你们都是顺序的读取message,offsite的值互不影响。这样没有锁竞争,充分发挥了横向的扩展性,吞吐量极高。这也就造成了分布式消费的概念。数据库
当启动一个consumer group去消费一个topic的时候,不管topic里面有多个少个partition,不管咱们consumer group里面配置了多少个consumer thread,这个consumer group下面的全部consumer thread必定会消费所有的partition;即使这个consumer group下只有一个consumer thread,那么这个consumer thread也会去消费全部的partition。所以,最优的设计就是,consumer group下的consumer thread的数量等于partition数量,这样效率是最高的。api
同一partition的一条message只能被同一个Consumer Group内的一个Consumer消费。不可以一个consumer group的多个consumer同时消费一个partition。
一个consumer group下,不管有多少个consumer,这个consumer group必定回去把这个topic下全部的partition都消费了。当consumer group里面的consumer数量小于这个topic下的partition数量的时候,以下图groupA,groupB,就会出现一个conusmer thread消费多个partition的状况,总之是这个topic下的partition都会被消费。若是consumer group里面的consumer数量等于这个topic下的partition数量的时候,以下图groupC,此时效率是最高的,每一个partition都有一个consumer thread去消费。当consumer group里面的consumer数量大于这个topic下的partition数量的时候,以下图GroupD,就会有一个consumer thread空闲。所以,咱们在设定consumer group的时候,只须要指明里面有几个consumer数量便可,无需指定对应的消费partition序号,consumer会自动进行rebalance。
多个Consumer Group下的consumer能够消费同一条message,可是这种消费也是以o(1)的方式顺序的读取message去消费,,因此必定会重复消费这批message的,不能向AMQ那样多个BET做为consumer消费(对message加锁,消费的时候不能重复消费message)
Consumer: Consumer处理partition里面的message的时候是o(1)顺序读取的。因此必须维护着上一次读到哪里的offsite信息。high level API,offset存于Zookeeper中,low level API的offset由本身维护。通常来讲都是使用high level api的。Consumer的delivery gurarantee,默认是读完message先commmit再处理message,autocommit默认是true,这时候先commit就会更新offsite+1,一旦处理失败,offsite已经+1,这个时候就会丢message;也能够配置成读完消息处理再commit,这种状况下consumer端的响应就会比较慢的,须要等处理完才行。
通常状况下,必定是一个consumer group处理一个topic的message。Best Practice是这个consumer group里面consumer的数量等于topic里面partition的数量,这样效率是最高的,一个consumer thread处理一个partition。若是这个consumer group里面consumer的数量小于topic里面partition的数量,就会有consumer thread同时处理多个partition(这个是kafka自动的机制,咱们不用指定),可是总之这个topic里面的全部partition都会被处理到的。。若是这个consumer group里面consumer的数量大于topic里面partition的数量,多出的consumer thread就会闲着啥也不干,剩下的是一个consumer thread处理一个partition,这就形成了资源的浪费,由于一个partition不可能被两个consumer thread去处理。因此咱们线上的分布式多个service服务,每一个service里面的kafka consumer数量都小于对应的topic的partition数量,可是全部服务的consumer数量只和等于partition的数量,这是由于分布式service服务的全部consumer都来自一个consumer group,若是来自不一样的consumer group就会处理重复的message了(同一个consumer group下的consumer不能处理同一个partition,不一样的consumer group能够处理同一个topic,那么都是顺序处理message,必定会处理重复的。通常这种状况都是两个不一样的业务逻辑,才会启动两个consumer group来处理一个topic)。
官网的图解能够直观看出消费概览
须要注意以下几点:
1)一组(类)消息一般由某个topic来归类,咱们能够把这组消息“分发”给若干个分区(partition),每一个分区的消息各不相同;
2)每一个分区都维护着他本身的偏移量(Offset),记录着该分区的消息此时被消费的位置;
3)一个消费线程能够对应若干个分区,但一个分区只能被具体某一个消费线程消费;
4)group.id用于标记某一个消费组,每个消费组都会被记录他在某一个分区的Offset,即不一样consumer group针对同一个分区,都有“各自”的偏移量。
一个消息如何算投递成功,Kafka提供了三种模式:
- 第一种是啥都无论,发送出去就看成成功,这种状况固然不能保证消息成功投递到broker;
- 第二种是Master-Slave模型,只有当Master和全部Slave都接收到消息时,才算投递成功,这种模型提供了最高的投递可靠性,可是损伤了性能;
- 第三种模型,即只要Master确认收到消息就算投递成功;实际使用时,根据应用特性选择,绝大多数状况下都会中和可靠性和性能选择第三种模型
消息在broker上的可靠性,由于消息会持久化到磁盘上,因此若是正常stop一个broker,其上的数据不会丢失;可是若是不正常stop,可能会使存在页面缓存来不及写入磁盘的消息丢失,这能够经过配置flush页面缓存的周期、阈值缓解,可是一样会频繁的写磁盘会影响性能,又是一个选择题,根据实际状况配置。
消息消费的可靠性,Kafka提供的是“At least once”模型,由于消息的读取进度由offset提供,offset能够由消费者本身维护也能够维护在zookeeper里,可是当消息消费后consumer挂掉,offset没有即时写回,就有可能发生重复读的状况,这种状况一样能够经过调整commit offset周期、阈值缓解,甚至消费者本身把消费和commit offset作成一个事务解决,可是若是你的应用不在意重复消费,那就干脆不要解决,以换取最大的性能。
- Partition ack:当ack=1,表示producer写partition leader成功后,broker就返回成功,不管其余的partition follower是否写成功。当ack=2,表示producer写partition leader和其余一个follower成功的时候,broker就返回成功,不管其余的partition follower是否写成功。当ack=-1[parition的数量]的时候,表示只有producer所有写成功的时候,才算成功,kafka broker才返回成功信息。这里须要注意的是,若是ack=1的时候,一旦有个broker宕机致使partition的follower和leader切换,会致使丢数据。
分析过程分为如下4个步骤:
经过上述4过程详细分析,咱们就能够清楚认识到kafka文件存储机制的奥秘。
kafka leader
Kakfa Broker集群受Zookeeper管理。全部的Kafka Broker节点一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。(这个过程叫Controller在ZooKeeper注册Watch)。这个Controller会监听其余的Kafka Broker的全部信息,若是这个kafka broker controller宕机了,在zookeeper上面的那个临时节点就会消失,此时全部的kafka broker又会一块儿去Zookeeper上注册一个临时节点,由于只有一个Kafka Broker会注册成功,其余的都会失败,因此这个成功在Zookeeper上注册临时节点的这个Kafka Broker会成为Kafka Broker Controller,其余的Kafka broker叫Kafka Broker follower。例如:一旦有一个broker宕机了,这个kafka broker controller会读取该宕机broker上全部的partition在zookeeper上的状态,并选取ISR列表中的一个replica做为partition leader(若是ISR列表中的replica全挂,选一个幸存的replica做为leader; 若是该partition的全部的replica都宕机了,则将新的leader设置为-1,等待恢复,等待ISR中的任一个Replica“活”过来,而且选它做为Leader;或选择第一个“活”过来的Replica(不必定是ISR中的)做为Leader),这个broker宕机的事情,kafka controller也会通知zookeeper,zookeeper就会通知其余的kafka broker。
Kafka的核心是日志文件,日志文件在集群中的同步是分布式数据系统最基础的要素。
若是leaders永远不会down的话咱们就不须要followers了!一旦leader down掉了,须要在followers中选择一个新的leader.可是followers自己有可能延时过久或者crash,因此必须选择高质量的follower做为leader.必须保证,一旦一个消息被提交了,可是leader down掉了,新选出的leader必须能够提供这条消息。大部分的分布式系统采用了多数投票法则选择新的leader,对于多数投票法则,就是根据全部副本节点的情况动态的选择最适合的做为leader.Kafka并非使用这种方法。
Kafka动态维护了一个同步状态的副本的集合(a set of in-sync replicas),简称ISR,在这个集合中的节点都是和leader保持高度一致的,任何一条消息必须被这个集合中的每一个节点读取并追加到日志中了,才回通知外部这个消息已经被提交了。所以这个集合中的任何一个节点随时均可以被选为leader.ISR在ZooKeeper中维护。ISR中有f+1个节点,就能够容许在f个节点down掉的状况下不会丢失消息并正常提供服。ISR的成员是动态的,若是一个节点被淘汰了,当它从新达到“同步中”的状态时,他能够从新加入ISR.这种leader的选择方式是很是快速的,适合kafka的应用场景。
一个邪恶的想法:若是全部节点都down掉了怎么办?Kafka对于数据不会丢失的保证,是基于至少一个节点是存活的,一旦全部节点都down了,这个就不能保证了。
实际应用中,当全部的副本都down掉时,必须及时做出反应。能够有如下两种选择:
1. 等待ISR中的任何一个节点恢复并担任leader。
2. 选择全部节点中(不仅是ISR)第一个恢复的节点做为leader.
这是一个在可用性和连续性之间的权衡。若是等待ISR中的节点恢复,一旦ISR中的节点起不起来或者数据都是了,那集群就永远恢复不了了。若是等待ISR意外的节点恢复,这个节点的数据就会被做为线上数据,有可能和真实的数据有所出入,由于有些数据它可能还没同步到。Kafka目前选择了第二种策略,在将来的版本中将使这个策略的选择可配置,能够根据场景灵活的选择。
这种窘境不仅Kafka会遇到,几乎全部的分布式数据系统都会遇到。
分布式
kafka使用zookeeper来存储一些meta信息,并使用了zookeeper watch机制来发现meta信息的变动并做出相应的动做(好比consumer失效,触发负载均衡等)
Broker node registry: 当一个kafka broker启动后,首先会向zookeeper注册本身的节点信息(临时znode),同时当broker和zookeeper断开链接时,此znode也会被删除.
Broker Topic Registry: 当一个broker启动时,会向zookeeper注册本身持有的topic和partitions信息,仍然是一个临时znode.
Consumer and Consumer group: 每一个consumer客户端被建立时,会向zookeeper注册本身的信息;此做用主要是为了"负载均衡".一个group中的多个consumer能够交错的消费一个topic的全部partitions;简而言之,保证此topic的全部partitions都能被此group所消费,且消费时为了性能考虑,让partition相对均衡的分散到每一个consumer上.
Consumer id Registry: 每一个consumer都有一个惟一的ID(host:uuid,能够经过配置文件指定,也能够由系统生成),此id用来标记消费者信息.
Consumer offset Tracking: 用来跟踪每一个consumer目前所消费的partition中最大的offset.此znode为持久节点,能够看出offset跟group_id有关,以代表当group中一个消费者失效,其余consumer能够继续消费.
Partition Owner registry: 用来标记partition正在被哪一个consumer消费.临时znode。此节点表达了"一个partition"只能被group下一个consumer消费,同时当group下某个consumer失效,那么将会触发负载均衡(即:让partitions在多个consumer间均衡消费,接管那些"游离"的partitions)
当consumer启动时,所触发的操做:
A) 首先进行"Consumer id Registry";
B) 而后在"Consumer id Registry"节点下注册一个watch用来监听当前group中其余consumer的"leave"和"join";只要此znode path下节点列表变动,都会触发此group下consumer的负载均衡.(好比一个consumer失效,那么其余consumer接管partitions).
C) 在"Broker id registry"节点下,注册一个watch用来监听broker的存活状况;若是broker列表变动,将会触发全部的groups下的consumer从新balance.
总结:
1) Producer端使用zookeeper用来"发现"broker列表,以及和Topic下每一个partition leader创建socket链接并发送消息.
2) Broker端使用zookeeper用来注册broker信息,已经监测partition leader存活性.
3) Consumer端使用zookeeper用来注册consumer信息,其中包括consumer消费的partition列表等,同时也用来发现broker列表,并和partition leader创建socket链接,并获取消息。
协调机制
1. 管理broker与consumer的动态加入与离开。(Producer不须要管理,随便一台计算机均可以做为Producer向Kakfa Broker发消息)
3. 维护消费关系及每一个partition的消费信息。
Producers
Producers直接发送消息到broker上的leader partition,不须要通过任何中介或其余路由转发。为了实现这个特性,kafka集群中的每一个broker均可以响应producer的请求,并返回topic的一些元信息,这些元信息包括哪些机器是存活的,topic的leader partition都在哪,现阶段哪些leader partition是能够直接被访问的。
Producer客户端本身控制着消息被推送到哪些partition。实现的方式能够是随机分配、实现一类随机负载均衡算法,或者指定一些分区算法。Kafka提供了接口供用户实现自定义的partition,用户能够为每一个消息指定一个partitionKey,经过这个key来实现一些hash分区算法。好比,把userid做为partitionkey的话,相同userid的消息将会被推送到同一个partition。
以Batch的方式推送数据能够极大的提升处理效率,kafka Producer 能够将消息在内存中累计到必定数量后做为一个batch发送请求。Batch的数量大小能够经过Producer的参数控制,参数值能够设置为累计的消息的数量(如500条)、累计的时间间隔(如100ms)或者累计的数据大小(64KB)。经过增长batch的大小,能够减小网络请求和磁盘IO的次数,固然具体参数设置须要在效率和时效性方面作一个权衡。
Producers能够异步的并行的向kafka发送消息,可是一般producer在发送完消息以后会获得一个future响应,返回的是offset值或者发送过程当中遇到的错误。这其中有个很是重要的参数“acks”,这个参数决定了producer要求leader partition 收到确认的副本个数,若是acks设置数量为0,表示producer不会等待broker的响应,因此,producer没法知道消息是否发送成功,这样有可能会致使数据丢失,但同时,acks值为0会获得最大的系统吞吐量。
若acks设置为1,表示producer会在leader partition收到消息时获得broker的一个确认,这样会有更好的可靠性,由于客户端会等待直到broker确认收到消息。若设置为-1,producer会在全部备份的partition收到消息时获得broker的确认,这个设置能够获得最高的可靠性保证。
Kafka 消息有一个定长的header和变长的字节数组组成。由于kafka消息支持字节数组,也就使得kafka能够支持任何用户自定义的序列号格式或者其它已有的格式如Apache Avro、protobuf等。Kafka没有限定单个消息的大小,但咱们推荐消息大小不要超过1MB,一般通常消息大小都在1~10kB以前。
发布消息时,kafka client先构造一条消息,将消息加入到消息集set中(kafka支持批量发布,能够往消息集合中添加多条消息,一次行发布),send消息时,producer client需指定消息所属的topic。
Consumers
Kafka提供了两套consumer api,分为high-level api和sample-api。Sample-api 是一个底层的API,它维持了一个和单一broker的链接,而且这个API是彻底无状态的,每次请求都须要指定offset值,所以,这套API也是最灵活的。
在kafka中,当前读到哪条消息的offset值是由consumer来维护的,所以,consumer能够本身决定如何读取kafka中的数据。好比,consumer能够经过重设offset值来从新消费已消费过的数据。无论有没有被消费,kafka会保存数据一段时间,这个时间周期是可配置的,只有到了过时时间,kafka才会删除这些数据。(这一点与AMQ不同,AMQ的message通常来讲都是持久化到mysql中的,消费完的message会被delete掉)
High-level API封装了对集群中一系列broker的访问,能够透明的消费一个topic。它本身维持了已消费消息的状态,即每次消费的都是下一个消息。
High-level API还支持以组的形式消费topic,若是consumers有同一个组名,那么kafka就至关于一个队列消息服务,而各个consumer均衡的消费相应partition中的数据。若consumers有不一样的组名,那么此时kafka就至关与一个广播服务,会把topic中的全部消息广播到每一个consumer。
High level api和Low level api是针对consumer而言的,和producer无关。
High level api是consumer读的partition的offsite是存在zookeeper上。High level api 会启动另一个线程去每隔一段时间,offsite自动同步到zookeeper上。换句话说,若是使用了High level api, 每一个message只能被读一次,一旦读了这条message以后,不管我consumer的处理是否ok。High level api的另一个线程会自动的把offiste+1同步到zookeeper上。若是consumer读取数据出了问题,offsite也会在zookeeper上同步。所以,若是consumer处理失败了,会继续执行下一条。这每每是不对的行为。所以,Best Practice是一旦consumer处理失败,直接让整个conusmer group抛Exception终止,可是最后读的这一条数据是丢失了,由于在zookeeper里面的offsite已经+1了。等再次启动conusmer group的时候,已经从下一条开始读取处理了。
Low level api是consumer读的partition的offsite在consumer本身的程序中维护。不会同步到zookeeper上。可是为了kafka manager可以方便的监控,通常也会手动的同步到zookeeper上。这样的好处是一旦读取某个message的consumer失败了,这条message的offsite咱们本身维护,咱们不会+1。下次再启动的时候,还会从这个offsite开始读。这样能够作到exactly once对于数据的准确性有保证。
借鉴:http://blog.csdn.net/ychenfeng/article/details/74980531