Kafka是一种分布式的,基于发布/订阅的消息系统。主要设计目标以下:前端
解耦
在项目启动之初来预测未来项目会碰到什么需求,是极其困难的。消息队列在处理过程当中间插入了一个隐含的、基于数据的接口层,两边的处理过程都要实现这一接口。这容许你独立的扩展或修改两边的处理过程,只要确保它们遵照一样的接口约束java
冗余
有些状况下,处理数据的过程会失败。除非数据被持久化,不然将形成丢失。消息队列把数据进行持久化直到它们已经被彻底处理,经过这一方式规避了数据丢失风险。在被许多消息队列所采用的”插入-获取-删除”范式中,在把一个消息从队列中删除以前,须要你的处理过程明确的指出该消息已经被处理完毕,确保你的数据被安全的保存直到你使用完毕。node
扩展性
由于消息队列解耦了你的处理过程,因此增大消息入队和处理的频率是很容易的;只要另外增长处理过程便可。不须要改变代码、不须要调节参数。扩展就像调大电力按钮同样简单。git
灵活性 & 峰值处理能力
在访问量剧增的状况下,应用仍然须要继续发挥做用,可是这样的突发流量并不常见;若是为以能处理这类峰值访问为标准来投入资源随时待命无疑是巨大的浪费。使用消息队列可以使关键组件顶住突发的访问压力,而不会由于突发的超负荷的请求而彻底崩溃。github
可恢复性
当体系的一部分组件失效,不会影响到整个系统。消息队列下降了进程间的耦合度,因此即便一个处理消息的进程挂掉,加入队列中的消息仍然能够在系统恢复后被处理。而这种容许重试或者延后处理请求的能力一般是造就一个略感不便的用户和一个沮丧透顶的用户之间的区别。web
送达保证
消息队列提供的冗余机制保证了消息能被实际的处理,只要一个进程读取了该队列便可。在此基础上,IronMQ提供了一个”只送达一次”保证。不管有多少进程在从队列中领取数据,每个消息只能被处理一次。这之因此成为可能,是由于获取一个消息只是”预约”了这个消息,暂时把它移出了队列。除非客户端明确的表示已经处理完了这个消息,不然这个消息会被放回队列中去,在一段可配置的时间以后可再次被处理。算法
顺序保证
在大多使用场景下,数据处理的顺序都很重要。消息队列原本就是排序的,而且能保证数据会按照特定的顺序来处理。IronMO保证消息经过FIFO(先进先出)的顺序来处理,所以消息在队列中的位置就是从队列中检索他们的位置。数据库
缓冲
在任何重要的系统中,都会有须要不一样的处理时间的元素。例如,加载一张图片比应用过滤器花费更少的时间。消息队列经过一个缓冲层来帮助任务最高效率的执行—写入队列的处理会尽量的快速,而不受从队列读的预备处理的约束。该缓冲有助于控制和优化数据流通过系统的速度。apache
理解数据流
在一个分布式系统里,要获得一个关于用户操做会用多长时间及其缘由的整体印象,是个巨大的挑战。消息队列经过消息被处理的频率,来方便的辅助肯定那些表现不佳的处理过程或领域,这些地方的数据流都不够优化。安全
异步通讯
不少时候,你不想也不须要当即处理消息。消息队列提供了异步处理机制,容许你把一个消息放入队列,但并不当即处理它。你想向队列中放入多少消息就放多少,而后在你乐意的时候再去处理它们。
RabbitMQ
RabbitMQ是使用Erlang编写的一个开源的消息队列,自己支持不少的协议:AMQP,XMPP, SMTP, STOMP,也正因如此,它很是重量级,更适合于企业级的开发。同时实现了Broker构架,这意味着消息在发送给客户端时先在中心队列排队。对路由,负载均衡或者数据持久化都有很好的支持。
Redis
Redis是一个基于Key-Value对的NoSQL数据库,开发维护很活跃。虽然它是一个Key-Value数据库存储系统,但它自己支持MQ功能,因此彻底能够当作一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操做,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不一样大小的数据。实验代表:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而若是数据大小超过了10K,Redis则慢的没法忍受;出队时,不管数据大小,Redis都表现出很是好的性能,而RabbitMQ的出队性能则远低于Redis。
ZeroMQ
ZeroMQ号称最快的消息队列系统,尤为针对大吞吐量的需求场景。ZMQ可以实现RabbitMQ不擅长的高级/复杂的队列,可是开发人员须要本身组合多种技术框架,技术上的复杂度是对这MQ可以应用成功的挑战。ZeroMQ具备一个独特的非中间件的模式,你不须要安装和运行一个消息服务器或中间件,由于你的应用程序将扮演了这个服务角色。你只须要简单的引用ZeroMQ程序库,可使用NuGet安装,而后你就能够愉快的在应用程序之间发送消息了。可是ZeroMQ仅提供非持久性的队列,也就是说若是宕机,数据将会丢失。其中,Twitter的Storm 0.9.0之前的版本中默认使用ZeroMQ做为数据流的传输(Storm从0.9版本开始同时支持ZeroMQ和Netty做为传输模块)。
ActiveMQ
ActiveMQ是Apache下的一个子项目。 相似于ZeroMQ,它可以以代理人和点对点的技术实现队列。同时相似于RabbitMQ,它少许代码就能够高效地实现高级应用场景。
Kafka/Jafka
Kafka是Apache下的一个子项目,是一个高性能跨语言分布式发布/订阅消息队列系统,而Jafka是在Kafka之上孵化而来的,即Kafka的一个升级版。具备如下特性:快速持久化,能够在O(1)的系统开销下进行消息持久化;高吞吐,在一台普通的服务器上既能够达到10W/s的吞吐速率;彻底的分布式系统,Broker、Producer、Consumer都原生自动支持分布式,自动实现负载均衡;支持Hadoop数据并行加载,对于像Hadoop的同样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka经过Hadoop的并行加载机制来统一了在线和离线的消息处理。Apache Kafka相对于ActiveMQ是一个很是轻量级的消息系统,除了性能很是好以外,仍是一个工做良好的分布式系统。
如上图所示,一个典型的kafka集群中包含若干producer(能够是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,通常broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka经过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。
做为一个messaging system,Kafka遵循了传统的方式,选择由producer向broker push消息并由consumer从broker pull消息。一些logging-centric system,好比Facebook的Scribe和Cloudera的Flume,采用很是不一样的push模式。事实上,push模式和pull模式各有优劣。
push模式很难适应消费速率不一样的消费者,由于消息发送速率是由broker决定的。push模式的目标是尽量以最快速度传递消息,可是这样很容易形成consumer来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而pull模式则能够根据consumer的消费能力以适当的速率消费消息。
Topic在逻辑上能够被认为是一个queue。每条消费都必须指定它的topic,能够简单理解为必须指明把这条消息放进哪一个queue里。为了使得Kafka的吞吐率能够水平扩展,物理上把topic分红一个或多个partition,每一个partition在物理上对应一个文件夹,该文件夹下存储这个partition的全部消息和索引文件。
每一个日志文件都是“log entries”序列,每个log entry
包含一个4字节整型数(值为N),其后跟N个字节的消息体。每条消息都有一个当前partition下惟一的64字节的offset,它指明了这条消息的起始位置。磁盘上存储的消息格式以下:
message length : 4 bytes (value: 1+4+n)
“magic” value : 1 byte
crc : 4 bytes
payload : n bytes
这个“log entries”并不是由一个文件构成,而是分红多个segment,每一个segment名为该segment第一条消息的offset和“.kafka”组成。另外会有一个索引文件,它标明了每一个segment下包含的log entry
的offset范围,以下图所示。
由于每条消息都被append到该partition中,是顺序写磁盘,所以效率很是高(经验证,顺序写磁盘效率比随机写内存还要高,这是Kafka高吞吐率的一个很重要的保证)。
每一条消息被发送到broker时,会根据paritition规则选择被存储到哪个partition。若是partition规则设置的合理,全部消息能够均匀分布到不一样的partition里,这样就实现了水平扩展。(若是一个topic对应一个文件,那这个文件所在的机器I/O将会成为这个topic的性能瓶颈,而partition解决了这个问题)。在建立topic时能够在$KAFKA_HOME/config/server.properties
中指定这个partition的数量(以下所示),固然也能够在topic建立以后去修改parition数量。
# The default number of log partitions per topic. More partitions allow greater # parallelism for consumption, but this will also result in more files across # the brokers. num.partitions=3
在发送一条消息时,能够指定这条消息的key,producer根据这个key和partition机制来判断将这条消息发送到哪一个parition。paritition机制能够经过指定producer的paritition. class这一参数来指定,该class必须实现kafka.producer.Partitioner
接口。本例中若是key能够被解析为整数则将对应的整数与partition总数取余,该消息会被发送到该数对应的partition。(每一个parition都会有个序号)
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class JasonPartitioner<T> implements Partitioner {
public JasonPartitioner(VerifiableProperties verifiableProperties) {}
@Override
public int partition(Object key, int numPartitions) {
try {
int partitionNum = Integer.parseInt((String) key);
return Math.abs(Integer.parseInt((String) key) % numPartitions);
} catch (Exception e) {
return Math.abs(key.hashCode() % numPartitions);
}
}
}
若是将上例中的class做为partition.class,并经过以下代码发送20条消息(key分别为0,1,2,3)至topic2(包含4个partition)。
public void sendMessage() throws InterruptedException{
for(int i = 1; i <= 5; i++){
List messageList = new ArrayList<KeyedMessage<String, String>>();
for(int j = 0; j < 4; j++){
messageList.add(new KeyedMessage<String, String>("topic2", j+"", "The " + i + " message for key " + j));
}
producer.send(messageList);
}
producer.close();
}
则key相同的消息会被发送并存储到同一个partition里,并且key的序号正好和partition序号相同。(partition序号从0开始,本例中的key也正好从0开始)。以下图所示。
对于传统的message queue而言,通常会删除已经被消费的消息,而Kafka集群会保留全部的消息,不管其被消费与否。固然,由于磁盘限制,不可能永久保留全部数据(实际上也不必),所以Kafka提供两种策略去删除旧数据。一是基于时间,二是基于partition文件大小。例如能够经过配置$KAFKA_HOME/config/server.properties
,让Kafka删除一周前的数据,也可经过配置让Kafka在partition文件超过1GB时删除旧数据,以下所示。
############################# Log Retention Policy #############################
# The following configurations control the disposal of log segments. The policy can
# be set to delete segments after a period of time, or after a given size has accumulated.
# A segment will be deleted whenever *either* of these criteria are met. Deletion always happens
# from the end of the log.
# The minimum age of a log file to be eligible for deletion
log.retention.hours=168
# A size-based retention policy for logs. Segments are pruned from the log as long as the remaining
# segments don't drop below log.retention.bytes.
#log.retention.bytes=1073741824
# The maximum size of a log segment file. When this size is reached a new log segment will be created.
log.segment.bytes=1073741824
# The interval at which log segments are checked to see if they can be deleted according
# to the retention policies
log.retention.check.interval.ms=300000
# By default the log cleaner is disabled and the log retention policy will default to
#just delete segments after their retention expires.
# If log.cleaner.enable=true is set the cleaner will be enabled and individual logs
#can then be marked for log compaction.
log.cleaner.enable=false
这里要注意,由于Kafka读取特定消息的时间复杂度为O(1),即与文件大小无关,因此这里删除文件与Kafka性能无关,选择怎样的删除策略只与磁盘以及具体的需求有关。另外,Kafka会为每个consumer group保留一些metadata信息—当前消费的消息的position,也即offset。这个offset由consumer控制。正常状况下consumer会在消费完一条消息后线性增长这个offset。固然,consumer也可将offset设成一个较小的值,从新消费一些消息。由于offet由consumer控制,因此Kafka broker是无状态的,它不须要标记哪些消息被哪些consumer过,不须要经过broker去保证同一个consumer group只有一个consumer能消费某一条消息,所以也就不须要锁机制,这也为Kafka的高吞吐率提供了有力保障。
Kafka从0.8开始提供partition级别的replication,replication的数量可在$KAFKA_HOME/config/server.properties
中配置。
default.replication.factor = 1
该 Replication与leader election配合提供了自动的failover机制。replication对Kafka的吞吐率是有必定影响的,但极大的加强了可用性。默认状况下,Kafka的replication数量为1。 每一个partition都有一个惟一的leader,全部的读写操做都在leader上完成,leader批量从leader上pull数据。通常状况下partition的数量大于等于broker的数量,而且全部partition的leader均匀分布在broker上。follower上的日志和其leader上的彻底同样。
和大部分分布式系统同样,Kakfa处理失败须要明肯定义一个broker是否alive。对于Kafka而言,Kafka存活包含两个条件,一是它必须维护与Zookeeper的session(这个经过Zookeeper的heartbeat机制来实现)。二是follower必须可以及时将leader的writing复制过来,不能“落后太多”。
leader会track“in sync”的node list。若是一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。这里所描述的“落后太多”指follower复制的消息落后于leader后的条数超过预约值,该值可在$KAFKA_HOME/config/server.properties
中配置
#If a replica falls more than this many messages behind the leader, the leader will remove the follower from ISR and treat it as dead
replica.lag.max.messages=4000
#If a follower hasn't sent any fetch requests for this window of time, the leader will remove the follower from ISR (in-sync replicas) and treat it as dead
replica.lag.time.max.ms=10000
须要说明的是,Kafka只解决”fail/recover”,不处理“Byzantine”(“拜占庭”)问题。
一条消息只有被“in sync” list里的全部follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而形成数据丢失(consumer没法消费这些数据)。而对于producer而言,它能够选择是否等待消息commit,这能够经过request.required.acks
来设置。这种机制确保了只要“in sync” list有一个或以上的flollower,一条被commit的消息就不会丢失。
这里的复制机制即不是同步复制,也不是单纯的异步复制。事实上,同步复制要求“活着的”follower都复制完,这条消息才会被认为commit,这种复制方式极大的影响了吞吐率(高吞吐率是Kafka很是重要的一个特性)。而异步复制方式下,follower异步的从leader复制数据,数据只要被leader写入log就被认为已经commit,这种状况下若是follwer都落后于leader,而leader忽然宕机,则会丢失数据。而Kafka的这种使用“in sync” list的方式则很好的均衡了确保数据不丢失以及吞吐率。follower能够批量的从leader复制数据,这样极大的提升复制性能(批量写磁盘),极大减小了follower与leader的差距(前文有说到,只要follower落后leader不太远,则被认为在“in sync” list里)。
上文说明了Kafka是如何作replication的,另一个很重要的问题是当leader宕机了,怎样在follower中选举出新的leader。由于follower可能落后许多或者crash了,因此必须确保选择“最新”的follower做为新的leader。一个基本的原则就是,若是leader不在了,新的leader必须拥有原来的leader commit的全部消息。这就须要做一个折衷,若是leader在标明一条消息被commit前等待更多的follower确认,那在它die以后就有更多的follower能够做为新的leader,但这也会形成吞吐率的降低。
一种很是经常使用的选举leader的方式是“majority vote”(“少数服从多数”),但Kafka并未采用这种方式。这种模式下,若是咱们有2f+1个replica(包含leader和follower),那在commit以前必须保证有f+1个replica复制完消息,为了保证正确选出新的leader,fail的replica不能超过f个。由于在剩下的任意f+1个replica里,至少有一个replica包含有最新的全部消息。这种方式有个很大的优点,系统的latency只取决于最快的几台server,也就是说,若是replication factor是3,那latency就取决于最快的那个follower而非最慢那个。majority vote也有一些劣势,为了保证leader election的正常进行,它所能容忍的fail的follower个数比较少。若是要容忍1个follower挂掉,必需要有3个以上的replica,若是要容忍2个follower挂掉,必需要有5个以上的replica。也就是说,在生产环境下为了保证较高的容错程度,必需要有大量的replica,而大量的replica又会在大数据量下致使性能的急剧降低。这就是这种算法更多用在Zookeeper这种共享集群配置的系统中而不多在须要存储大量数据的系统中使用的缘由。例如HDFS的HA feature是基于majority-vote-based journal,可是它的数据存储并无使用这种expensive的方式。
实际上,leader election算法很是多,好比Zookeper的Zab, Raft和Viewstamped Replication。而Kafka所使用的leader election算法更像微软的PacificA算法。
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的全部replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。在这种模式下,对于f+1个replica,一个Kafka topic能在保证不丢失已经ommit的消息的前提下容忍f个replica的失败。在大多数使用场景中,这种模式是很是有利的。事实上,为了容忍f个replica的失败,majority vote和ISR在commit前须要等待的replica数量是同样的,可是ISR须要的总的replica的个数几乎是majority vote的一半。
虽然majority vote与ISR相比有不需等待最慢的server这一优点,可是Kafka做者认为Kafka能够经过producer选择是否被commit阻塞来改善这一问题,而且节省下来的replica和磁盘使得ISR模式仍然值得。
上文提到,在ISR中至少有一个follower时,Kafka能够确保已经commit的数据不丢失,但若是某一个partition的全部replica都挂了,就没法保证数据不丢失了。这种状况下有两种可行的方案:
这就须要在可用性和一致性当中做出一个简单的平衡。若是必定要等待ISR中的replica“活”过来,那不可用的时间就可能会相对较长。并且若是ISR中的全部replica都没法“活”过来了,或者数据都丢失了,这个partition将永远不可用。选择第一个“活”过来的replica做为leader,而这个replica不是ISR中的replica,那即便它并不保证已经包含了全部已commit的消息,它也会成为leader而做为consumer的数据源(前文有说明,全部读写都由leader完成)。Kafka0.8.*使用了第二种方式。根据Kafka的文档,在之后的版本中,Kafka支持用户经过配置选择这两种方式中的一种,从而根据不一样的使用场景选择高可用性仍是强一致性。
上文说明了一个parition的replication过程,然尔Kafka集群须要管理成百上千个partition,Kafka经过round-robin的方式来平衡partition从而避免大量partition集中在了少数几个节点上。同时Kafka也须要平衡leader的分布,尽量的让全部partition的leader均匀分布在不一样broker上。另外一方面,优化leadership election的过程也是很重要的,毕竟这段时间相应的partition处于不可用状态。一种简单的实现是暂停宕机的broker上的全部partition,并为之选举leader。实际上,Kafka选举一个broker做为controller,这个controller经过watch Zookeeper检测全部的broker failure,并负责为全部受影响的parition选举leader,再将相应的leader调整命令发送至受影响的broker,过程以下图所示。
这样作的好处是,能够批量的通知leadership的变化,从而使得选举过程成本更低,尤为对大量的partition而言。若是controller失败了,幸存的全部broker都会尝试在Zookeeper中建立/controller->{this broker id},若是建立成功(只可能有一个建立成功),则该broker会成为controller,若建立不成功,则该broker会等待新controller的命令。
(本节全部描述都是基于consumer hight level API而非low level API)。
每个consumer实例都属于一个consumer group,每一条消息只会被同一个consumer group里的一个consumer实例消费。(不一样consumer group能够同时消费同一条消息)
不少传统的message queue都会在消息被消费完后将消息删除,一方面避免重复消费,另外一方面能够保证queue的长度比较少,提升效率。而如上文所将,Kafka并不删除已消费的消息,为了实现传统message queue消息只被消费一次的语义,Kafka保证保证同一个consumer group里只有一个consumer会消费一条消息。与传统message queue不一样的是,Kafka还容许不一样consumer group同时消费同一条消息,这一特性能够为消息的多元化处理提供了支持。实际上,Kafka的设计理念之一就是同时提供离线处理和实时处理。根据这一特性,可使用Storm这种实时流处理系统对消息进行实时在线处理,同时使用Hadoop这种批处理系统进行离线处理,还能够同时将数据实时备份到另外一个数据中心,只须要保证这三个操做所使用的consumer在不一样的consumer group便可。下图展现了Kafka在Linkedin的一种简化部署。
为了更清晰展现Kafka consumer group的特性,笔者做了一项测试。建立一个topic (名为topic1),建立一个属于group1的consumer实例,并建立三个属于group2的consumer实例,而后经过producer向topic1发送key分别为1,2,3r的消息。结果发现属于group1的consumer收到了全部的这三条消息,同时group2中的3个consumer分别收到了key为1,2,3的消息。以下图所示。
(本节所讲述内容均基于Kafka consumer high level API)
Kafka保证同一consumer group中只有一个consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。这样设计的劣势是没法让同一个consumer group里的consumer均匀消费数据,优点是每一个consumer不用都跟大量的broker通讯,减小通讯开销,同时也下降了分配难度,实现也更简单。另外,由于同一个partition里的数据是有序的,这种设计能够保证每一个partition里的数据也是有序被消费。
若是某consumer group中consumer数量少于partition数量,则至少有一个consumer会消费多个partition的数据,若是consumer的数量与partition数量相同,则正好一个consumer消费一个partition的数据,而若是consumer的数量多于partition的数量时,会有部分consumer没法消费该topic下任何一条消息。
以下例所示,若是topic1有0,1,2共三个partition,当group1只有一个consumer(名为consumer1)时,该 consumer可消费这3个partition的全部数据。
增长一个consumer(consumer2)后,其中一个consumer(consumer1)可消费2个partition的数据,另一个consumer(consumer2)可消费另一个partition的数据。
再增长一个consumer(consumer3)后,每一个consumer可消费一个partition的数据。consumer1消费partition0,consumer2消费partition1,consumer3消费partition2
再增长一个consumer(consumer4)后,其中3个consumer可分别消费一个partition的数据,另一个consumer(consumer4)不能消费topic1任何数据。
此时关闭consumer1,剩下的consumer可分别消费一个partition的数据。
接着关闭consumer2,剩下的consumer3可消费2个partition,consumer4可消费1个partition。
再关闭consumer3,剩下的consumer4可同时消费topic1的3个partition。