消息队列二三事

最近在看kafka的代码,就免不了想看看消息队列的一些要点:服务质量(QOS)性能扩展性等等,下面一一探索这些概念,并谈谈在特定的消息队列如kafka或者mosquito中是如何具体实现这些概念的。html

服务质量

服务语义

服务质量通常能够分为三个级别,下面说明它们不一样语义。java

At most once

至多一次,消息可能丢失,但毫不会重复传输。
生产者:彻底依赖底层TCP/IP的传输可靠性,不作特殊处理,所谓“发送即忘”。kafka中设置acks=0
消费者:先保存消费进度,再处理消息。kafka中设置消费者自动提交偏移量并设置较短的提交时间间隔。linux

At least once

至少一次,消息毫不会丢,可是可能会重复。
生产者:要作消息防丢失的保证。kafka中设置acks=1 或 all并设置retries>0
消费者:先处理消息,再保存消费进度。kafka中设置消费者自动提交偏移量并设置很长的提交时间间隔,或者直接关闭自动提交偏移量,处理消息后手动调用同步模式的偏移量提交。算法

Exactly once

精确一次,每条消息确定会被传输一次且仅一次。
这个级别光靠消息队列自己并很差保证,有可能要依赖外部组件。
生产者:要作消息防丢失的保证。kafka中设置acks=1 或 all并设置retries>0。mosquito中经过四步握手与DUP、MessageID等标识来实现单次语义。
消费者:要作消息防重复的保证,有多种方案,如:在保存消费进度和处理消息这两个操做中引入两阶段提交协议;让消息幂等;让消费处理与进度保存处于一个事务中来保证原子性。kafka中关闭自动提交偏移量,并设置自定义的再平衡监听器,监听到分区发生变化时从外部组件读取或者存储偏移量,保证本身或者其余消费者在更换分区时能读到最新的偏移量从而避免重复。总之就是结合ConsumerRebalanceListenerseek和一个外部系统(如支持事务的数据库)共同来实现单次语义。此外,kafka还提供了GUID以便用户自行实现去重。kafka 0.11版本经过3个大的改动支持EOS:1.幂等的producer;2. 支持事务;3. 支持EOS的流式处理(保证读-处理-写全链路的EOS)。
这三个级别可靠性依次增长,可是延迟带宽占用也会增长,因此实际状况中,要依据业务类型作出权衡。数据库

可靠性

上面的三个语义不只须要生产者和消费者的配合实现,还要broker自己的可靠性来进行保证。可靠性就是只要broker向producer发出确认,就必定要保证这个消息能够被consumer获取。apache

kafka 中一个topic有多个partition,每一个partition又有多个replica,全部replica中有一个leaderISR是必定要同步leader后才能返回提交成功的replica集OSR内的replica尽力的去同步leader,可能数据版本会落后。在kafka工做的过程当中,若是某个replica同步速度慢于replica.lag.time.max.ms指定的阈值,则被踢出ISR存入OSR,若是后续速度恢复能够回到ISR中。能够配置min.insync.replicas指定ISR中的replica最小数量,默认该值为1。LEO是分区的最新数据的offset,当数据写入leader后,LEO就当即执行该最新数据,至关于最新数据标识位。HW是当写入的数据被同步到全部的ISR中的副本后,数据才认为已提交,HW更新到该位置,HW以前的数据才能够被消费者访问,保证没有同步完成的数据不会被消费者访问到,至关于全部副本同步数据标识位。segmentfault

每一个partition的全部replica须要进行leader选举(依赖ZooKeeper)。在leader宕机后,只能从ISR列表中选取新的leader,不管ISR中哪一个副本被选为新的leader,它都知道HW以前的数据,能够保证在切换了leader后,消费者能够继续看到HW以前已经提交的数据。当ISR中全部replica都宕机该partition就不可用了,能够设置unclean.leader.election.enable=true,该选项使得kafka选择任何一个活的replica成为leader而后继续工做,此replica可能不在ISR中,就可能致使数据丢失。因此实际使用中须要进行可用性可靠性的权衡。设计模式

kafka建议数据可靠存储不依赖于数据强制刷盘(会影响总体性能),而是依赖于replica网络

顺序消费

顺序消费是指消费者处理消息的顺序与生产者投放消息的顺序一致。
主要可能破坏顺序的场景是生产者投放两条消息AB,而后A失败重投递致使消费者拿到的消息是BA。多线程

kafka中能保证分区内部消息的有序性,其作法是设置max.in.flight.requests.per.connection=1,也就是说生产者在未获得broker对消息A的确认状况下是不会发送消息B的,这样就能保证broker存储的消息有序,天然消费者请求到的消息也是有序的。
可是咱们明显能感受到这会下降吞吐量,由于消息不能并行投递了,并且会阻塞等待,也无法发挥 batch 的威力。
若是想要整个topic有序,那就只能一个topic一个partition了,一个consumer group也就只有一个consumer了。这样就违背了kafka高吞吐的初衷。

重复消费

重复消费是指一个消息被消费者重复消费了。 这个问题也是上面第三个语义须要解决的。

通常的消息系统如kafka或者相似的rocketmq都不能也不提倡在系统内部解决,而是配合第三方组件,让用户本身去解决。究其缘由仍是解决问题的成本解决问题后得到的价值不匹配,因此干脆不解决,就像操做系统对待死锁同样,采起“鸵鸟政策”。
可是kafka 0.11仍是处理了这个问题,见发行说明,维护者是想让用户无可挑剔嘛 [笑cry]。

性能

衡量一个消息系统的性能有许多方面,最多见的就是下面几个指标。

链接数

是指系统在同一时刻能支持多少个生产者或者消费者的链接总数。链接数和broker采用的网络IO模型直接相关,常见模型有:单线程链接每线程ReactorProactor等。
单线程一时刻只能处理一个链接,链接每线程受制于server的线程数量,Reactor是目前主流的高性能网络IO模型,Proactor因为操做系统对真异步的支持不太行因此还没有流行。

kafka的broker采用了相似于NettyReactor模型:1(1个Acceptor线程)+N(N个Processor线程)+M(M个Work线程)。
其中Acceptor负责监听新的链接请求,同时注册OPACCEPT事件,将新的链接按照RoundRobin的方式交给某个Processor线程处理。
每一个Processor都有一个NIO selector,向 Acceptor分配的 SocketChannel 注册 OPREAD、OPWRITE事件,对socket进行读写。N由num.networker.threads决定。
Worker负责具体的业务逻辑如:从requestQueue中读取请求、数据存储到磁盘、把响应放进responseQueue中等等。M的大小由num.io.threads决定。

Reactor模型通常基于IO多路复用(如selectepoll),是非阻塞的,因此少许的线程能处理大量的链接。
若是大量的链接都是idle的,那么Reactor使用epoll的效率是杠杠的,若是大量的链接都是活跃的,此时若是没有Proactor的支持就最好把epoll换成select或者poll
具体作法是-Djava.nio.channels.spi.SelectorProvidersun.nio.ch包下面的EPollSelectorProvider换成PollSelectorProvider

QPS

是指系统每秒能处理的请求数量。QPS一般能够体现吞吐量(该术语很广,能够用TPS/QPS、PV、UV、业务数/小时等单位体现)的大小。

kafka中因为能够采用 batch 的方式(还能够压缩),因此每秒钟能够处理的请求不少(由于减小了解析量网络往复次数磁盘IO次数等)。另外一方面,kafka每个topic都有多个partition,因此同一个topic下能够并行(注意不是并发哟)服务多个生产者和消费者,这也提升了吞吐量。

平均响应时间

平均响应时间是指每一个请求得到响应须要的等待时间。

kafka中处理请求的瓶颈(也就是最影响响应时间的因素)最有可能出如今哪些地方呢?
网络? 有可能,可是这个因素整体而言不是kafka能控制的,kafka能够对消息进行编码压缩并批量提交,减小带宽占用;
磁盘? 颇有可能,因此kafka从分利用OS的pagecache,而且对磁盘采用顺序写,这样能大大提高磁盘的写入速度。同时kafka还使用了零拷贝技术,把普通的拷贝过程:disk->read buffer->app buffer->socket buffer->NIC buffer 中,read buffer到app buffer的拷贝过程省略了(因此上下文切换也减小了),加快了处理速度。这个功能依赖于 javatransferTo,底层由 linuxsendfile系统调用实现。在 linux2.4及以上 中,数据能够直接从 read buffer 拷贝到 NIC buffer ,达到了最短拷贝路径。
此外还有文件分段技术,每一个partition都分为多个segment,避免了大文件操做的同时提升了并行度。
CPU? 不大可能,由于消息队列的使用并不涉及大量的计算,常见消耗有线程切换、编解码、压缩解压、内存拷贝等,这些在大数据处理中通常不是瓶颈。

并发数

是指系统同时能处理的请求数量数。通常而言,QPS = 并发数/平均响应时间 或者说 并发数 = QPS*平均响应时间

这个参数通常只能估计或者计算,无法直接测。顾名思义,机器性能越好固然并发数越高咯。此外注意用上多线程技术而且提升代码的并行度、优化IO模型、减小减小内存分配和释放等手段都是能够提升并发数的。

扩展性

消息系统的可扩展性是指要为系统组件添加的新的成员的时候比较容易。

kafka中扩展性的基石就是topic采用的partition机制。第一,Kafka容许Partitioncluster中的Broker之间移动,以此来解决数据倾斜问题。第二,支持自定义的Partition算法,好比你能够将同一个Key的全部消息都路由到同一个Partition上去(来得到顺序)。第三,partition的全部replica经过ZooKeeper来进行集群管理,能够动态增减副本。第四,partition也支持动态增减。

对于producer,不存在扩展问题,只要broker还够你链接就行。
对于consumer,一个consumer group中的consumer能够增减,可是最好不要超过一个topicpartition数量,由于多余的consumer并不能提高处理速度,一个partition在同一时刻只能被一个consumer group中的一个consumer消费

代码上的可扩展性就属于设计模式的领域了,这里不谈。

参考

《kafka技术内幕》
Kafka的存储机制以及可靠性
Kafka 0.11.0.0 是如何实现 Exactly-once 语义的

查看原文,来自mageekchiu。总结不到位的地方请不吝赐教。

相关文章
相关标签/搜索