以前说过学习一个新的东西,最核心的就是掌握概念。而如何掌握概念呢?个人其中一个方法就是对比,把类似且模糊不清的两个概念进行对比,这样就理解更快。html
RabbitMQ有如下模式:
1.工做队列(Worke Queues)
发消息和收消息都是直接经过队列。在耗时比较多的任务,咱们把任务放入队列里,而后每一个工做者去获取任务而后处理。因此这个工做队列,也称为任务队列(Task Queues)。这样就将耗资源的任务从产生任务的应用上解耦出来。
这个模式最主要的特征是:每一个任务只会分发到一个工做者中。 java
2.发布/订阅(Publish/Subscribe)
这个发布/订阅和观察者模式很像,但不是同一个东西。具体可看看发布/订阅和观察者区别。
在这里,RabbitMQ引入了交换器(Exchange)的概念,生产者不直接与队列交互,而是经过交换器去与队列进行交互(或者叫绑定)。也就说生产者只和交换器交互。引入交换器这概念后,这消息中间件能够玩的花样就多了。发布/订阅(Publish/Subscribe)就是其中的一个。这里使用到的就是fanout的交换器。
这个模式最主要的特征是:相似于广播(broadcast),同个消息能够发送到不一样的队列中去,并且这fanout交换器也不关系队列有哪些,只要队列和fanout交换器有绑定就发送,这样就能够将消息重复发送到不一样的队列上。
与工做队列模式的区别是:发布/订阅的概念叫消息,而不是任务。因此消息能够重复的放入不一样的队列中。 node
3.路由(Routing)
路由模式也是引入交换器概念后,消息中间件玩的一个花样。这里用到的交换器叫direct。
在这模式里,得新增两个概念,分别是binding key和routing key, binding key是对于队列来讲的,在其与direct交换器绑定时指定binding key。而routing key是对于消息来讲的,在其发送消息到direct交换器时,需指定routing key。这样routing key可以和binding key匹配得上的(就是值相等),direct交换器就会将消息发送到对应binding key的队列上。
这个模式最主要的特征是:控制消息的精度更高,能够指定哪些消息发送到哪些队列里。
与发布/订阅模式的区别是:区别是发布/订阅是广播,将消息发送到任何绑定交换器的队列上,因此没能力选择消息,而路由是需binding key和routing key匹配上,消息才能发送到对应binding key的队列上,从而有能力去选择消息。
与发布/订阅模式的相同点是:能够将消息重复发送。
注:队列能够绑定多个routing key linux
4.主题(Topics)
固然,主题模式也是引入交换器概念后,消息中间件玩的一个花样。这里用到的交换器叫topic。
这里用到的也是binding key和routing key,但不同的是,routing_key不能指定明确的key。而是这个key须要带有点“.”,如 "stock.usd.nyse", "nyse.vmw", "quick.orange.rabbit"。而在这模式下,binding key的指定能够更普遍些,其结构是这样的".orange." 、 "..rabbit" 和"lazy.#"。其中*(星号)是能够表明一个单词,#(井号)是能够表明零个或多个单词。也跟路由相似的,只要这样routing key可以和binding key匹配得上的(这里能够不用值相等,模式匹配上便可),topic交换器就会将消息发送到对应binding key的队列上。面试
如Q1队列的binding key是" .orange.",而 Q2是" ..rabbit"和"lazy.#"。若是消息的routing key是 "quick.orange.rabbit" 则此消息会被发送到Q1和Q2队列上。routing key是"quick.orange.fox"的消息只会发送到Q1队列上。routing key是"lazy.pink.rabbit" 的消息只会发送到Q2队列一次,routing key是 "quick.brown.fox" 的消息没有匹配任何的binding key则此消息丢弃。
注:队列能够绑定多个routing key
5.远程过程调用RPC(Remote Procedure Call)
RPC能够远程调用函数,等待服务器返回结果。算法
RPC的一个备注:RPC虽然用得很普遍,然而它也有不足之处,就是开发人员没法清晰的知道本身调用的这个函数究竟是本地函数仍是很慢的RPC。这种困惑很容易致使出一个不可预测的系统和增长不必的复杂性致使难以定位问题。若是不用简单的程序,误用RPC还可能写出很维护的意大利面条式的代码。。
对于这个问题,有三个建议数据库
- 保证函数是很容易被辨别出是本地函数仍是远程函数。
- 文档化,清晰地记录组件间的依赖。
- 处理网络带来的异常,如超时等。
当出现用RPC是否必要时,若是能够的话,你最好用异步管道(asynchronous pipeline)的形式,而不是使用阻塞形式的RPC。
。apache
RabbitMQ能够用于构建RPC系统。一个客户端和一个可扩展的RPC服务器。不过此功能不太经常使用,因此就不留篇幅来说解。大概原理就是能够新增消息的属性,从而将请求和响应的消息给匹配上。设计模式
观察者模式
观察者模式的定义:对象间的一种一对多的组合关系,以便一个对象的状态发生变化时,全部依赖于它的对象都获得通知。
举个例子api
假设你正在找一份软件工程师的工做,对“香蕉公司”很感兴趣。因此你联系了他们的HR,给了他你的联系电话。他保证若是有任何职位空缺都会通知你。这里还有几个候选人也你同样很感兴趣。因此职位空缺你们都会知道,若是你回应了他们的通知,他们就会联系你面试。
该模式必须包含两个角色:观察者和观察对象,香蕉公司就是被观察者Subject,你就是Observers(还有和你同样的候选人),当被观察者状态发送变化(好比职位空缺)就会通知(notify)观察者,前提是Observers注册到Subject里,也就是香蕉公司的HR得有你的电话号码。
发布/订阅模式
在观察者模式中的Subject就像一个发布者(Publisher),而观察者(Observer)彻底能够看做一个订阅者(Subscriber)。subject通知观察者时,就像一个发布者通知他的订阅者。这也就是为何不少书和文章使用“发布-订阅”概念来解释观察者设计模式。可是这里还有另一个流行的模式叫作发布-订阅设计模式。它的概念和观察者模式很是相似。最大的区别是:
在发布-订阅模式,消息的发送方,叫作发布者(publishers),消息不会直接发送给特定的接收者(订阅者)。
意思就是发布者和订阅者不知道对方的存在。须要一个第三方组件,叫作消息中间件,它将订阅者和发布者串联起来,它过滤和分配全部输入的消息。换句话说,发布/订阅模式用来处理不一样系统组件的信息交流,即便这些组件不知道对方的存在。
咱们设计kafka,是但愿它能成为统一的平台来处理大公司可能拥有的全部实时数据流。要作到这一点,咱们必须考虑至关广的用例(use case)。
咱们想它是分区、分布式、实时处理信息流,以建立新的信息流和传输信息流。这些动机造就了kafka的分区和消费者模型。
最后有可能数据流被输入到其余数据系统中,而这些系统须要对外提供服务,因此kafka须要有能力保证容错性,哪怕存在有机器宕机。
为了支持上述这些,咱们设计了一些独特元素,更相似于数据库日志,而不是传统的消息传递系统。
咱们将在下面部分中概述设计中的一些元素。
kafka重度依赖文件系统,用文件系统来存储和缓存消息。人们都由这感受“硬盘很慢”,以至于你们怀疑一个持久化架构是否能具备竞争力的性能。实际上硬盘它很快也很慢,这取决于咱们怎么去使用它。一个合理的硬盘架构一般能够和网络同样快。(看来做者的网速都很快)。
硬盘性能的关键是,磁盘驱动器的吞吐量与过去十年的硬盘搜索的延迟有所不一样。所以在6×7200rpm SATA RAID-5阵列的JBOD配置上的线性写的性能大约为600MB/秒,但随机写入的性能仅为100k/秒,即超过6000倍的差异。这些线性读写是全部使用模式中最可预测的,而且由操做系统进行了大量优化。现代操做系统都提供了预读取(read-ahead)和后写(write-behind)操做的技术,这些支持屡次读取到一个大块中和合并小的逻辑写造成一个大的物理写。这问题更深刻的讨论能够在这找到 ACM Queue article,他们确实发现顺序硬盘读写在某些状况下比随机内存访问还快。
为了弥补这些性能差别,现代操做系统愈来愈着重使用主存来作磁盘缓存。现代操做系统很乐意将空余内存转移到磁盘缓存中,但这须要承受在内存被回收时带来的一点点的性能损失。全部硬盘读写都经过这统一的缓存(磁盘缓存)。若是没有直接IO,这特性并无那么容易被抛弃。所以即便一个进场维护本身数据缓存时,这些数据将会在OS的页缓存里复制两份,两次高效地存储全部东西。
此外,咱们是在JVM基础上创建的,任何一位有花时间去研究Java内存的使用,都会知道如下两件事情:
1.对象的内存开销很是高,一般会使要存储的数据的大小增大一倍(甚至更多)。
2.随着堆内存的增长,Java垃圾收集会变得愈来愈繁琐和缓慢。
也正是使用文件系统和依赖页缓存(pagecache)带来的结果优于维护一个内存中的缓存(in-memory cache)或是其余结构,经过对全部空闲内存进行自动访问,咱们至少能够将可用缓存加倍,而且还能够继续加倍,经过存储紧凑的字节结构而不是单个对象。这样作的话能够在32GB的机器上使用28-30GB缓存,而不用担忧GC问题。并且,即便服务重启,这些数据也保持热度,对比起来,进程内存中的缓存在重启后须要重建(对于10GB的缓存可能须要10分钟),不然它须要从一个彻底冷的缓存开始(这可能意味更糟糕的初始化性能)。这也极大地简化了代码,由于在缓存和文件系统之间保持一致性的全部逻辑如今都在操做系统中,这比一次性在进程内尝试更有效、更正确。若是您的磁盘使用倾向于线性读取,那么预读取将有效地预操做这些缓存。
这代表了一个很是简单的设计:在咱们耗尽空间的时候,与其保持尽量多的内存并将其所有清空到文件系统,不如反过来,数据都是被当即写入到文件系统上的持久日志中,而没必要刷新到磁盘。实际上,这仅仅意味着它被转移到内核的页缓存中。
以页缓存为核心的设计,在这里文章里有被描述,此文章是Varnish的设计。
在消息传递系统里的持久化数据结构一般是一个消费者队列关联着一棵BTree或者其余通用的随机访问数据结构来维护消息的元数据。BTree是一个万能的数据结构,能够在消息传递系统中支持各类事务和非事务性的语义。但它带来至关高的成本:BTree操做是O(log N)。一般O(log N)本质上被认为是等于常量时间,但对于硬盘操做则并非这样。磁盘寻轨达到10ms,而且每一个磁盘一次只能执行一次寻轨,因此并行性是有限的。所以,即便是少许的磁盘寻轨也会致使很高的开销。因为存储系统将很是快的缓存操做与很是慢的物理磁盘操做混合在一块儿,所以当在缓存固定时,数据增长时,树结构的性能一般是超线性的。数据加倍则会使速度慢两倍以上。
直观上,一个持久的队列能够创建在简单的读取和追加的形式,这一般也是日志解决方案使用的。这结构有这样的好处,全部操做都是O(1),而且读操做不会阻塞写和读的操做。这是具备明显的优点,是由于性能彻底与数据量大小解耦了,一个服务如今能够充分利用那些大量的,且便宜,低转速的SATA驱动器。虽然硬盘的寻轨性能差,但它们的大型读和写的性能仍是能够接受的,并且仍是三分之一的价格就有三倍的容量。
在没有任何性能惩罚的状况下访问几乎无限的磁盘空间意味着咱们能够提供一些在消息传递系统中不常见的特性。例如,在kafka中,咱们能够在相对较长的时间内保留消息(好比一个星期),而不是每次消费完就删除消息。这将给消费者带来很大的灵活性。
咱们在效率方面付出大量的努力。咱们最初用例中的一个是处理网站活动数据,这能够是很是大量的数据:每一个页面的访问都会产生许多写操做。此外,咱们假设每条消息至少被一个消费者读取(一般是不少消费者),所以咱们努力让消费尽量的便宜。
咱们还发现,经历过构建和运行多个相似的系统,有效的多租户业务的关键是效率。
咱们在前面章节讨论过硬盘的效率。一旦消除了糟糕的磁盘访问模式,在这种类型的系统中有两个常见的低效缘由:太多小的I/O操做和过分的字节复制。
这小IO问题发生在客户端和服务器之间,和服务器自身的持久化操做中。
为了不这种状况,咱们的协议是围绕一个“消息集(message set)”抽象构建的,该抽象能够天然地将消息分组在一块儿。这容许网络请求将消息分组,并分摊网络往返的开销,而不是一次发送一条消息。服务器依次将大量的消息追加到其日志中,而消费者一次获取大量的线性块。
这个简单的优化产生数量级的加速。批处理致使了更大的网络数据包、更大的顺序磁盘操做、连续的内存块等等,全部这些都使得Kafka能够将随机消息写入的流变成 线性的写 流给消费者。
另外一个低效率的是字节复制。在低消息率下,这不是一个问题,但在负载下的影响是显著的。为了不这种状况,咱们采用了一种标准化的二进制消息格式,由生产者、代理和消费者共享(所以数据块能够在不进行修改的状况下传输)。
broker维护的消息日志自己就是一个文件目录,每一个文件都由一个以生产者和消费者使用的相同格式写入磁盘的消息集的序列填充。保持这种通用格式能够优化最重要的操做:持久日志块的网络传输。现代unix操做系统为将数据从页缓存传输到套接字提供了高度优化的代码路径;在Linux中,这是经过sendfile的系统调用完成的。
要了解sendfile的做用,首先最重要先理解将数据从文件传输到套接字的公共数据路径:
1.操做系统从磁盘读取数据到内核空间的页缓存。
2.应用程序将数据从内核空间读取到用户空间缓冲区中。
3.应用程序将数据返回到内核空间,并将其写入套接字缓冲区。
4.操做系统将数据从套接字缓冲区复制到经过网络发送的NIC缓冲区。
有4次复制,两次系统内核调用,这样的效率固然就低下。使用sendfile,经过容许操做系统直接将数据从页缓存发送到网络,避免了重复复制。所以在这个优化的路径中,只须要最后的复制,一次从磁盘复制到NIC缓冲区便可。——零拷贝(zero-copy)
咱们指望一个常见的用例是在一个主题上有多个使用者。使用上述的零拷贝优化,数据被彻底复制到页缓存中,并在每次读取时重复使用,而不是存储在内存中并在每次读取时将其复制到用户空间。这就容许以接近网络链接的极限的速率来读取消息。
页缓存和sendfile的组合意味着,在一个Kafka集群上,在有消费者的机子上,您将看到磁盘上没有任何读取活动,由于它们将彻底从缓存中提供数据。
更多Java支持的sendfile和零拷贝,请点击这里。
在某性状况下,事实上真正的瓶颈不是CPU也不是硬盘,而是网络带宽。对于须要在广域网上的数据中心之间发送消息的数据管道来讲,尤为如此。固然,用户本身能够压缩消息而不须要kafka的支持。但这可能致使很是差的压缩比,特别是当消息的冗余字段不少(如JSON里的字段名和网站日志里的user agent或公共字符串)。高效的压缩须要多个消息压缩在一块儿,而不是每一个消息独立压缩。
Kafka用高效的批处理格式支持这一点。能够将一批消息聚合到一块儿压缩,并以这种形式发送到服务器。这批消息将以压缩的形式写入,而且将在日志中保持压缩,而且只会被使用者解压。
Kafka支持GZIP、Snappy和LZ4压缩协议。关于压缩的更多细节能够在这里找到。
生产者直接发送数据到broker,不须要任何的中间路由层,而接受的broker是该分区的leader。为了帮助生产者实现这一点,全部Kafka节点均可以回答关于哪些是可用服务器的元数据的请求,以及在任何给定的时间内,某个主题的分区的leader是否容许生产者适当地发送它的请求。
由客户端控制它想往哪一个分区生产消息。这能够随机地进行,实现一种随机的负载平衡,或者能够经过一些语义分区函数来实现。咱们提供了语义分区的接口,容许用户指定一个分区的key,并使用这个key来作hash到一个分区(若是须要的话,也是能够复写这分区功能的)。例如,咱们选择user的id做为可用,则因此该用户的信息都会发送到一样的分区。这反过来又会让消费者对他们的消费产生局部性的假设。这种明确设计的分区,容许消费者本身本地的处理。
批处理是效率的主要驱动因素之一,为了可以批处理,kafka的生产者会尝试在内存中积累数据,而后在一块儿在一个请求中以大批量的形式发送出去。批处理这个能够设置按固定的消息数量或按特定的延迟(64k或10ms)。这容许累积更多字节的发送出去,这样只是在服务器上作少许的大IO操做。这种缓冲是可配置的,这样提供了一种机制来以额外的延迟来提升吞吐量。
具体的配置)和生产者的api能够在这文档中找到。
kafka消费者的工做方式是,向其想消费的分区的leader发送“fetch”请求。在每一个请求中消费者指定日志的偏移量,而后接受回一大块从偏移量开始的日志。所以,消费者对position有重要的控制权,若是须要,能够重置position来从新消费数据。
咱们首先考虑的一个问题是,消费者应该是从broker拉取消息,仍是应该是broker把消息推送给消费者。在这方面,kafka遵循了一种更传统的设计,大多数消息传递系统也会用的,那就是数据是从生产者push到broker,消费者是从broker拉取数据。一些日志集中系统,如Scribe和Apache Flume,遵循一个很是不一样的,基于推送的路径,将数据被推到下游。这两种方法都由利弊,在基于推送的系统,因为是broker得控制数据传输的速率,不一样消费者可能要不一样的速率。然而消费者通常的目的都是让消费者本身可以以最大的速度进行消费,但在基于push的系统,当消费速率低于生产效率时,消费者就不知道该怎么办好了(本质上就是一种拒绝服务攻击(DOS))。一个基于pull的系统就拥有很好的熟悉,消费者能够简单的调控速率。
基于pull的系统的另外一个优势是,它能够对发送给消费者的数据进行聚合的批处理。基于推送的系统必须选择当即发送请求或积累更多数据,而后在不知道下游用户是否可以当即处理它的状况下发送它。若是对低延迟进行调优,这将致使仅在传输结束时发送一条消息,最终将被缓冲,这是浪费。基于pull的设计解决了这个问题,由于用户老是在日志的当前位置(或者是一些可配置的最大大小)以后提取全部可用的消息。所以,咱们能够在不引入没必要要的延迟的状况下得到最佳的批处理。
基于pull的系统的缺点是,若是broker没数据,则消费者可能会不停的轮训。为了不这一点,咱们在pull请求上提供了参数,容许消费者在“长轮训”中阻塞,直到数据达到(而且能够选择等待,直到必定数量的本身能够,确保传输的大小)。
你可能详细其余可能的设计,如只有pull,点到点。生产者会将本地的日志写到本地日志中,而broker则会从这些日志中拉取数据。一般还会提出相似的“存储转发(store-and-forward)”生产者。这颇有趣,可是咱们以为不太适合咱们的目标用例:它有成千上万的生产者。咱们在大规模上运行持久数据系统的经验使咱们以为,在许多应用程序中涉及到数千个磁盘,实际上并不会使事情变得更可靠,并且操做起来也会是一场噩梦。在实践中,咱们发现,咱们能够在不须要生产者持久化的状况下,以大规模的SLAs来运行管道。
使人惊讶的是,跟踪所使用的内容是消息传递系统的关键性能点之一。
不少消息传递系统在broker中保存了关于什么消息是被消费了的元数据。也就是说,当消息传递给消费者时,broker要么当即记录信息到本地,要么就是等待消费者的确认。这是一个至关直观的选择,并且对于一台机器服务器来讲,很清楚地知道这些消息的状态。因为许多消息传递系统中用于存储的数据结构都很糟糕,所以这(记录消息状态)也是一个实用的选择——由于broker知道什么是已经被消费的,因此能够当即删除它,保持数据的大小。
让broker和消费者就已经消费的东西达成一致,这可不是小问题。若是一条消息发送到网络上,broker就把它置为已消费,但消费者可能处理这条消息失败了(或许是消费者挂了,也或许是请求超时等),这条消息就会丢失了。为了解决这个问题,不少消息传递系统增长了确认机制。当消息被发送时,是被标志为已发送,而不是已消费;这是broker等待消费者发来特定的确认信息,则将消息置为已消费。这个策略虽然解决了消息丢失的问题,但却带来了新的问题。第一,若是消费者在发送确认信息以前,在处理完消息以后,消费者挂了,则会致使此消息会被处理两次。第二个问题是关于性能,broker必须保存每一个消息的不一样状态(首先先锁住消息以至于不会让它发送第二次,其次标志位已消费从而能够删除它)。还有些棘手的问题要处理。如消息被发送出去,但其确认信息一直没返回。
kafka处理则不同。咱们的主题被分为一个有序分区的集合,且每一个分区在任何给定的时间内只会被订阅它的消费者组中的一个消费者给使用。这意味着每一个分区中的消费者的position仅仅是一个整数,这是下一次消费时,消息的偏移量。这使状态(记录是否被消费)很是小,每一个分区只有一个数字。这个状态能够被按期检查。这样确认一条消息是否被消费的成本就很低。
这样还附加了一个好处。消费者能够重置其最早的position从而从新消费数据。这虽然违反了队列的公共契约,但它却变成关键功能给许多消费者。例如,若是消费者代码有一个bug,而且在一些消息被消费后才被发现,那么当bug被修复后,消费者就能够从新使用这些消息。
可扩展持久化容许只有周期性地使用批量数据的消费者的可能性,好比按期将批量数据加载到离线系统(如Hadoop或关系数据仓库)。
如今咱们已经了解了些生产者和消费者是怎么工做的,接下来咱们说下kafka提供给生产者和消费者的语义保证。很明显这里提供了如下几种消息传递保证机制:
值得注意的是,这能够归结为两个问题:发布消息的持久化保证,以及在消费消息时的保证。
不少系统声称提供“有且仅有一次”的传递语义,但阅读这些细节时,会发现其中大部分都是误导(他们不理解消费者或生产者可能挂掉的状况,那些有多个消费者处理的状况,或者是那些被写入磁盘的数据可能丢失的状况)。
kafka的语义很直接。在发布消息时,咱们将消息“提交”到log中。一旦发布的消息被提交,只要有一个broker复制这个消息被写入活动分区,它就不会丢失。提交的消息的定义、活动分区以及咱们试图处理的失败的类型的描述将在下一节(副本)中详细描述。如今咱们假设在完美的状况下,如今让咱们假设一个完美的、无损的broker,和尝试理解对生产者和消费者的保证。若是一个生产者试图发布消息并经历一个网络错误,那么就不能肯定该错误发生在消息提交以前仍是以后。这相似于插入到一个数据库表的自动生成的主键的语义。
在0.11.0.0版本以前,若是一个生产者没有收到一个消息已经提交的响应,那么它几乎没有选择,只能从新发送消息。这提供了“至少一次”的传递语义,由于若是原始请求实际上成功了,那么在从新发送期间,消息可能再次被写入到日志中。从0.11.0.0开始,Kafka生产者也支持一个幂传递的选项,该选项保证从新发送不会致使日志中有这重复的消息。为了实现这一目标,broker为每一个生产者分配一个ID,并使用由生产者发送消息时一块儿把序列号发送到broker,这样broker就能够根据序列和id来处理重复的消息。一样,从0.11.0.0开始,生产者支持使用相似于事务的语义向多个主题分区发送消息:即全部消息都已成功写入或都失败写入。这种状况的主要应用场景是在Kafka主题之间进行“有且仅有一次”的处理(以下所述)。
并不是全部的用例都须要这样强的保证。对于延迟敏感的使用,咱们容许生产者指定它须要的持久化级别。若是生产者指定要等待消息被提交要在10ms完成。则生产者能够指定它异步地执行发送,或者等待直到leader(但不必定是follower)获得消息。
如今咱们描述下消费者视角下的语义。全部的副本都有相同的日志和相同的偏移量。消费者控制它在这个日志中的position。若是消费者从未崩溃,它能够将这个position存储在内存中,可是若是消费者崩溃了,咱们但愿这个主题的分区来接替这个position的处理,那么新的进程将须要选择一个合适的position来开始处理。
消费者读取消息时,有几个处理消息和更新其位置的选项。
那“有且仅有一次”的语义怎样(或者是说你到底想要什么)?从kafka主题中获取消息处理后发布到其余主题(如一个Kafka Streams应用),咱们能够利用上面提到的版本0.11.0.0里的新事务生产者的功能。消费者的position被当作一个消息存储在一个主题,所以咱们能够在与接收处理数据的输出主题相同的事务中写入kafka的偏移量。 若是事务被停止,消费者的position将恢复到原来的值,而输出主题的生成数据将不会被其余消费者看到,这取决于他们的“隔离级别”。在默认的“read_uncommitted”隔离级别中,全部消息对消费者都是可见的,即便它们是被停止的事务的一部分,可是在“read_committed”中,使用者只会从提交的事务中返回消息(以及任何不属于事务的消息)。
当写入外部系统时,限制是在须要协调消费者的position和实际存储的输出。实现这一目标的经典方法是在存储消费者position和存储消费者输出之间引入两阶段提交。但这能够更简单地处理,而且一般经过让消费者将其偏移量存储在与输出相同的位置。这样作比较好,由于消费者可能想要写入的输出系统都不支持两阶段提交。做为一个例子,考虑一个Kafka Connect链接器,它在HDFS中填充数据,以及它读取的数据的偏移量,从而保证数据和偏移量都获得了更新,或者二者都不更新。对于须要这些更强语义的其余许多数据系统,咱们遵循相似的模式是为了那些须要强一致性语义的系统,还为了这些消息没有主键来容许删除重复数据。
所以kafka为了kafka Streams,高效地支持“有且仅有一次”的传递,而且在Kafka主题之间传输和处理数据时,一般能够使用事务生产者/消费者提供“有且仅有一次”的传递。对于其余目标系统的“有且仅有一次”的传递通常须要协调,但kafka提供了偏移量,它能够实现这要求(参见Kafka Connect)。不然,缺省状况下Kafka保证“至少一次”传递,而且容许用户禁止生产者的重试或消费者在处理数据以前提交position,从而实现“至多一次”的专递。
Kafka经过一个可配置的服务器数量对每一个主题的分区进行复制日志(你您能够按主题设置此副本因子(replication factor))。这容许在集群中的服务器发生故障时自动恢复,所以当在出现故障时仍然能够使用消息。
其余消息传递系统提供了副本相关的特性,但,咱们认为,这彷佛是一种策略而已,并无大量的使用,并且还有个很大的缺点:slave是未被用上的,吞吐量受到严重的影响,恢复还须要繁琐的人工配置,等等。kafka默认是使用了副本功能,实际上那些副本因子设置为1的主题,咱们也会当作是使用副本功能的主题。
副本的最小单元是主题的分区。在没有失败的状况下,kafka的每一个分区都是有一个leader,其follower能够为零个或多个。包括leader在内的副本数量就是副本因子。全部读和写都是经过leader分区。一般状况,分区的数据量是多个broker,leader的数量时平均分配当每一个broker。follower的日志和leader的日志是彻底相同的——它们都具备相同的偏移量和相同顺序的消息(固然,在任何给定的时刻,在日志的末尾可能会有一些还未同步到的消息)。
follower也跟kafka的普通消费者同样从leader消费消息。follower从leader拉消息时,有个很好的特性,那就时可让follower很容易地批量把日志应用到其(follower)日志中。
跟不少 分布式系统处理自动恢复 同样,对于节点是否“存活(alive)”须要有一个明确的定义。对于kafka,节点存活有如下两个条件:
1.节点必须维护它与ZooKeeper的session(经过ZooKeeper的心跳机制)
2.若是是slave,就必须复制leader,并且不能落后太远。
知足上述两个条件的节点,咱们更愿意叫“已同步(in sync)”而不是模糊不清的“存活”或“失败”。leader保持跟踪这些“已同步”的节点。若是follower挂了,或者卡住了,或者落后太远了,leader会讲起从已同步的副本名单中移除。是有e replica.lag.time.max.ms这配置去控制卡住多长时间和落后多少副本数量。
在分布式系统术语中,咱们只尝试处理一个“失败/恢复”模型,即节点忽然中止工做,而后恢复(可能不知道它们已经死亡)。kafka没有处理所谓的“拜占庭式”的失败,即节点产生任意或恶意的响应(多是因为某些错误)。
如今,咱们能够更精确地定义一个消息的提交,当全部副本都同步到分区,分区而且应用到其日志中时,就会被认为是提交的。只有提交的消息才会分发给消费者。这就意味着消费者不用担忧当leader崩溃时,消息会丢失。另外一方面,生产者能够选择等待消息提交或不提交,这取决与它们对延迟和持久化之间的权衡。生产者能够使用acks这配置来控制这权衡。注意,这“最小数据量(minimun number)”同步副本的数量设置,是指当消息都同步到全部副本后,kafka再去检查时,检查的最小数量。若是生产者对确认要求不太严格,则消息一发布就能够被使用了,即便同步副本数量还没达到最小值。(这最小值能够低到只有一个,那就是leader)。
kafka保证消息不会丢,只要任什么时候候至少有一个已同步的副本存在。
kafka能够在节点故障的状况下可用。但存在网络分区时,就可能没法使用了。
分区就是一个副本日志。副本日志是分布式数据系统的最基本的原语(primitvie)之一,并且有不少种实现方式。其余系统能够使用副本日志做为一种原语,用于在状态机形式的分布式系统。
对于一系列值的顺序达成一致的过程(一般编号为0、一、二、…),副本日志就是将其模型化。有不少方法能够实现这一点,但最简单和最快的是leader来选择序值。只有leader还存活,所哟follower都只须要复制值便可,顺序由leader决定。
固然,若是leader不挂,那咱们不必要follower。当leader崩溃时,我从follower中选择出新的leader。但follower本身可能落后或崩溃,因此咱们必须保证咱们选择的是最新的follower。日志复制算法必须这最基本的保证时,若是咱们告诉客户端消息已经提交了,而此时leader挂了,咱们选择的新leader也必须包含刚刚那个已经提交了的消息。这就产生了一个权衡:若是leader等待过多的follower确认消息,This yields a tradeoff: if the leader waits for more followers to acknowledge a message before declaring it committed then there will be more potentially electable leaders.
若是你指定确认的数量和日志(与leader对比过的)的数量,这样就保证有重叠性,那么这就叫法定人数(Quorums)。
这种权衡最多见的方法是,在提交决策和leader选举中使用大多数投票。这不是kafka作的,但让咱们去探索它,了解它的利弊。假设咱们有2f+1个副本。若是f+1节点收到消息,没有超过f个节点失败,则leader就保证全部消息都被提交,咱们选择新leader时也同样。这是由于咱们在任意节点上选择f+1个节点,这f+1里必须至少有一个节点包含全部已提交消息的副本。副本最完整的结点将会被选中为新leader。这里还有不少算法细节须要处理(如明肯定义日志的完整性,leader崩溃时怎么保证一致性,修改集群中的服务器),这些咱们先暂时忽略。
多数投票方法有个很是好的特性:延迟仅仅取决于多台最快的服务器。也就是说,若是副本因子时3,那么延迟由最快的一个slave决定,而不是最慢的slave(leader一个、最快的slave一个,这就达到法定人数了)。
这个家族有不少算法,包括ZooKeeper的Zab,Raft和Viewstamped副本算法。咱们知道的,更接近kafka的用的算法的学术出版是来自微软的PacificA。
多数投票的不足之处就是,它不须要不少失败的节点,就可让你选择不到leader。为了容忍一个节点失败,则须要3个节点,容忍2个,则须要5个节点。在咱们经验里,觉得只要刚恰好够冗余的副本,就能容忍一个节点的失败,但这是不实际的,在5倍硬盘空间(5个硬盘,每一个硬盘占1/5吞吐)状况下,每次都要写5次,这对于大量数据的问题时不切实际的。这也是为何法定人数算法比较经常使用在集群的配置文件如ZooKeeper,而不多用在原数据存储上。例如在HDFS的namenode的高可用是创建在多数人投票,但这成本很高的算法不会用在它的数据存储上。
Kafka使用了一个稍微不太同样的方法去选择法定人数。kafka动态的维护一个ISR(in-sync replicas)集合,集合里面的节点都是已同步。只有这集合里面的人才适合选举为leader。只有全部ISR都收到写入分区,则这分区的写入就会被认为已提交。这ISR保存在ZooKeeper。对于kafka的使用模型来讲,这是一个重要的因素,那里有许多分区,而且确保leader的平衡很重要。ISR模型和f+1副本,一个kafka主题能够容忍f个失败(总共就f+1个节点)。
咱们想处理更多的用例,因此这个权衡咱们以为是合理的。在实际状况,对于容忍f个节点失败,多数投票和ISR方法都是须要通用数量的副本确认(好比,容忍1个节点失败,多数投票方法则须要3个副本和1个确认,ISR方法须要2个副本和1个确认)。确认提交而不须要由最慢的节点来确认这是多数投票方法的好处。但咱们以为这是能够经过由客户端选择是否阻塞消息提交,以及控制副本因子(下降)而增长吞吐量和磁盘空间来优化这个问题(这问题就是与多数投票对比)。
另外一个重要的设计是,kafka不要求崩溃节点在全部数据完整的状况下恢复。在这个空间中,副本算法依赖于“稳定存储”的存在并很多见,这种“稳定存储”在任何故障恢复场景中都不能丢失,要保证一致性。这有两个主要问题。首先,硬盘故障是咱们在持久化数据系统的实际操做中最多见的问题,问题发生后,一般也不会完整地保留数据。其次,即便这不是一个问题,咱们也不但愿在每次写入时都须要使用fsync,由于这样会减小两到三个数量级的性能。咱们容许一个副本从新加入ISR的协议,这协议确保在从新加入以前,它必须彻底从新同步,即便它在崩溃中丢失了未刷新的数据。
注意,Kafka对数据不丢失的保证是基于至少一个保持同步的副本。若是一个分区的副本都丢失了,则没法保证数据不丢失。
然而在实际状况下的系统当全部副本挂以后必须作一些合理的事情。若是很不辛遇到这种状况,意识到后面会发生什么这是很重要。可能会出现如下两种状况:
1.等待ISR里的全部节点恢复,并选择出新的leader(但愿这leader还保存着全部的数据)。
2.选择第一个副本(不须要是ISR里面的)恢复,做为leader。
如下是可用性和持久化的权衡。1、若是咱们等待全部ISR副本恢复,则咱们会等很长的时间。。2、若是副本的数据都丢了,则永远没法恢复。最后一个就是,若是一个没有同步的副本恢复,咱们容许它为leader,则认为它的日志是最新的,哪怕它没有包含全部已提交的消息。在0.11.0.0版本里默认的选择第一个权衡,用等待来换取数据的一致性。这个是能够配置的,若是启动时间比一致性重要,则修改这个 unclean.leader.election.enable。
这个困惑不只仅kafka有。它存在与任何基于法定人数算法的场景。例如,在多数投票的场景,若是你是去大多数服务器,在剩余的服务器,你就必须在二者选其中一个,不是失去100%的数据就是丢失数据的一致性。
生产者生成消息时,能够选择0个,1个或者所有副本确认。注意这里的“所有副本确认”不能保证全部被分配副本的结点都能收到消息。默认的,当acks=all时,只要全部当前全部ISR都收到消息,则能够确认消息。例如,一个主题被设置为两个副本和一个失败(只有剩下一个ISR),而后全部acks=all的写入都会是成功的。若是剩余的副本也失败,这样消息就会被丢失。尽管这确保了分区的最大可用性,可是这种行为可能不适合某些喜欢持久化而不是可用性的用户。所以,咱们提供了两种顶级的配置,可用于更倾向于消息持久化而不是可用性:
1.关闭不清晰的leader选举——若是全部副本变得不可用,直到最近的leader变得可用,全部分区才能够变得能够用。这有效地避免了消息丢失的风险。请参阅上一节不清晰的Leader选举。
2.指定最小的ISR数量——只有高过这最小数量,消息才会被确认,这是为了不在写入一个副本时,并且副本挂了,致使消息丢失的风险。这个设置仅仅在生产者使用acks=all生效或保证消息在这数量以上的ISR确认。这个设置提供了一致性和高可用的权衡。ISR最小数量设置高一点,这样更好的保证一致性。然而这样会减小可用性,由于在ISR没知足这数量时,分区是不可用的。
上诉讨论副本,也仅仅是一份日志,也就是主题的一个分区。然而kafka是管理成千上万的分区。咱们试图以循环(round-robin)方式在集群中平衡分区,以免在大数据量的主题的全部分区都在少许节点上。一样地,咱们试图平衡leader,使每一个节点都是其必定份额分区的leader。
对ledaer选举过程进行优化也很重要,由于这是服务不可用的窗口期。一个简单的leader选举会在一个节点失败后,在该节点内全部分区,每一个分区都会举行一次选举。相反,咱们选择一个broker做为“controller”。这controller检测broker层次的失败,负责修改受故障影响的分区的leader。其结果是,咱们可以将许多须要的leadr变动批量处理,这使得选举过程在大量的分区上变得更加便宜和快速。若是controller失败了,其中一个存活的节点会变成新的controller。
日志压缩保证kafka在每一个分区,对于每一个key,至少保存其最近的一条消息。这解决了那些须要当应用或系统崩溃后,重启时需从新加载数据的场景。
到目前位置,咱们只讨论了简单的数据保存方法,那就是当旧日志数据超过必定时间或达到必定大小的时候会被删除。这个适用于每条相对独立的消息,如临时事件。然而,还有一类很重要的数据,那就是根据key修改数据,一种可变的数据(例如在数据库表数据的变动那样)。
咱们讨论一个具体的例子。一个主题包含了用户emial信息,每次用户更新他们的email信息,咱们都会发送消息到topic,是根据他们的userid作主键。如下是咱们发送的消息,userid是123,每条信息都对应着一次的email信息修改(省略号是省略其余userid的消息)。
123 => bill@microsoft.com . . . 123 => bill@gatesfoundation.org . . . 123 => bill@gmail.com
日志压缩给了咱们更细颗粒度保留数据机制,这样咱们就能够保证只保留每个key最后的一次变动(如123 => bill@gmail.com)。这样咱们保证了日志里都包含了全部key的最后一个值的快照。这就意味着下游的消费者能够重建状态而不须要保存全部的更变日志。
让咱们一些日志压缩有用的场景,而后咱们在看看是怎么被使用上。
1.数据库变动订阅(Database chagne subscription)。咱们很常见到一份数据集会存在多种数据系统里,并且这系统里有一个相似数据库那样的(如RDBMS或新潮的key-value系统)。举个例子,你有一个数据库、一个缓存、一个搜索集群和一个Hadoop集群。这样每次数据库的修改,都得映射到那缓存、那搜索集群和最后在Hadoop里。在这个场景里,你只是须要实时最新更新的日志。但若是须要从新加载进缓存或恢复宕机的搜索节点,就可能须要完整的数据集。
2.事件源(Event sourcing)。这是一种应用设计风格,它将查询和应用设计结合在一块儿,并使用日志做为程序的主要存储。
3.高可用日志(Journaling for high-availability)。一个本地计算的进程能够经过变动日志来作到容错,这样另外一个进程就能从新加载这些变动继续处理。一个具体的例子就是流式查询系统,如计数、汇总和其余“分组”操做。实时流式处理框架Samza就是使用这功能达到目的的。
在上述场景中,主要处理实时的变动,偶尔须要从新加载或从新处理时,能作的就只有从新加载全部数据。日志压缩提供了这两个功能,处理实时数据变动,和从新加载数据。这种使用日志的风格,详情可参看点击。
这思路很简单。若是咱们保存无穷无尽的日志,保存上述场景中每一个变动日志,并且仍是一开始就获取每一个系统的状态。使用这个完整的日志,咱们就能够恢复到任何一个时间点的状态。但这种完整日志的假设时不切实际的,由于对于那些每一行记录都在变动屡次的系统,即便数据很小,日志也会无限的增加下去。那咱们就简单的丢弃旧日志,虽然能够限制空间的增加,但也没法重建状态——由于旧日志被丢弃,可能一部分记录的状态没法重建。
相对于粗粒度的基于时间的数据保留策略,日志压缩的策略是一种更细颗粒度,基于每一条记录保存。这个想法是,有选择性的删除那些有多个变动记录的一样的key。这样的日志就保证每一个key都至少有一个最新的状态。
数据保留策略能够为每一个主题设置,因此一个集群里有些主题的保存策略能够设置为大小和时间来保存数据,有主题也能够经过压缩保留。
这个功能的灵感是来自于LinkedIn里最古老且最成功的基础架构——一个被称为Databus的数据库变动日志缓存系统。
跟大多很多天志结构存储系统不同的时,Kafka是为了订阅而设计的,组织数据的形式也是为了更快的线性读取和写入。跟Databus不同之处是,kafka做为真实源(source-of-truth)存储,即便上游数据源不具有可重用性的状况下,它仍是挺有用的。
无论是传统的RDBMS仍是分布式的NoSQL存储在数据库中的数据老是会更新的,相同key的新记录更新数据的方式简单来讲有两种:
1.直接更新(找到数据库中的已有位置以最新的值替换旧的值)。
2.追加记录(保留旧的值,查询时再合并,或者也有一个后台线程会按期合并)。
采用追加记录的作法能够在节点崩溃时用于恢复数据,还有一个好处是写性能很高,由于是线性写。
如下是各个数据系统的更新数据方式:
数据系统 | 更新数据追加到哪里 | 数据文件 | 是否须要压缩 |
---|---|---|---|
ZooKeeper | log | snapshot | 不要,由于数据量不大 |
Redis | aof | rdb | 不须要,由于是内存数据库 |
Cassandra | commit log | data.db | 须要,数据存在本地文件 |
HBase | commit log | HFile | 须要,数据存在HDFS |
Kafka | commit log | commit log | 须要,数据存在分区中的Segment里 |
这里有个更高层次的图,展现kafka日志的逻辑存储结构,框框的每一个数字都是一条消息的偏移量(offset):
日志的头部(Log Head)就是传统的kafka日志。日志的尾部(Log Tail)则是被压缩过的日志。Log Head是很密集的,偏移量时连续的,保留了全部的消息。值得注意的是在Log Tail的消息虽然被压缩,但依然保留它一开始被写入时的偏移量,这个偏移量是永远不会被改变。并且这压缩日志里的偏移量,在日志里依然时有效的。因此,时没法区分下一个更高的偏移量是什么,好比说,上面的例子,3六、 3七、 38都是属于同一个位置。
以上说的都是数据更新时的日志压缩,固然日志压缩也支持删除。当发送某个Key的最新版本的消息的内容为null,这个Key将被删除(某种程度上也算是更新,如上面的例子就是把email信息置为null)。这个消息也称删除标志(delete marker),这个删除标志会把以前跟这key相同的消息删掉。但这删除标志比较特殊,特殊之处是它是过一段时间才被删除,从而腾出磁盘空间。而数据删除的时间点会被标志为“删除保留点(delte retention point)”,也就是如上图所示,这个图展现也很特别,你看看两个是point而不是pointer,也不是指向某个消息,而是消息与消息之间。说明它是个时间点,而不是指向某个消息的指针pointer。
压缩时经过后台按期复制日志段(log segment)完成的。清除时并不会阻塞读操做,并且还能够配置不超过必定的IO,从而避免影响消费者和生产者。压缩日志段的过程以下:
日志压缩保证:
1.任何消费者只要是读取日志的头部的,均可以看到全部消息,头部的消息不会被删除。这些消息都是有连续的偏移量。Topic的min.compaction.lag.ms参数可用于保证在指定时间内该消息的存在,而不会被压缩。这提供了消息呆在头部(未被压缩)的时间的底线。
2.依然保持则消息的有序性。压缩永远不会从新给消息排序,而仅仅是删除其部分而已。
3.消息的偏移量永远不会改变。它永远标志着消息所在的位置。
4.任何从日志最开始的地方开始处理都会至少看到每一个key的最终状态。另外,只要消费者在delete.retention.ms(默认是24小时)这时间内达到日志的头部,则将会看到全部删除记录的删除标志。也就是说:因为删除标志的移除和读取是同时发生,因此若是错过delete.retention.ms这时间,消费者会错过删除标志。
日志压缩经过日志清除器(log cleaner)执行,后台线程池复制日志段,移除那些存在于Log Head中的记录。每一个压缩线程工做以下:
1.选择Log Head中相对比Log Tail的比例高的日志。
2.建立Log Head中每一个Key对应的最后一个偏移量的日志摘要。
3.从头至尾的开始复制,在复制过程当中删除相同key的日志。新的、干净的日志段将马上被交换(swap)到日志里,因此只需一个额外的日志段大小的硬盘空间就能够(不须要所有日志的空间)。
4.Log Head的日志摘要其实是一个空间紧凑的哈希表。每一个实体只须要24个字节空间。因此8G的cleaner空间,能够处理大概366G的Log Head(假设每一个消息大小为1k)。
Kafka是默认启用日志清除器,是个线程池。若是要开启指定主题的清理功能,你能够在日志里添加如下属性:
log.cleanup.policy=compact
这个能够在建立主题时指定或修改主题时指定。
日志清除器能够设置多少消息在Log Head而不被删除。这个启用是经过设置压缩时间段:
log.cleaner.min.compaction.lag.ms
若是不设置,则默认是除了最后一个segment以外,其他日志段都会被压缩,即最后一个日志段不会被压缩。任何已激活的日志段都不会被压缩,就算消息的时间已经超过了上面配置的时间,这里的激活,是指有在消费。
Kafka集群有能力强制性地要求控制broker中客户端使用的资源。如下是两类客户的quotas:
1.网络带宽quotas,具体到字节(从0.9版本开始)。
2.请求速率quotas,具体到CPU的利用率(网络和IO的比值)。
生产者和消费者有可能生成/消费大量的数据或请求速率很是高,以至于占满了broker的资源,致使网络饱和broker拒绝给其余客户端服务。使用quotas就能避免这个问题,在多租户集群上尤其重要,由于一部分低质量的客户可能会下降高质量客户的用户体验。实际上,能够对API进行这样的限制。
Kafka客户标识是用户主体(user principal),用于表明用户在这安全的集群上的权限。在无鉴权的时候,broker经过可配置的PrincipalBuilder来提供用户主体,用来分组。由客户端应用选择client-id做为客户的逻辑分组。元组(user,client-id)则定义了一个安全逻辑组,共享user principal和chient-id。
quotas能够被应用到元组(user,client-id),user或client-id组。对于一个链接,匹配上的quota将会应用到此链接上。例如(user="test-user",client-id="test-client")拥有生产者quota是10MB/s,这个10MB的带宽将会被user是“test-user”而且client-id是"test-client"的生产者进行共用。
quota能够按(user,client-id)配置,也能够按user组配置,也能够按client-id组配置。默认quota能够被任何级别的quota给覆盖。这个机制相似于每一个Topic能够覆盖本身的。ZooKeeper的/config/users的quota能够覆盖user和(user,client-id)的quota。/config/clients下的则能够覆盖client-id的quota。这些ZooKeeper的覆盖会便可在因此broker中生效,这样咱们就不须要修改配置时重启服务器。详情请点击。
quota配置的优先级以下:
1./config/users/<user>/clients/<client-id> 2./config/users/<user>/clients/<default> 3./config/users/<user> 4./config/users/<default>/clients/<client-id> 5./config/users/<default>/clients/<default> 6./config/users/<default> 7./config/clients/<client-id> 8./config/clients/<default>
broker的(quota.producer.default, quota.consumer.default)属性来给每一个client-id组设置默认的网络带宽。但后面的版本会删除这些属性。
client-id组的默认quota能够在ZooKeeper中配置。
网络带宽quota,具体到字节,并且是有组里的客户一块儿共享。默认的,每一个独立的客户组都有一个固定的网络带宽的quota。这quota配置在每一个broker。
请求速率quota,具体到时间的百分比,时间是在quota窗口里每一个broker的处理请求的IO线程和网络线程。 n%的quota表明一个线程的n%,因此quota总数是((num.io.threads+num.network.threads)×100)%。每一个客户组在一个quota窗口中最多使用n%的IO线程和网络线程。因为分配给IO和网络的线程数是根据broker主机的cpu个数,则每一个请求速率quota表明着CPU的百分比。
默认状况下,每一个惟一的客户组都会有一个集群配置好的固定的quota。这个quota是定义在每一个broker上。咱们决定由每一个broer定义这些quota,而不是由集群为每一个client统一设置一个quota的缘由,是由于为了方便共享quota的设置。
若是Broker检测到超过quota了,会怎么处理?在咱们的解决方案中,咱们是选择下降速率,而不是直接返回错误。broker会去计算处理这问题的延迟时间,这段时间则不会马上响应客户端。这种超过quota的处理,对于客户端来讲是透明的。客户端不须要作额外的操做。实际上,客户端额外的动做,若是操做很差,还会加重超过quota的问题。
字节率和线程利用率都会在多个小窗口中监测(一秒钟有30个窗口),以便快速准确的纠正quota违规行为。
客户端字节率在多个小窗口(例如每一个1秒的30个窗口)上进行测量,以便快速检测和纠正配额违规。 一般,大的测量窗口(例如,每30秒10个窗口)会致使大量的流量,而后是长时间的延迟,这对用户体验方面并很差。
参考和翻译:
RabbitMQ官方 https://www.rabbitmq.com
Kafka官方 http://kafka.apache.org/docum...