Kafka是由LinkedIn开发的一个分布式的消息系统,使用Scala编写,它以可水平扩展和高吞吐率而被普遍使用。目前愈来愈多的开源分布式处理系统如Cloudera、Apache Storm、Spark都支持与Kafka集成。InfoQ一直在紧密关注Kafka的应用以及发展,“Kafka剖析”专栏将会从架构设计、实现、应用场景、性能等方面深度解析Kafka。前端
Kafka建立背景web
Kafka是一个消息系统,本来开发自LinkedIn,用做LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。如今它已被多家不一样类型的公司 做为多种类型的数据管道和消息系统使用。数据库
活动流数据是几乎全部站点在对其网站使用状况作报表时都要用到的数据中最常规的部分。活动数据包括页面访问量(Page View)、被查看内容方面的信息以及搜索状况等内容。这种数据一般的处理方式是先把各类活动以日志的形式写入某种文件,而后周期性地对这些文件进行统计分析。运营数据指的是服务器的性能数据(CPU、IO使用率、请求时间、服务日志等等数据)。运营数据的统计方法种类繁多。安全
近年来,活动和运营数据处理已经成为了网站软件产品特性中一个相当重要的组成部分,这就须要一套稍微更加复杂的基础设施对其提供支持。服务器
Kafka简介网络
Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标以下:架构
一、以时间复杂度为O(1)的方式提供消息持久化能力,即便对TB级以上数据也能保证常数时间复杂度的访问性能。
二、高吞吐率。即便在很是廉价的商用机器上也能作到单机支持每秒100K条以上消息的传输。
三、支持Kafka Server间的消息分区,及分布式消费,同时保证每一个Partition内的消息顺序传输。
四、同时支持离线数据处理和实时数据处理。
五、Scale out:支持在线水平扩展。app
为什么使用消息系统负载均衡
一、解耦
在项目启动之初来预测未来项目会碰到什么需求,是极其困难的。消息系统在处理过程当中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这容许你独立的扩展或修改两边的处理过程,只要确保它们遵照一样的接口约束。框架
二、冗余
有些状况下,处理数据的过程会失败。除非数据被持久化,不然将形成丢失。消息队列把数据进行持久化直到它们已经被彻底处理,经过这一方式规避了数据丢失风险。许多消息队列所采用的"插入-获取-删除"范式中,在把一个消息从队列中删除以前,须要你的处理系统明确的指出该消息已经被处理完毕,从而确保你的数据被安全的保存直到你使用完毕。
三、扩展性
由于消息队列解耦了你的处理过程,因此增大消息入队和处理的频率是很容易的,只要另外增长处理过程便可。不须要改变代码、不须要调节参数。扩展就像调大电力按钮同样简单。
四、灵活性 & 峰值处理能力
在访问量剧增的状况下,应用仍然须要继续发挥做用,可是这样的突发流量并不常见;若是为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃。
五、可恢复性
系统的一部分组件失效时,不会影响到整个系统。消息队列下降了进程间的耦合度,因此即便一个处理消息的进程挂掉,加入队列中的消息仍然能够在系统恢复后被处理。
六、序保证
在大多使用场景下,数据处理的顺序都很重要。大部分消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。Kafka保证一个Partition内的消息的有序性。
七、缓冲
在任何重要的系统中,都会有须要不一样的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列经过一个缓冲层来帮助任务最高效率的执行———写入队列的处理会尽量的快速。该缓冲有助于控制和优化数据流通过系统的速度。
八、异步通讯
不少时候,用户不想也不须要当即处理消息。消息队列提供了异步处理机制,容许用户把一个消息放入队列,但并不当即处理它。想向队列中放入多少消息就放多少,而后在须要的时候再去处理它们。
经常使用Message Queue对比
一、RabbitMQ
RabbitMQ是使用Erlang编写的一个开源的消息队列,自己支持不少的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它很是重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
二、Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它自己支持MQ功能,因此彻底能够当作一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操做,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不一样大小的数据。实验代表:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而若是数据大小超过了10K,Redis则慢的没法忍受;出队时,不管数据大小,Redis都表现出很是好的性能,而RabbitMQ的出队性能则远低于Redis。
三、ZeroMQ
ZeroMQ号称最快的消息队列系统,尤为针对大吞吐量的需求场景。ZeroMQ可以实现RabbitMQ不擅长的高级/复杂的队列,可是开发人员须要本身组合多种技术框架,技术上的复杂度是对这MQ可以应用成功的挑战。ZeroMQ具备一个独特的非中间件的模式,你不须要安装和运行一个消息服务器或中间件,由于你的应用程序将扮演这个服务器角色。你只须要简单的引用ZeroMQ程序库,可使用NuGet安装,而后你就能够愉快的在应用程序之间发送消息了。可是ZeroMQ仅提供非持久性的队列,也就是说若是宕机,数据将会丢失。其中,Twitter的Storm 0.9.0之前的版本中默认使用ZeroMQ做为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty做为传输模块)。
四、ActiveMQ
ActiveMQ是Apache下的一个子项目。 相似于ZeroMQ,它可以以代理人和点对点的技术实现队列。同时相似于RabbitMQ,它少许代码就能够高效地实现高级应用场景。
五、Kafka/Jafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具备如下特性:快速持久化,能够在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既能够达到10W/s的吞吐速率;彻底的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka经过Hadoop的并行加载机制统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个很是轻量级的消息系统,除了性能很是好以外,仍是一个工做良好的分布式系统。
Terminology
一、Broker
Kafka集群包含一个或多个服务器,这种服务器被称为broker
二、Topic
每条发布到Kafka集群的消息都有一个类别,这个类别被称为Topic。(物理上不一样Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个broker上但用户只需指定消息的Topic便可生产或消费数据而没必要关心数据存于何处)
三、Partition
Parition是物理上的概念,每一个Topic包含一个或多个Partition.
四、Producer
负责发布消息到Kafka broker
五、Consumer
消息消费者,向Kafka broker读取消息的客户端。
六、Consumer Group
每一个Consumer属于一个特定的Consumer Group(可为每一个Consumer指定group name,若不指定group name则属于默认的group)。
Kafka拓扑结构
如上图所示,一个典型的Kafka集群中包含若干Producer(能够是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,通常broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka经过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。
Topic & Partition
Topic在逻辑上能够被认为是一个queue,每条消费都必须指定它的Topic,能够简单理解为必须指明把这条消息放进哪一个queue里。为了使得Kafka的吞吐率能够线性提升,物理上把Topic分红一个或多个Partition,每一个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的全部消息和索引文件。若建立topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹(本文所用集群共8个节点,此处topic1和topic2 replication-factor均为1),以下图所示。
每一个日志文件都是一个log entrie序列,每一个log entrie包含一个4字节整型数值(值为N+5),1个字节的"magic value",4个字节的CRC校验码,其后跟N个字节的消息体。每条消息都有一个当前Partition下惟一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式以下:
1 |
message length : 4 bytes (value : 1 + 4 +n) |
2 |
"magic" value : 1 byte |
3 |
crc : 4 bytes |
4 |
payload : n bytes |
这个log entries并不是由一个文件构成,而是分红多个segment,每一个segment以该segment第一条消息的offset命名并以“.kafka”为后缀。另外会有一个索引文件,它标明了每一个segment下包含的log entry的offset范围,以下图所示:
由于每条消息都被append到该Partition中,属于顺序写磁盘,所以效率很是高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
对于传统的message queue而言,通常会删除已经被消费的消息,而Kafka集群会保留全部的消息,不管其被消费与否。固然,由于磁盘限制,不可能永久保留全部数据(实际上也不必),所以Kafka提供两种策略删除旧数据。一是基于时间,二是基于Partition文件大小。例如能够经过配置$KAFKA_HOME/config/server.properties,让Kafka删除一周前的数据,也可在Partition文件超过1GB时删除旧数据,配置以下所示。
01 |
# The minimum age of a log file to be eligible for deletion |
02 |
log.retention.hours = 168 |
03 |
04 |
# The maximum size of a log segment file. When this size is |
05 |
# reached a new log segment will be created. |
06 |
log.segment.bytes = 1073741824 |
07 |
08 |
# The interval at which log segments are checked to see if they |
09 |
# can be deleted according to the retention policies |
10 |
log.retention.check.interval.ms = 300000 |
11 |
12 |
# If log.cleaner.enable = true is set the cleaner will be enabled |
13 |
# and individual logs can then be marked for log compaction. |
14 |
log.cleaner.enable = false |
这里要注意,由于Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,因此这里删除过时文件与提升Kafka性能无关。选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每个Consumer Group保留一些metadata信息——当前消费的消息的position,也即offset。这个offset由Consumer控制。正常状况下Consumer会在消费完一条消息后递增该offset。固然,Consumer也可将offset设成一个较小的值,从新消费一些消息。由于offet由Consumer控制,因此Kafka broker是无状态的,它不须要标记哪些消息被哪些消费过,也不须要经过broker去保证同一个Consumer Group只有一个Consumer能消费某一条消息,所以也就不须要锁机制,这也为Kafka的高吞吐率提供了有力保障。
Producer消息路由
Producer发送消息到broker时,会根据Paritition机制选择将其存储到哪个Partition。若是Partition机制设置合理,全部消息能够均匀分布到不一样的Partition里,这样就实现了负载均衡。若是一个Topic对应一个文件,那这个文件所在的机器I/O将会成为这个Topic的性能瓶颈,而有了Partition后,不一样的消息能够并行写入不一样broker的不一样Partition里,极大的提升了吞吐率。能够在$KAFKA_HOME/config/server.properties中经过配置项num.partitions来指定新建Topic的默认Partition数量,也可在建立Topic时经过参数指定,同时也能够在Topic建立以后经过Kafka提供的工具修改。
在发送一条消息时,能够指定这条消息的key,Producer根据这个key和Partition机制来判断应该将这条消息发送到哪一个Parition。Paritition机制能够经过指定Producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner接口。本例中若是key能够被解析为整数则将对应的整数与Partition总数取余,该消息会被发送到该数对应的Partition。(每一个Parition都会有个序号,序号从0开始)
01 |
import kafka.producer.Partitioner; |
02 |
import kafka.utils.VerifiableProperties; |
03 |
04 |
public class JasonPartitioner<T> implements Partitioner { |
05 |
06 |
public JasonPartitioner(VerifiableProperties verifiableProperties) {} |
07 |
08 |
@Override |
09 |
public int partition(Object key, int numPartitions) { |
10 |
try { |
11 |
int partitionNum = Integer.parseInt((String) key); |
12 |
return Math.abs(Integer.parseInt((String) key) % numPartitions); |
13 |
} catch (Exception e) { |
14 |
return Math.abs(key.hashCode() % numPartitions); |
15 |
} |
16 |
} |
17 |
} |
若是将上例中的类做为partition.class,并经过以下代码发送20条消息(key分别为0,1,2,3)至topic3(包含4个Partition)。
01 |
public void sendMessage() throws InterruptedException{ |
02 |
for ( int i = 1 ; i <= 5 ; i++){ |
03 |
List messageList = new ArrayList<KeyedMessage<String, String>>(); |
04 |
for ( int j = 0 ; j < 4 ; j++){ |
05 |
messageList.add( new KeyedMessage<String, String>( "topic2" , j+ "" , |
06 |
"The " + i + " message for key " + j)); |
07 |
} |
08 |
producer.send(messageList); |
09 |
} |
10 |
producer.close(); |
11 |
} |
则key相同的消息会被发送并存储到同一个partition里,并且key的序号正好和Partition序号相同。(Partition序号从0开始,本例中的key也从0开始)。下图所示是经过Java程序调用Consumer后打印出的消息列表。
Consumer Group
(本节全部描述都是基于Consumer hight level API而非low level API)。使用Consumer high level API时,同一Topic的一条消息只能被同一个Consumer Group内的一个Consumer消费,但多个Consumer Group可同时消费这一消息。
这是Kafka用来实现一个Topic消息的广播(发给全部的Consumer)和单播(发给某一个Consumer)的手段。一个Topic能够对应多个Consumer Group。若是须要实现广播,只要每一个Consumer有一个独立的Group就能够了。要实现单播只要全部的Consumer在同一个Group里。用Consumer Group还能够将Consumer进行自由的分组而不须要屡次发送消息到不一样的Topic。
实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还能够同时将数据实时备份到另外一个数据中心,只须要保证这三个操做所使用的Consumer属于不一样的Consumer Group便可。下图是Kafka在Linkedin的一种简化部署示意图。
下面这个例子更清晰地展现了Kafka Consumer Group的特性。首先建立一个Topic (名为topic1,包含3个Partition),而后建立一个属于group1的Consumer实例,并建立三个属于group2的Consumer实例,最后经过Producer向topic1发送key分别为1,2,3的消息。结果发现属于group1的Consumer收到了全部的这三条消息,同时group2中的3个Consumer分别收到了key为1,2,3的消息。以下图所示。
Push vs. Pull
做为一个消息系统,Kafka遵循了传统的方式,选择由Producer向broker push消息并由Consumer从broker pull消息。一些logging-centric system,好比Facebook的Scribe和Cloudera的Flume,采用push模式。事实上,push模式和pull模式各有优劣。
push模式很难适应消费速率不一样的消费者,由于消息发送速率是由broker决定的。push模式的目标是尽量以最快速度传递消息,可是这样很容易形成Consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则能够根据Consumer的消费能力以适当的速率消费消息。
对于Kafka而言,pull模式更合适。pull模式可简化broker的设计,Consumer可自主控制消费消息的速率,同时Consumer能够本身控制消费方式——便可批量消费也可逐条消费,同时还能选择不一样的提交方式从而实现不一样的传输语义。
Kafka delivery guarantee
有这么几种可能的delivery guarantee:
一、At most once 消息可能会丢,但毫不会重复传输
二、At least one 消息毫不会丢,但可能会重复传输
三、Exactly once 每条消息确定会被传输一次且仅传输一次,不少时候这是用户所想要的。
当Producer向broker发送消息时,一旦这条消息被commit,因数replication的存在,它就不会丢。可是若是Producer发送数据给broker后,遇到网络问题而形成通讯中断,那Producer就没法判断该条消息是否已经commit。虽然Kafka没法肯定网络故障期间发生了什么,可是Producer能够生成一种相似于主键的东西,发生故障时幂等性的重试屡次,这样就作到了Exactly once。截止到目前(Kafka 0.8.2版本,2015-03-04),这一Feature还并未实现,有但愿在Kafka将来的版本中实现。(因此目前默认状况下一条消息从Producer到broker是确保了At least once,可经过设置Producer异步发送实现At most once)。
接下来讨论的是消息从broker到Consumer的delivery guarantee语义。(仅针对Kafka consumer high level API)。Consumer在从broker读取消息后,能够选择commit,该操做会在Zookeeper中保存该Consumer在该Partition中读取的消息的offset。该Consumer下一次再读该Partition时会从下一条开始读取。如未commit,下一次读取的开始位置会跟上一次commit以后的开始位置相同。固然能够将Consumer设置为autocommit,即Consumer一旦读到数据当即自动commit。若是只讨论这一读取消息的过程,那Kafka是确保了Exactly once。但实际使用中应用程序并不是在Consumer读取完数据就结束了,而是要进行进一步处理,而数据处理与commit的顺序在很大程度上决定了消息从broker和consumer的delivery guarantee semantic。
四、读完消息先commit再处理消息。这种模式下,若是Consumer在commit后还没来得及处理消息就crash了,下次从新开始工做后就没法读到刚刚已提交而未处理的消息,这就对应于At most once
五、读完消息先处理再commit。这种模式下,若是在处理完消息以后commit以前Consumer crash了,下次从新开始工做时还会处理刚刚未commit的消息,实际上该消息已经被处理过了。这就对应于At least once。在不少使用场景下,消息都有一个主键,因此消息的处理每每具备幂等性,即屡次处理这一条消息跟只处理一次是等效的,那就能够认为是Exactly once。(笔者认为这种说法比较牵强,毕竟它不是Kafka自己提供的机制,主键自己也并不能彻底保证操做的幂等性。并且实际上咱们说delivery guarantee 语义是讨论被处理多少次,而非处理结果怎样,由于处理方式多种多样,咱们不该该把处理过程的特性——如是否幂等性,当成Kafka自己的Feature)
六、若是必定要作到Exactly once,就须要协调offset和实际操做的输出。精典的作法是引入两阶段提交。若是能让offset和操做输入存在同一个地方,会更简洁和通用。这种方式可能更好,由于许多输出系统可能不支持两阶段提交。好比,Consumer拿到数据后可能把数据放到HDFS,若是把最新的offset和数据自己一块儿写到HDFS,那就能够保证数据的输出和offset的更新要么都完成,要么都不完成,间接实现Exactly once。(目前就high level API而言,offset是存于Zookeeper中的,没法存于HDFS,而low level API的offset是由本身去维护的,能够将之存于HDFS中)
总之,Kafka默认保证At least once,而且容许经过设置Producer异步提交来实现At most once。而Exactly once要求与外部存储系统协做,幸运的是Kafka提供的offset能够很是直接很是容易得使用这种方式。