Kafka是一种分布式的消息系统。本文基于0.9.0版本,新版kafka加入了流处理组件kafka stream,最新的官方文档又自称分布式流处理平台。前端
一个典型的kafka集群中包含若干producer(能够是web前端产生的page view,或者是服务器日志,系统CPU、memory等),若干broker(Kafka支持水平扩展,通常broker数量越多,集群吞吐率越高),若干consumer group,以及一个Zookeeper集群。Kafka经过Zookeeper管理集群配置,选举leader,以及在consumer group发生变化时进行rebalance。producer使用push模式将消息发布到broker,consumer使用pull模式从broker订阅并消费消息。 node
对于每一个Topic,Kafka会为其维护一个以下图所示的分区的日志文件
每一个partition(分区)是一个有序的、不可修改的消息组成的队列;这些消息是被不断的appended到这个commit log(提交日志文件)上的。在这些patitions之中的每一个消息都会被赋予一个叫作offset的顺序id编号,用来在partition之中惟一性的标示这个消息。git
Kafka集群会保存一个时间段内全部被发布出来的信息,不管这个消息是否已经被消费过。github
Kafka仅仅提供提供partition以内的消息的全局有序,在不一样的partition之间不能担保。partition的消息有序性加上能够按照指定的key划分消息的partition,这基本上知足了大部分应用的需求。若是你必需要实现一个全局有序的消息队列,那么能够采用Topic只划分1个partition来实现。可是这就意味着你的每一个消费组只能有惟一的一个消费者进程。web
每个consumer实例都属于一个consumer group,每一条消息都会被全部订阅了该topic的consumer group消费。经过group id指定consumer group。至关于同一个consumer group的消费者会瓜分全部的分区,每一个consumer会消费一个或多个分区。apache
使用high-level consumer时,同一个consumer group里只有一个consumer能消费到该消息。服务器
由于high level不用client关心offset, 会自动的读zookeeper中该Consumer group的last offset,至关于全部consumer都公用这个offset。当其中一个consumer消费一条消息时,offset就移动到下一条。网络
订阅模式:每一个Consumer都采用不一样的group,每一条消息都会发送给全部消费者
消息队列模式:全部的Consumer在同一个Group里,消费者之间负载均衡session
新建topic时,经过–partitions 能够设置分区数。能够指定partitions数为broker的整数倍,这样,每一个broker会对应相同个数的partitions。架构
生产者在生产数据的时候,能够为每条消息指定Key,这样消息被发送到broker时,会根据分区规则选择被存储到哪个partitions中。若是分区规则设置的合理,那么全部的消息将会被均匀的分布到不一样的partitions中,这样就实现了负载均衡和水平扩展。
Kafka保证同一consumer group中只有一个consumer会消费某条消息,实际上,Kafka保证的是稳定状态下每个consumer实例只会消费某一个或多个特定partition的数据,而某个partition的数据只会被某一个特定的consumer实例所消费。其中consumer和partition数量关系以下表所示:
consumer和partition数量关系 | 消费状况 |
---|---|
小于 | 至少有一个consumer会消费多个partition的数据 |
相等 | 一个consumer消费一个partition的数据 |
大于 | 部分consumer没法消费该topic下任何一条消息,浪费 |
增减consumer,broker,partition会致使rebalance,rebalance后consumer对应的partition会发生变化,在后面的实例中也能够看到。
利用kafka中自带的生产者和消费者例子来作个简单的测试。具体步骤在另外一篇kakfa部署中。
kafka集群有3个broker节点。新建一个partitions数量为3的topic。启动一个A终端为生产者,启动B、C、D、E终端为消费者。C、D、E终端为一个consumer group,B为单独的一个consumer group。
在producer终端输入消息从1-20。能够看到B终端会输出1-20所有消息,图中B所示。而C、D、E终端因为属于同一个Consumer Group,partitions数量等于consumer,每一个consumer消费了一个partition里的消息。图中为C、D、E。
将C终端断开,剩下B、D、E去消费消息。B终端仍是会输出1-20所有消息,图中为B1所示。而D、E属于同一个Consumer Group,且consumer数量少于partition数,能够看到D消费了两个partition中的数据,见图中D1所示。
replication策略是基于partition。kafka经过建立topic时能够经过–replication-factor配置partition副本数。配置副本以后,每一个partition都有一个惟一的leader,有0个或多个follower。
全部的读写操做都在leader上完成,leader批量从leader上pull数据。followers从leader消费消息来复制message,就跟普通的consumer消费消息同样。
通常状况下partition的数量大于等于broker的数量,而且全部partition的leader均匀分布在broker上。
broker是否alive包含两个条件:
leader会track“in sync”的node list。若是一个follower宕机,或者落后太多,leader将把它从”in sync” list中移除。
一条消息只有被“in sync” list里的全部follower都从leader复制过去才会被认为已提交。这样就避免了部分数据被写进了leader,还没来得及被任何follower复制就宕机了,而形成数据丢失(consumer没法消费这些数据)
而对于producer而言,它能够选择是否等待消息commit,这能够经过producer的“acks”来设置。默认为acks=all ,这意味着leader将等待全部follower复制完消息。
leader挂掉后,怎样在follower中选举出新的leader?
Kafka在Zookeeper中动态维护了一个ISR(in-sync replicas) set,这个set里的全部replica都跟上了leader,只有ISR里的成员才有被选为leader的可能。
若是某一个partition的全部replica都挂了,就没法保证数据不丢失了。这种状况下有两种可行的方案:
kafka采用第二种方案,能够经过配置unclean.leader.election.enable来关闭这种方案。
kafka集群有3个broker节点。具体部署在另外一篇kafka部署中。
作个简单的测试,建立一个3分区的topic,不指定副本数,能够看到默认一个副本,Partition均匀分布在各broker。
1
2
3
4
5
6
|
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partitions
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partitions
Topic:test3partitions PartitionCount:3 ReplicationFactor:1 Configs:
Topic: test3partitions Partition: 0 Leader: 1 Replicas: 1 Isr: 1
Topic: test3partitions Partition: 1 Leader: 2 Replicas: 2 Isr: 2
Topic: test3partitions Partition: 2 Leader: 0 Replicas: 0 Isr: 0
|
建立一个3分区2副本的topic,能够看到Replicas和Isr中有1个follower。例如Partitions:0的Leader为broker:1,follower为broker:2,而且2在Isr中,理论上当Leader挂掉以后,2会顶上。
1
2
3
4
5
6
|
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partition2replication --replication-factor 2
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partition2replication
Topic:test3partition2replication PartitionCount:3 ReplicationFactor:2 Configs:
Topic: test3partition2replication Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2
Topic: test3partition2replication Partition: 1 Leader: 2 Replicas: 2,0 Isr: 2,0
Topic: test3partition2replication Partition: 2 Leader: 0 Replicas: 0,1 Isr: 0,1
|
建立一个3分区2副本的topic,能够看到Replicas和Isr中有2个follower
1
2
3
4
5
6
|
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$bin/kafka-topics.sh --create --zookeeper localhost:2181 --partitions 3 --topic test3partition3replication --replication-factor 3
[fangyeqing@xxxx kafka_2.11-0.9.0.0]$ bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test3partition3replication
Topic:test3partition3replication PartitionCount:3 ReplicationFactor:3 Configs:
Topic: test3partition3replication Partition: 0 Leader: 1 Replicas: 1,0,2 Isr: 1,0,2
Topic: test3partition3replication Partition: 1 Leader: 2 Replicas: 2,1,0 Isr: 2,1,0
Topic: test3partition3replication Partition: 2 Leader: 0 Replicas: 0,2,1 Isr: 0,2,1
|
目前有下面几种消息确保机制:
Kafka默认保证At least once,而且容许经过设置producer异步提交来实现At most once。下面分阶段分析:
当Producer向broker发送消息,由上述Replication的分析可知,一旦这条消息已经被commit,若是这个topic有多个replication(副本),某个broker挂掉也不会丢失消息。
Producer发送数据给broker的过程当中,若是遇到网络问题而形成通讯中断:
当Consumer从broker消费消息时,consumer若是在消费消息时crash: