kafka原理解析

介绍

分布式消息系统kafka的提供了一个生产者、缓冲区、消费者的模型html

  • broker:中间的kafka cluster,存储消息,是由多个server组成的集群
  • topic:kafka给消息提供的分类方式。broker用来存储不一样topic的消息数据
  • producer:往broker中某个topic里面生产数据
  • consumer:往broker中某个topic获取数据

设计思想

topic与消息

kafka将全部消息组织成多个topic的形式存储,而每一个topic又能够拆分红多个partition,每一个partition又由一个一个消息组成。每一个消息都被标识了一个递增序列号表明其进来的前后顺序,并按顺序存储在partition中。java

这样,消息就以一个个id的方式,组织起来。node

  • producer选择一个topic,生产消息,消息会经过分配策略append到某个partition末尾
  • consumer选择一个topic,经过id指定从哪一个位置开始消费消息。消费完成以后保留id,下次能够从这个位置开始继续消费,也能够从其余任意位置开始消费

这个id,在kafka中被称为offsetreact

这种组织和处理策略提供了以下好处:git

  • 消费者能够根据需求,灵活指定offset消费
  • 保证了消息不变性,为并发消费提供了线程安全的保证。每一个consumer都保留本身的offset,互相之间不干扰,不存在线程安全问题
  • 消息访问的并行高效性。每一个topic中的消息被组织成多个partition,partition均匀分配到集群server中。生产、消费消息的时候,会被路由到指定partition,减小竞争,增长了程序的并行能力
  • 增长消息系统的可伸缩性。每一个topic中保留的消息可能很是庞大,经过partition将消息切分红多个子消息,并经过负责均衡策略将partition分配到不一样server。这样当机器负载满的时候,经过扩容能够将消息从新均匀分配
  • 保证消息可靠性。消息消费完成以后不会删除,能够经过重置offset从新消费,保证了消息不会丢失
  • 灵活的持久化策略。能够经过指定时间段(如最近一天)来保存消息,节省broker存储空间

备份

消息以partition为单位分配到多个server,并以partition为单位进行备份。备份策略为:1个leader和N个followers,leader接受读写请求,followers被动复制leader。leader和followers会在集群中打散,保证partition高可用github

producer

producer生产消息须要以下参数:算法

  • topic:往哪一个topic生产消息
  • partition:往哪一个partition生产消息
  • key:根据该key将消息分区到不一样partition
  • message:消息

根据kafka源码,能够根据不一样参数灵活调整生产、分区策略apache

if topic is None
    throw Error

p=None

if partition Not None
    if partition < 0 Or partition >= numPartitions
        throw Error
    p=partition
elif key Not None
    p=hash(key) % numPartitions
else
    p=round-robin() % numPartitions

send message to the partition p

上面是我翻译的伪代码,其中round-robin就是简单轮询,hash采用的是murmurhashapi

consumer

传统消息系统有两种模式:缓存

  • 队列
  • 发布订阅

kafka经过consumer group将两种模式统一处理

每一个consumer将本身标记consumer group名称,以后系统会将consumer group按名称分组,将消息复制并分发给全部分组,每一个分组只有一个consumer能消费这条消息。

因而推理出两个极端状况:

  • 当全部consumer的consumer group相同时,系统变成队列模式
  • 当每一个consumer的consumer group都不相同时,系统变成发布订阅

多consumer并发消费消息时,容易致使消息乱序

经过限制消费者为同步,能够保证消息有序,可是这大大下降了程序的并发性。

kafka经过partition的概念,保证了partition内消息有序性,缓解了上面的问题。partition内消息会复制分发给全部分组,每一个分组只有一个consumer能消费这条消息。这个语义保证了某个分组消费某个分区的消息,是同步而非并发的。若是一个topic只有一个partition,那么这个topic并发消费有序,不然只是单个partition有序。

通常消息消息系统,consumer存在两种消费模型:

  • push:优点在于消息实时性高。劣势在于没有考虑consumer消费能力和饱和状况,容易致使producer压垮consumer
  • pull:优点在能够控制消费速度和消费数量,保证consumer不会出现饱和。劣势在于当没有数据,会出现空轮询,消耗cpu

kafka采用pull,并采用可配置化参数保证当存在数据而且数据量达到必定量的时候,consumer端才进行pull操做,不然一直处于block状态

kakfa采用整数值consumer position来记录单个分区的消费状态,而且单个分区单个消息只能被consumer group内的一个consumer消费,维护简单开销小。消费完成,broker收到确认,position指向下次消费的offset。因为消息不会删除,在完成消费,position更新以后,consumer依然能够重置offset从新消费历史消息

消息发送语义

producer视角

  • 消息最多发送一次:producer异步发送消息,或者同步发消息但重试次数为0
  • 消息至少发送一次:producer同步发送消息,失败、超时都会重试
  • 消息发且仅发一次:后续版本支持

consumer视角

  • 消息最多消费一次:consumer先读取消息,再确认position,最后处理消息
  • 消息至少消费一次:consumer先读取消息,再处理消息,最后确认position
  • 消息消费且仅消费一次:
    1. 若是消息处理后的输出端(如db)能保证消息更新幂等性,则屡次消费也能保证exactly once语义
    2. 若是输出端能支持两阶段提交协议,则能保证确认position和处理输出消息同时成功或者同时失败
    3. 在消息处理的输出端存储更新后的position,保证了确认position和处理输出消息的原子性(简单、通用)

可用性

在kafka中,正常状况下全部node处于同步中状态,当某个node处于非同步中状态,也就意味着整个系统出问题,须要作容错处理

同步中表明了:

  • 该node与zookeeper能连通
  • 该node若是是follower,那么consumer position与leader不能差距太大(差额可配置)

某个分区内同步中的node组成一个集合,即该分区的ISR

kafka经过两个手段容错:

  • 数据备份:以partition为单位备份,副本数可设置。当副本数为N时,表明1个leader,N-1个followers,followers能够视为leader的consumer,拉取leader的消息,append到本身的系统中
  • failover:
    1. 当leader处于非同步中时,系统从followers中选举新leader
    2. 当某个follower状态变为非同步中时,leader会将此follower剔除ISR,当此follower恢复并完成数据同步以后再次进入ISR

另外,kafka有个保障:当producer生产消息时,只有当消息被全部ISR确认时,才表示该消息提交成功。只有提交成功的消息,才能被consumer消费

综上所述:当有N个副本时,N个副本都在ISR中,N-1个副本都出现异常时,系统依然能提供服务

假设N副本全挂了,node恢复后会面临同步数据的过程,这期间ISR中没有node,会致使该分区服务不可用。kafka采用一种降级措施来处理:选举第一个恢复的node做为leader提供服务,以它的数据为基准,这个措施被称为脏leader选举

因为leader是主要提供服务的,kafka broker将多个partition的leader均分在不一样的server上以均摊风险

每一个parition都有leader,若是在每一个partition内运行选主进程,那么会致使产生很是多选主进程。kakfa采用一种轻量级的方式:从broker集群中选出一个做为controller,这个controller监控挂掉的broker,为上面的分区批量选主

一致性

上面的方案保证了数据高可用,有时高可用是体如今对一致性的牺牲上。若是但愿达到强一致性,能够采起以下措施:

  • 禁用脏leader选举,ISR没有node时,宁肯不提供服务也不要未彻底同步的node
  • 设置最小ISR数量min_isr,保证消息至少要被min_isr个node确认才能提交

持久化

基于如下几点事实,kafka重度依赖磁盘而非内存来存储消息

  • 硬盘便宜,内存贵
  • 顺序读+预读取操做,能提升缓存命中率
  • 操做系统利用富余的内存做为pagecache,配合预读取(read-ahead)+写回(write-back)技术,从cache读数据,写到cache就返回(操做系统后台flush),提升用户进程响应速度
  • java对象实际大小比理想大小要大,使得将消息存到内存成本很高
  • 当堆内存占用不断增长时,gc抖动较大
  • 基于文件顺序读写的设计思路,代码编写简单

在持久化数据结构的选择上,kafka采用了queue而不是Btree

  • kafka只有简单的根据offset读和append操做,因此基于queue操做的时间复杂度为O(1),而基于Btree操做的时间复杂度为O(logN)
  • 在大量文件读写的时候,基于queue的read和append只须要一次磁盘寻址,而Btree则会涉及屡次。磁盘寻址过程极大下降了读写性能

性能

kafka在如下四点作了优化:

  • 将大量小io改形成少许大io
  • 利用sendfile减小数据拷贝
  • 支持snappy,gzip,lz4三种算法批量压缩消息,减小网络传输消耗
  • 采用nio网络模型,与1 acceptor thread + N processor threads的reactor线程模型

大量读写少许消息会致使性能较差,经过将消息聚合,能够减小读写次数(减小随机IO),增长单次读写数据量(增长顺序IO)

普通状况下,数据从磁盘传输到网络须要经历如下步骤:

  1. 磁盘->内核page cache
  2. 内核page cache->用户buffer
  3. 用户buffer->socket buffer
  4. socket buffer->NIC buffer(NIC:网卡接口)

利用sendfile系统调用,能够简化至:

  1. 磁盘->内核page cache
  2. 内核page cache->NIC buffer

减小了两次拷贝步骤。在存在大量数据传输的操做时,会显著提高性能

在大量文件读写的时候,基于queue的read和append只须要一次磁盘寻址,而Btree则会涉及屡次。磁盘寻址过程极大下降了读写性能

kafka server端采用与Mina同样的网络、线程模型。server端基于nio,采用1个acceptor线程接受tcp链接,并将链接分配给N个proccessor线程,proccessor线程执行具体的IO读写、逻辑处理操做。(注:相比较于这种模型,netty的N boss + N worker的模型更加灵活)

外部依赖

zookeeper

broker node在zookeeper中采用惟一id(整数)标识


/brokers/ids/[N] --> host:port 瞬时节点

  • [N] 表明分区数

此znode存储了broker node的ip端口


/brokers/topics/[topic]/partitions/[N]/state --> leader,isr 瞬时节点

  • [topic] 表明某个topic名称
  • [N] 表明分区数

此znode存储了该分区的leader id和isr列表(由id组成)


/consumers/[group_id]/ids/[customer_id] --> {"topic1": #streams, ..., "topicN": #streams} 瞬时节点

  • [group_id] 消费者所属groupid
  • [customer_id] 消费者id,结构为 host+uuid
  • topicN 订阅topic name
  • #streams 消费线程数量

此znode存储了指定consumer消费topic所使用的线程数


/consumers/[group_id]/offsets/[topic]/[N] --> offset 永久节点

  • [group_id] 消费者所属groupid
  • [topic] 订阅topic name
  • [N] 分区数

consumer能够经过三种方式管理offset:

  • 手动管理。使用低层次consumer api,灵活,较麻烦
  • 交给zookeeper管理。使用高层次consumer api,设置offsets.storage=zookeeper,方便,性能稍差。0.8.2默认配置
  • 交给kafka管理。使用高层次consumer api,设置offsets.storage=kafka,方便,原生态性能优。实现原理是kafka选出一个broker做为offset manager,建立一个名为__consumer_offsets的topic,将offset存储在该topic下,推荐采用

此znode存储了指定consumer在topic中最新consumer offset


/consumers/[group_id]/owners/[topic]/[N] --> consumer_id 瞬时节点

指定分区在某一时刻只能被全部consumer group中的某一个consumer消费,经过将consumer_id存在指定分区下,就能保证这时该分区只能被这个consumer消费


上面只是列出的最典型的znode,经过研究znode,能够开发出一个kafka monitor,用来监控kafka数据消费情况,好比KafkaOffsetMonitor

参考资料

相关文章
相关标签/搜索