一、什么是Kafka?linux
Kafka是一个使用Scala编写的消息系统,本来开发自LinkedIn,用做LinkedIn的活动流(Activity Stream)和运营数据处理管道(Pipeline)的基础。如今它已被多家不一样类型的公司做为多种类型的数据管道和消息系统使用。算法
Kafka是一种分布式的,基于发布/订阅的消息系统。服务器
Kafka使用zookeeper做为其分布式协调框架,很好的将消息生产、消息存储、消息消费的过程结合在一块儿。同时借助zookeeper,kafka可以生产者、消费者和broker在内的因此组件在无状态的状况下,创建起生产者和消费者的订阅关系,并实现生产者与消费者的负载均衡。网络
二、kafka的特性架构
(1)以时间复杂度为O(1)的方式提供消息持久化能力,即便对TB级以上数据也能保证常数时间复杂度的访问性能。并发
(2)高吞吐率。即便在很是廉价的商用机器上也能作到单机支持每秒100K条以上消息的传输。负载均衡
(3)支持Kafka Server间的消息分区,及分布式消费,同时保证每一个Partition内的消息顺序存储和传输。框架
(4)同时支持离线数据处理(Offline)和实时数据处理(Online)。dom
(5)Scale out:支持在线水平扩展。无需停机便可扩展机器。异步
(6)支持按期删除数据机制。能够按照时间段来删除,也能够按照文档大小来删除。
(7)Consumer采用pull的方式消费数据,消费状态由Consumer控制,减轻Broker负担。
三、Kafka架构
(1)Broker:和RabbitMQ中的Broker概念相似。一个kafka服务器就是一个Broker,而一个kafka集群包含一个或多个Broker。Broker会持久化数据到相应的Partition中,不会有cache压力。
(2)Topic:主题。每条消息都有一个类别,这个类别就叫作Topic。Kafka的Topic能够理解为RabbitMQ的Queue消息队列,相同类别的消息被发送到同一个Topic中,而后再被此Topic的Consumer消费。Topic是逻辑上的概念,而物理上的实现就是Partition。
(3)Partition:分区。分区是物理上的概念,每一个Topic包含一个或者多个Partition,每一个Partition都是一个有序队列。发送给Topic的消息通过分区算法(能够自定义),决定消息存储在哪个Partition当中。每一条数据都会被分配一个有序id:Offset。注意:kafka只保证按一个partition中的顺序将消息发给Consumer,不保证一个Topic的总体(多个partition间)的顺序。
(4)Replication:备份。Replication是基于Partition而不是Topic的。每一个Partition都有本身的备份,且分布在不一样的Broker上。
(5)Offset:偏移量。kafka的存储文件都是用offset来命名,用offset作名字的好处是方便查找。例如你想找位于2049的位置,只要找到2048.log的文件便可。固然the first offset就是00000000000.log。注意:每一个Partition中的Offset都是各不影响的从0开始的有序数列。
(6)Producer:消息生产者。
(7)Consumer:消息消费者。Consumer采用pull的方式从Broker获取消息,而且Consumer要维护消费状态,所以Kafaka系统中,业务重心通常都在Consumer身上,而不像RabbitMQ那样Broker作了大部分的事情。
(8)Consumer Group:消费组。每一个Consumer属于一个特定的Consumer Group(可为每一个Consumer指定group name,若不指定group name则属于默认的group)。每一个Topic能够被多个Group订阅,每一个Group中能够有多个Consumer。发送到Topic的一条消息会被每一个Group中的一个Consumer消费,多个Consumer之间将交错消费整个Topic的消息,实现负载均衡。
(9)Record:消息。每个消息由一个Key、一个Value和一个时间戳构成。
Kafka内部结构图(图片源于网络)
Kafka拓扑结构图(图片源于网络)
Topic在逻辑上能够被认为是一个queue,每条消费都必须指定它的Topic,能够简单理解为必须指明把这条消息放进哪一个queue里。为了使得Kafka的吞吐率能够线性提升,物理上把Topic分红一个或多个Partition,每一个Partition在物理上对应一个文件夹,该文件夹下存储这个Partition的全部消息和索引文件。若建立topic1和topic2两个topic,且分别有13个和19个分区,则整个集群上会相应会生成共32个文件夹。partiton命名规则为topic名称+有序序号,第一个partiton序号从0开始,序号最大值为partitions数量减1。
(1)每一个partition目录至关于一个巨型文件被平均分配到多个大小相等segment数据文件中。但每一个segment file消息数量不必定相等,这种特性方便old segment file快速被删除。
(2)每一个partiton只须要支持顺序读写就好了,segment文件生命周期由服务端配置参数决定。
(3)segment file组成:由2大部分组成,分别为index file(后缀“.index”)和data file(后缀“.log”),此2个文件一一对应,成对出现。
(4)segment文件命名规则:partition全局的第一个segment从0开始,后续每一个segment文件名为上一个segment文件最后一条消息的offset值。数值最大为64位long大小,19位数字字符长度,没有数字用0填充。
Segment file结构图(图片来源于网络)
以上述图2中一对segment file文件为例,说明segment中index和log文件对应关系物理结构以下:
Index和log文件对应图(图片来源于网络)
其中以索引文件中元数据3,497为例,依次在数据文件中表示第3个message(在全局partition表示第368772个message)、以及该消息的物理偏移地址为497。
例如读取offset=368776的message,须要经过下面2个步骤查找。
(1)第一步查找segment file
上图为例,其中00000000000000000000.index表示最开始的文件,起始偏移量(offset)为0.第二个文件00000000000000368769.index的消息量起始消息为368770 = 368769 + 1.一样,第三个文件00000000000000737337.index的起始消息为737338=737337 + 1,只要根据offset 进行二分查找文件列表,就能够快速定位到具体文件。当offset=368776时定位到00000000000000368769.index|log。
(2)第二步经过segment file查找message
经过第一步定位到segment file,当offset=368776时,依次定位到00000000000000368769.index的元数据物理位置和00000000000000368769.log的物理偏移地址,而后再经过00000000000000368769.log顺序查找直到offset=368776为止。
首先来看一条在Linux下建立topic的命令:
bin/kafka-topics.sh --create --zookeeper ip1:2181,ip2:2181,ip3:2181,ip4:2181 --replication-factor 2 --partitions 4 --topic test
此命令的意思是在四个Broker的kafka集群上建立一个名为test的Topic,而且有4个分区2个备份(此处比较容易搞混,2个Replication表示Leader和Follower一共加起来有2个)。此时在四台机器上面就有8个Partition,如图所示。
Kafka集群Partition分布图1(图片来源于网络)
当集群中新增2节点,Partition增长到6个时分布状况以下:
Kafka集群Partition分布图2(图片来源于网络)
在Kafka集群中,每一个Broker都有均等分配Leader Partition机会。
上述图Broker Partition中,箭头指向为副本,以Partition-0为例:broker1中parition-0为Leader,Broker2中Partition-0为副本。每一个Broker(按照BrokerId有序)依次分配主Partition,下一个Broker为副本,如此循环迭代分配,多副本都遵循此规则。
副本分配算法:
(1)将全部n个Broker和待分配的i个Partition排序。
(2)将第i个Partition分配到第(i mod n)个Broker上。
(3)将第i个Partition的第j个副本分配到第((i + j) mod n)个Broker上
例如图2中的第三个Partition:partition-2,将被分配到Broker3((3 mod 6)=3)上,partition-2的副本将被分配到Broker4上((3+1) mod 6=4)。
(1)Kafka把topic中一个parition大文件分红多个小文件段,经过多个小文件段,就容易按期清除或删除已经消费完文件,减小磁盘占用。能够设置segment文件大小按期删除和消息过时时间按期删除
(2)经过索引信息能够快速定位message。
(3)经过index元数据所有映射到memory,能够避免segment file的IO磁盘操做。
(4)经过索引文件稀疏存储,能够大幅下降index文件元数据占用空间大小。
对于多个Partition,多个Consumer
(1)若是consumer比partition多,是浪费,由于kafka的设计是在一个partition上是不容许并发的,因此consumer数不要大于partition数。
(2)若是consumer比partition少,一个consumer会对应于多个partition,这里要合理分配consumer数和partition数,不然会致使partition里面的数据被取的不均匀。最好partiton数目是consumer数目的整数倍,因此partition数目很重要,好比取24,就很容易设定consumer数目。
(3)若是consumer从多个partition读到数据,不保证数据间的顺序性,kafka只保证在一个partition上数据是有序的,但多个partition,根据你读的顺序会有不一样
(4)增减consumer,broker,partition会致使rebalance,因此rebalance后consumer对应的partition会发生变化
(5)High-level接口中获取不到数据的时候是会block的。
关于zookeeper中Offset初始值的问题:
Zookeeper中Offset的初始值默认是非法的,所以经过设置Consumer的参数auto.offset.reset来告诉Consumer读取到Offset非法时该怎么作。
auto.offset.reset有三个值:
(1)smallest : 自动把zookeeper中的offset设为Partition中最小的offset;
(2)largest : 自动把zookeeper中offset设为Partition中最大的offset;
(3)anything else: 抛出异常;
auto.offset.reset默认值是largest,此种状况下若是producer先发送了10条数据到某个Partition,而后Consumer启功后修改zookeeper中非法Offset值为Partition中的最大值9(Offset从0开始),这样Consumer就忽略了这10条消息。就算如今再次设置成smallest也读取不到以前的10条数据了,由于此时Offset是合法的了。
因此,想要读取以前的数据,就须要在一开始指定auto.offset.reset=smallest。
Replication是基于Partition而不是Topic的。每一个Partition都有本身的备份,且分布在不一样的Broker上。这些Partition当中有一个是Leader,其余都是Follower。Leader Partition负责读写操做,Follower Partition只负责从Leader处复制数据,使本身与Leader保持一致。Zookeeper负责二者间的故障切换(fail over,能够理解为Leader选举)。
消息复制延迟受最慢的Follower限制,Leader负责跟踪全部Follower的状态,若是Follower“落后”太多或者失效,Leader就将此Follower从Replication同步列表中移除,但此时Follower是活着的,而且一直从Leader拉取数据,直到差距小于replica.lag.max.messages值,而后从新加入同步列表。当一条消息被全部的Follower保存成功,此消息才被认为是“committed”,Consumer才能消费这条消息。这种同步方式就要求Leader和Follower之间要有良好的网络环境。
一个partition的follower落后于leader足够多时,会被认为不在同步副本列表或处于滞后状态。在Kafka-0.8.2.x中,副本滞后判断依据是副本落后于leader最大消息数量(replica.lag.max.messages)或replication响应Leader partition的最长等待时间(replica.lag.time.max.ms)。前者是用来检测缓慢的副本,然后者是用来检测失效或死亡的副本。假设replica.lag.max.messages设置为4,代表只要follower落后leader的消息数小于4,就不会从同步副本列表中移除。replica.lag.time.max设置为500 ms,代表只要follower向leader发送拉取数据请求时间间隔超过500 ms,就会被标记为死亡,而且会从同步副本列表中移除。
当Leader处于流量高峰时,好比一瞬间就收到了4条数据,此时全部Follower将被认为是“out-of-sync”而且从同步副本列表中移除,而后Follower拉取数据遇上Leader事后又从新加入同步列表,就这样Follower频繁在副本同步列表移除和从新加入之间来回切换。
即便只有一个replicas实例存活,仍然能够保证消息的正常发送和接收,只要zookeeper集群存活便可(注意:不一样于其余分布式存储,好比hbase须要"多数派"存活才行)。
当leader失效时,需在followers中选取出新的leader,可能此时follower落后于leader,所以须要选择一个"up-to-date"的follower。kafka中leader选举并无采用"投票多数派"的算法,由于这种算法对于"网络稳定性"/"投票参与者数量"等条件有较高的要求,并且kafka集群的设计,还须要容忍N-1个replicas失效。对于kafka而言,每一个partition中全部的replicas信息均可以在zookeeper中得到,那么选举leader将是一件很是简单的事情。选择follower时须要兼顾一个问题,就是新leader 所在的server服务器上已经承载的partition leader的个数,若是一个server上有过多的partition leader,意味着此server将承受着更多的IO压力。在选举新leader,须要考虑到"负载均衡",partition leader较少的broker将会更有可能成为新的leader。在整个集群中,只要有一个replicas存活,那么此partition均可以继续接受读写操做。
当一个Group中,有Consumer加入或者离开时,会触发Partitions均衡。均衡的最终目的,是提高Topic的并发消费能力。
(1)假如topic1,具备以下partitions: P0,P1,P2,P3
(2)加入group中,有以下consumer: C0,C1
(3)首先根据partition索引号对partitions排序: P0,P1,P2,P3
(4)根据consumer.id排序: C0,C1
(5)计算倍数: M = [P0,P1,P2,P3].size / [C0,C1].size,本例值M=2(向上取整)
(6)而后依次分配partitions: C0 = [P0,P1],C1=[P2,P3],即Ci = [P(i * M),P((i + 1) * M -1)]
经过此算法,就能知道具体Consumer消费的是哪一个分区中的数据。
在kafka-Client-0.11.0.0.jar中,提供的有默认的KafkaProducer和DefaultPartitioner实现。其中DefaultPartitioner主要提供了Producer发送消息到分区的路由算法,若是给定Key值,就经过Key的哈希值和分区个数取余来计算;若是没有给定Key,就经过ThreadLocalRandom.current().nextInt()产生的随机数与分区数取余(其中涉及复杂步奏参考以下代码)。具体代码以下:
public class DefaultPartitioner implements Partitioner { private final ConcurrentMap<string, atomicinteger=""> topicCounterMap = new ConcurrentHashMap<>(); public void configure(Map<string,> configs) {} /** * 计算给定记录的分区 * @param topic The topic name * @param key The key to partition on (or null if no key) * @param keyBytes serialized key to partition on (or null if no key) * @param value The value to partition on or null * @param valueBytes serialized value to partition on or null * @param cluster The current cluster metadata */ public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) { List<partitioninfo> partitions = cluster.partitionsForTopic(topic); int numPartitions = partitions.size(); if (keyBytes == null) { int nextValue = nextValue(topic); List<partitioninfo> availablePartitions = cluster.availablePartitionsForTopic(topic); if (availablePartitions.size() > 0) { int part = Utils.toPositive(nextValue) % availablePartitions.size(); return availablePartitions.get(part).partition(); } else { // no partitions are available, give a non-available partition return Utils.toPositive(nextValue) % numPartitions; } } else { // hash the keyBytes to choose a partition return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions; } } private int nextValue(String topic) { AtomicInteger counter = topicCounterMap.get(topic); if (null == counter) { counter = new AtomicInteger(ThreadLocalRandom.current().nextInt()); AtomicInteger currentCounter = topicCounterMap.putIfAbsent(topic, counter); if (currentCounter != null) { counter = currentCounter; } } return counter.getAndIncrement(); } public void close() {} }
咱们也能够设置本身的Partition路由规则,须要继承Partitioner类实现
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);
Kafka的消息delivery保证主要有三种:
(1)At most once 最多一次。消息可能会丢失,但毫不会重复传输。
(2)At least once 最少一次。消息毫不会丢失,但可能会重复传输。
(3)Exactly once 正好一次。每条消息正好被传输一次和消费一次。
Producer的delivery保证能够经过参数request.required.acks设置来保证:
(1)request.required.acks=0。
至关于消息异步发送。消息一发送完毕立刻发送下一条。因为不须要ack,可能会形成数据丢失,至关于实现了At most once。
(2)request.required.acks=1。
消息发送给Leader Partition,在Leader Partition确认消息并ack 生产者事后才发下一条。
(3)request.required.acks=-1。
消息发送给Leader,在Leader收到全部Follower确认保存消息的ack后对producer进行ack才发送下一条。
因此一条消息从Producer到Broker至少是确保了At least once的,由于有Replication的存在,只要消息到达Broker就不会丢失。若是ack出现问题,好比网络中断,有可能会致使producer收不到ack而重复发送消息。Exactly once这种方式,没有查到相关的实现。
第(3)种方式的具体步奏以下:
a. producer 先从 zookeeper 的 "/brokers/.../state" 节点找到该 partition 的 leader
b. producer 将消息发送给该 leader
c. leader 将消息写入本地 log
d. followers 从 leader pull 消息,写入本地 log 后向 leader 发送 ACK
e. leader 收到全部 ISR 中的 replica 的 ACK 后,增长 HW(high watermark,最后 commit 的 offset) 并向 producer 发送 ACK
Consumer从Broker拉取数据事后,能够选择commit,此操做会在zookeeper中存下此Consumer读取对应Partition消息的Offset,以便下一次拉取数据时会从Partition的下一个Offset消费,避免重复消费。
一样,Consumer能够经过设置参数enable.auto.commit=true来自动确认消息,即Consumer一收到消息马上自动commit。若是只看消息的读取过程,kafka是确保了Exactly once的,可是实际状况中Consumer不可能读取到数据就结束了,每每还须要处理读取到的数据。所以Consumer处理消息和commit消息的顺序就决定了delivery保证的类别。
(1)先处理后commit
这种方式实现了At least once。Consumer收到消息先处理后提交,若是在处理完成后机器崩溃了,致使Offset没有更新,Consumer下次启动时又会从新读取上一次消费的数据,实际上此消息已经处理过了。
(2)先commit后处理
这种方式实现了At most once。Consumer收到消息事后马上commit,更新zookeeper上的Offset,而后再处理消息。若是处理还未结束Consumer崩溃了,等Consumer再次启动的时候会读取Offset更新事后的下一条数据,这就致使了数据丢失。
Kafka提供了两种Consumer API,选用哪一种API须要视具体状况而定。
High Level Consumer API围绕着Consumer Group这个逻辑概念展开,它屏蔽了每一个Topic的每一个Partition的Offset管理(自动读取zookeeper中该Partition的last offset )、Broker失败转移以及增减Partition、Consumer时的负载均衡(当Partition和Consumer增减时,Kafka自动进行Rebalance)。
Low Level Consumer API,做为底层的Consumer API,提供了消费Kafka Message更大的控制,用户能够实现重复读取、跳读等功能。
使用Low Level Consumer API,是没有对Broker、Consumer、Partition增减进行处理,若是出现这些的增减时,须要本身处理负载均衡。
Low Level Consumer API提供更大灵活控制是以增长复杂性为代价的:
(1)Offset再也不透明
(2)Broker自动失败转移须要处理
(3)增长Consumer、Partition、Broker须要本身作负载均衡