消息队列已经逐渐成为企业IT系统内部通讯的核心手段。它具备低耦合、可靠投递、广播、流量控制、最终一致性等一系列功能,成为异步RPC的主要手段之一。前端
当今市面上有不少主流的消息中间件,如老牌的ActiveMQ、RabbitMQ,煊赫一时的Kafka,阿里巴巴自主开发的Notify、MetaQ、RocketMQ等。算法
本文不会一一介绍这些消息队列的全部特性,而是探讨一下自主开发设计一个消息队列时,你须要思考和设计的重要方面。过程当中咱们会参考这些成熟消息队列的不少重要思想。数据库
本文首先会阐述何时你须要一个消息队列,而后以Push模型为主,从零开始分析设计一个消息队列时须要考虑到的问题,如RPC、高可用、顺序和重复消息、可靠投递、消费关系解析等。编程
也会分析以Kafka为表明的pull模型所具有的优势。最后是一些高级主题,如用批量/异步提升性能、pull模型的系统设计理念、存储子系统的设计、流量控制的设计、公平调度的实现等。其中最后四个方面会放在下篇讲解。缓存
当你须要使用消息队列时,首先须要考虑它的必要性。可使用mq的场景有不少,最经常使用的几种,是作业务解耦/最终一致性/广播/错峰流控等。反之,若是须要强一致性,关注业务逻辑的处理结果,则RPC显得更为合适。性能优化
解耦是消息队列要解决的最本质问题。所谓解耦,简单点讲就是一个事务,只关心核心的流程。而须要依赖其余系统但不那么重要的事情,有通知便可,无需等待结果。换句话说,基于消息的模型,关心的是“通知”,而非“处理”。网络
好比在美团旅游,咱们有一个产品中心,产品中心上游对接的是主站、移动后台、旅游供应链等各个数据源;下游对接的是筛选系统、API系统等展现系统。当上游的数据发生变动的时候,若是不使用消息系统,势必要调用咱们的接口来更新数据,就特别依赖产品中心接口的稳定性和处理能力。但其实,做为旅游的产品中心,也许只有对于旅游自建供应链,产品中心更新成功才是他们关心的事情。而对于团购等外部系统,产品中心更新成功也好、失败也罢,并非他们的职责所在。他们只须要保证在信息变动的时候通知到咱们就行了。session
而咱们的下游,可能有更新索引、刷新缓存等一系列需求。对于产品中心来讲,这也不是咱们的职责所在。说白了,若是他们定时来拉取数据,也能保证数据的更新,只是实时性没有那么强。但使用接口方式去更新他们的数据,显然对于产品中心来讲太过于“重量级”了,只须要发布一个产品ID变动的通知,由下游系统来处理,可能更为合理。并发
再举一个例子,对于咱们的订单系统,订单最终支付成功以后可能须要给用户发送短信积分什么的,但其实这已经不是咱们系统的核心流程了。若是外部系统速度偏慢(好比短信网关速度很差),那么主流程的时间会加长不少,用户确定不但愿点击支付过好几分钟才看到结果。那么咱们只须要通知短信系统“咱们支付成功了”,不必定非要等待它处理完成。负载均衡
最终一致性指的是两个系统的状态保持一致,要么都成功,要么都失败。固然有个时间限制,理论上越快越好,但实际上在各类异常的状况下,可能会有必定延迟达到最终一致状态,但最后两个系统的状态是同样的。
业界有一些为“最终一致性”而生的消息队列,如Notify(阿里)、QMQ(去哪儿)等,其设计初衷,就是为了交易系统中的高可靠通知。
以一个银行的转帐过程来理解最终一致性,转帐的需求很简单,若是A系统扣钱成功,则B系统加钱必定成功。反之则一块儿回滚,像什么都没发生同样。
然而,这个过程当中存在不少可能的意外:
可见,想把这件看似简单的事真正作成,真的不那么容易。全部跨VM的一致性问题,从技术的角度讲通用的解决方案是:
最终一致性不是消息队列的必备特性,但确实能够依靠消息队列来作最终一致性的事情。另外,全部不保证100%不丢消息的消息队列,理论上没法实现最终一致性。好吧,应该说理论上的100%,排除系统严重故障和bug。像Kafka一类的设计,在设计层面上就有丢消息的可能(好比定时刷盘,若是掉电就会丢消息)。哪怕只丢千分之一的消息,业务也必须用其余的手段来保证结果正确。
消息队列的基本功能之一是进行广播。若是没有消息队列,每当一个新的业务方接入,咱们都要联调一次新接口。有了消息队列,咱们只须要关心消息是否送达了队列,至于谁但愿订阅,是下游的事情,无疑极大地减小了开发和联调的工做量。好比本文开始提到的产品中心发布产品变动的消息,以及景点库不少去重更新的消息,可能“关心”方有不少个,但产品中心和景点库只须要发布变动消息便可,谁关心谁接入。
试想上下游对于事情的处理能力是不一样的。好比,Web前端每秒承受上千万的请求,并非什么神奇的事情,只须要加多一点机器,再搭建一些LVS负载均衡设备和Nginx等便可。但数据库的处理能力却十分有限,即便使用SSD加分库分表,单机的处理能力仍然在万级。因为成本的考虑,咱们不能奢求数据库的机器数量追上前端。这种问题一样存在于系统和系统之间,如短信系统可能因为短板效应,速度卡在网关上(每秒几百次请求),跟前端的并发量不是一个数量级。但用户晚上个半分钟左右收到短信,通常是不会有太大问题的。若是没有消息队列,两个系统之间经过协商、滑动窗口等复杂的方案也不是说不能实现。但系统复杂性指数级增加,势必在上游或者下游作存储,而且要处理定时、拥塞等一系列问题。并且每当有处理能力有差距的时候,都须要单独开发一套逻辑来维护这套逻辑。因此,利用中间系统转储两个系统的通讯内容,并在下游系统有能力处理这些消息的时候,再处理这些消息,是一套相对较通用的方式。
总而言之,消息队列不是万能的。对于须要强事务保证并且延迟敏感的,RPC是优于消息队列的。
对于一些无关痛痒,或者对于别人很是重要可是对于本身不是那么关心的事情,能够利用消息队列去作。
支持最终一致性的消息队列,可以用来处理延迟不那么敏感的“分布式事务”场景,并且相对于笨重的分布式事务,多是更优的处理方式。
当上下游系统处理能力存在差距的时候,利用消息队列作一个通用的“漏斗”。在下游有能力处理的时候,再进行分发。
若是下游有不少系统关心你的系统发出的通知的时候,果断地使用消息队列吧。
咱们如今明确了消息队列的使用场景,下一步就是如何设计实现一个消息队列了。
基于消息的系统模型,不必定须要broker(消息队列服务端)。市面上的的Akka(actor模型)、ZeroMQ等,其实都是基于消息的系统设计范式,可是没有broker。
咱们之因此要设计一个消息队列,而且配备broker,无外乎要作两件事情:
通常来说,设计消息队列的总体思路是先build一个总体的数据流,例如producer发送给broker,broker发送给consumer,consumer回复消费确认,broker删除/备份消息等。
利用RPC将数据流串起来。而后考虑RPC的高可用性,尽可能作到无状态,方便水平扩展。
以后考虑如何承载消息堆积,而后在合适的时机投递消息,而处理堆积的最佳方式,就是存储,存储的选型须要综合考虑性能/可靠性和开发维护成本等诸多因素。
为了实现广播功能,咱们必需要维护消费关系,能够利用zk/config server等保存消费关系。
在完成了上述几个功能后,消息队列基本就实现了。而后咱们能够考虑一些高级特性,如可靠投递,事务特性,性能优化等。
下面咱们会以设计消息队列时重点考虑的模块为主线,穿插灌输一些消息队列的特性实现方法,来具体分析设计实现一个消息队列时的方方面面。
刚才讲到,所谓消息队列,无外乎两次RPC加一次转储,固然须要消费端最终作消费确认的状况是三次RPC。既然是RPC,就必然牵扯出一系列话题,什么负载均衡啊、服务发现啊、通讯协议啊、序列化协议啊,等等。在这一块,个人强烈建议是不要重复造轮子。利用公司现有的RPC框架:Thrift也好,Dubbo也好,或者是其余自定义的框架也好。由于消息队列的RPC,和普通的RPC没有本质区别。固然了,自主利用Memchached或者Redis协议从新写一套RPC框架并不是不可(如MetaQ使用了本身封装的Gecko NIO框架,卡夫卡也用了相似的协议)。但实现成本和难度无疑倍增。排除对效率的极端要求,均可以使用现成的RPC框架。简单来说,服务端提供两个RPC服务,一个用来接收消息,一个用来确认消息收到。而且作到无论哪一个server收到消息和确认消息,结果一致便可。固然这中间可能还涉及跨IDC的服务的问题。这里和RPC的原则是一致的,尽可能优先选择本机房投递。你可能会问,若是producer和consumer自己就在两个机房了,怎么办?首先,broker必须保证感知的到全部consumer的存在。其次,producer尽可能选择就近的机房就行了。
其实全部的高可用,是依赖于RPC和存储的高可用来作的。先来看RPC的高可用,美团的基于MTThrift的RPC框架,阿里的Dubbo等,其自己就具备服务自动发现,负载均衡等功能。而消息队列的高可用,只要保证broker接受消息和确认消息的接口是幂等的,而且consumer的几台机器处理消息是幂等的,这样就把消息队列的可用性,转交给RPC框架来处理了。
那么怎么保证幂等呢?最简单的方式莫过于共享存储。broker多机器共享一个DB或者一个分布式文件/kv系统,则处理消息天然是幂等的。就算有单点故障,其余节点能够马上顶上。另外failover能够依赖定时任务的补偿,这是消息队列自己自然就能够支持的功能。存储系统自己的可用性咱们不须要操太多心,放心大胆的交给DBA们吧!
对于不共享存储的队列,如Kafka使用分区加主备模式,就略微麻烦一些。须要保证每个分区内的高可用性,也就是每个分区至少要有一个主备且须要作数据的同步,关于这块HA的细节,能够参考下篇pull模型消息系统设计。
消息到达服务端若是不通过任何处理就到接收者了,broker就失去了它的意义。为了知足咱们错峰/流控/最终可达等一系列需求,把消息存储下来,而后选择时机投递就显得是瓜熟蒂落的了。
只是这个存储能够作成不少方式。好比存储在内存里,存储在分布式KV里,存储在磁盘里,存储在数据库里等等。但归结起来,主要有持久化和非持久化两种。
持久化的形式能更大程度地保证消息的可靠性(如断电等不可抗外力),而且理论上能承载更大限度的消息堆积(外存的空间远大于内存)。
但并非每种消息都须要持久化存储。不少消息对于投递性能的要求大于可靠性的要求,且数量极大(如日志)。这时候,消息不落地直接暂存内存,尝试几回failover,最终投递出去也何尝不可。
市面上的消息队列广泛两种形式都支持。固然具体的场景还要具体结合公司的业务来看。
咱们来看看若是须要数据落地的状况下各类存储子系统的选择。理论上,从速度来看,文件系统>分布式KV(持久化)>分布式文件系统>数据库,而可靠性却截然相反。仍是要从支持的业务场景出发做出最合理的选择,若是大家的消息队列是用来支持支付/交易等对可靠性要求很是高,但对性能和量的要求没有这么高,并且没有时间精力专门作文件存储系统的研究,DB是最好的选择。
可是DB受制于IOPS,若是要求单broker 5位数以上的QPS性能,基于文件的存储是比较好的解决方案。总体上能够采用数据文件+索引文件的方式处理,具体这块的设计比较复杂,能够参考下篇的存储子系统设计。
分布式KV(如MongoDB,HBase)等,或者持久化的Redis,因为其编程接口较友好,性能也比较可观,若是在可靠性要求不是那么高的场景,也不失为一个不错的选择。
如今咱们的消息队列初步具有了转储消息的能力。下面一个重要的事情就是解析发送接收关系,进行正确的消息投递了。
市面上的消息队列定义了一堆让人晕头转向的名词,如JMS 规范中的Topic/Queue,Kafka里面的Topic/Partition/ConsumerGroup,RabbitMQ里面的Exchange等等。抛开现象看本质,无外乎是单播与广播的区别。所谓单播,就是点到点;而广播,是一点对多点。固然,对于互联网的大部分应用来讲,组间广播、组内单播是最多见的情形。
消息须要通知到多个业务集群,而一个业务集群内有不少台机器,只要一台机器消费这个消息就能够了。
固然这不是绝对的,不少时候组内的广播也是有适用场景的,如本地缓存的更新等等。另外,消费关系除了组内组间,可能会有多级树状关系。这种状况太过于复杂,通常不列入考虑范围。因此,通常比较通用的设计是支持组间广播,不一样的组注册不一样的订阅。组内的不一样机器,若是注册一个相同的ID,则单播;若是注册不一样的ID(如IP地址+端口),则广播。
至于广播关系的维护,通常因为消息队列自己都是集群,因此都维护在公共存储上,如config server、zookeeper等。维护广播关系所要作的事情基本是一致的:
上面都是些消息队列基本功能的实现,下面来看一些关于消息队列特性相关的内容,无论可靠投递/消息丢失与重复以及事务乃至于性能,不是每一个消息队列都会照顾到,因此要依照业务的需求,来仔细衡量各类特性实现的成本,利弊,最终作出最为合理的设计。
这是个激动人心的话题,彻底不丢消息,究竟可不可能?答案是,彻底可能,前提是消息可能会重复,而且,在异常状况下,要接受消息的延迟。
方案说简单也简单,就是每当要发生不可靠的事情(RPC等)以前,先将消息落地,而后发送。当失败或者不知道成功失败(好比超时)时,消息状态是待发送,定时任务不停轮询全部待发送消息,最终必定能够送达。
具体来讲:
对于各类不肯定(超时、down机、消息没有送达、送达后数据没落地、数据落地了回复没收到),其实对于发送方来讲,都是一件事情,就是消息没有送达。
重推消息所面临的问题就是消息重复。重复和丢失就像两个噩梦,你必需要面对一个。好在消息重复还有处理的机会,消息丢失再想找回就难了。
Anyway,做为一个成熟的消息队列,应该尽可能在各个环节减小重复投递的可能性,不能由于重复有解决方案就放纵的乱投递。
最后说一句,不是全部的系统都要求最终一致性或者可靠投递,好比一个论坛系统、一个招聘系统。一个重复的简历或话题被发布,可能比丢失了一个发布显得更让用户没法接受。不断重复一句话,任何基础组件要服务于业务场景。
消费确认
当broker把消息投递给消费者后,消费者能够当即响应我收到了这个消息。但收到了这个消息只是第一步,我能不能处理这个消息却不必定。或许由于消费能力的问题,系统的负荷已经不能处理这个消息;或者是刚才状态机里面提到的消息不是我想要接收的消息,主动要求重发。
把消息的送达和消息的处理分开,这样才真正的实现了消息队列的本质-解耦。因此,容许消费者主动进行消费确认是必要的。固然,对于没有特殊逻辑的消息,默认Auto Ack也是能够的,但必定要容许消费方主动ack。
对于正确消费ack的,没什么特殊的。可是对于reject和error,须要特别说明。reject这件事情,每每业务方是没法感知到的,系统的流量和健康情况的评估,以及处理能力的评估是一件很是复杂的事情。举个极端的例子,收到一个消息开始build索引,可能这个消息要处理半个小时,但消息量倒是很是的小。因此reject这块建议作成滑动窗口/线程池相似的模型来控制,
消费能力不匹配的时候,直接拒绝,过一段时间重发,减小业务的负担。
但业务出错这件事情是只有业务方本身知道的,就像上文提到的状态机等等。这时应该容许业务方主动ack error,并能够与broker约定下次投递的时间。
重复消息和顺序消息
上文谈到重复消息是不可能100%避免的,除非能够容许丢失,那么,顺序消息可否100%知足呢? 答案是能够,但条件更为苛刻:
因此绝对的顺序消息基本上是不能实现的,固然在METAQ/Kafka等pull模型的消息队列中,单线程生产/消费,排除消息丢失,也是一种顺序消息的解决方案。
通常来说,一个主流消息队列的设计范式里,应该是不丢消息的前提下,尽可能减小重复消息,不保证消息的投递顺序。
谈到重复消息,主要是两个话题:
先来看看第一个话题,每个消息应该有它的惟一身份。无论是业务方自定义的,仍是根据IP/PID/时间戳生成的MessageId,若是有地方记录这个MessageId,消息到来是可以进行比对就能完成重复的鉴定。数据库的惟一键/bloom filter/分布式KV中的key,都是不错的选择。因为消息不能被永久存储,因此理论上都存在消息从持久化存储移除的瞬间上游还在投递的可能(上游因种种缘由投递失败,不停重试,都到了下游清理消息的时间)。这种事情都是异常状况下才会发生的,毕竟是小众状况。两分钟消息都还没送达,多送一次又能怎样呢?幂等的处理消息是一门艺术,由于种种缘由重复消息或者错乱的消息仍是来到了,说两种通用的解决方案:
版本号
举个简单的例子,一个产品的状态有上线/下线状态。若是消息1是下线,消息2是上线。不巧消息1判重失败,被投递了两次,且第二次发生在2以后,若是不作重复性判断,显然最终状态是错误的。
可是,若是每一个消息自带一个版本号。上游发送的时候,标记消息1版本号是1,消息2版本号是2。若是再发送下线消息,则版本号标记为3。下游对于每次消息的处理,同时维护一个版本号。
每次只接受比当前版本号大的消息。初始版本为0,当消息1到达时,将版本号更新为1。消息2到来时,由于版本号>1.能够接收,同时更新版本号为2.当另外一条下线消息到来时,若是版本号是3.则是真实的下线消息。若是是1,则是重复投递的消息。
若是业务方只关心消息重复不重复,那么问题就已经解决了。但不少时候另外一个头疼的问题来了,就是消息顺序若是和想象的顺序不一致。好比应该的顺序是12,到来的顺序是21。则最后会发生状态错误。
参考TCP/IP协议,若是想让乱序的消息最后可以正确的被组织,那么就应该只接收比当前版本号大一的消息。而且在一个session周期内要一直保存各个消息的版本号。
若是到来的顺序是21,则先把2存起来,待2到来后,再处理1,这样重复性和顺序性要求就都达到了。
状态机
基于版本号来处理重复和顺序消息听起来是个不错的主意,但凡事总有瑕疵。使用版本号的最大问题是:
还不能只存储最新的版本号的消息,要把乱序到来的消息都存储起来。并且必需要对此作出处理。试想一个永不过时的"session",好比一个物品的状态,会不停流转于上下线。那么中间环节的全部存储
就必须保留,直到在某个版本号以前的版本一个不丢的到来,成本过高。
就刚才的场景看,若是消息没有版本号,该怎么解决呢?业务方只须要本身维护一个状态机,定义各类状态的流转关系。例如,"下线"状态只容许接收"上线"消息,“上线”状态只能接收“下线消息”,若是上线收到上线消息,或者下线收到下线消息,在消息不丢失和上游业务正确的前提下。要么是消息发重了,要么是顺序到达反了。这时消费者只须要把“我不能处理这个消息”告诉投递者,要求投递者过一段时间重发便可。并且重发必定要有次数限制,好比5次,避免死循环,就解决了。
举例子说明,假设产品自己状态是下线,1是上线消息,2是下线消息,3是上线消息,正常状况下,消息应该的到来顺序是123,但实际状况下收到的消息状态变成了3123。
那么下游收到3消息的时候,判断状态机流转是下线->上线,能够接收消息。而后收到消息1,发现是上线->上线,拒绝接收,要求重发。而后收到消息2,状态是上线->下线,因而接收这个消息。
此时不管重发的消息1或者3到来,仍是能够接收。另外的重发,在必定次数拒绝后中止重发,业务正确。
中间件对于重复消息的处理
回归到消息队列的话题来说。上述通用的版本号/状态机/ID判重解决方案里,哪些是消息队列该作的、哪些是消息队列不应作业务方处理的呢?其实这里没有一个彻底严格的定义,但回到咱们的出发点,咱们保证不丢失消息的状况下尽可能少重复消息,消费顺序不保证。那么重复消息下和乱序消息下业务的正确,应该是由消费方保证的,咱们要作的是减小消息发送的重复。
咱们没法定义业务方的业务版本号/状态机,若是API里强制须要指定版本号,则显得过于绑架客户了。何况,在消费方维护这么多状态,就涉及到一个消费方的消息落地/多机间的同步消费状态问题,复杂度指数级上升,并且只能解决部分问题。
减小重复消息的关键步骤:
持久性是事务的一个特性,然而只知足持久性却不必定能知足事务的特性。仍是拿扣钱/加钱的例子讲。知足事务的一致性特征,则必需要么都不进行,要么都能成功。解决方案从大方向上有两种:
分布式事务存在的最大问题是成本过高,两阶段提交协议,对于仲裁down机或者单点故障,几乎是一个无解的黑洞。对于交易密集型或者I/O密集型的应用,没有办法承受这么高的网络延迟,系统复杂性。
而且成熟的分布式事务必定构建与比较靠谱的商用DB和商用中间件上,成本也过高。
那如何使用本地事务解决分布式事务的问题呢?以本地和业务在一个数据库实例中建表为例子,与扣钱的业务操做同一个事务里,将消息插入本地数据库。若是消息入库失败,则业务回滚;若是消息入库成功,事务提交。
而后发送消息(注意这里能够实时发送,不须要等定时任务检出,以提升消息实时性)。之后的问题就是前文的最终一致性问题所提到的了,只要消息没有发送成功,就一直靠定时任务重试。
这里有一个关键的点,本地事务作的,是业务落地和消息落地的事务,而不是业务落地和RPC成功的事务。这里不少人容易混淆,若是是后者,无疑是事务嵌套RPC,是大忌,会有长事务死锁等各类风险。
而消息只要成功落地,很大程度上就没有丢失的风险(磁盘物理损坏除外)。而消息只要投递到服务端确认后本地才作删除,就完成了producer->broker的可靠投递,而且当消息存储异常时,业务也是能够回滚的。
本地事务存在两个最大的使用障碍:
话说回来,不是每一个业务都须要强事务的。扣钱和加钱须要事务保证,但下单和生成短信却不须要事务,不能由于要求发短信的消息存储投递失败而要求下单业务回滚。因此,一个完整的消息队列应该定义清楚本身能够投递的消息类型,如事务型消息,本地非持久型消息,以及服务端不落地的非可靠消息等。对不一样的业务场景作不一样的选择。另外事务的使用应该尽可能低成本、透明化,能够依托于现有的成熟框架,如Spring的声明式事务作扩展。业务方只须要使用@Transactional标签便可。
异步/同步
首先澄清一个概念,异步,同步和oneway是三件事。异步,归根结底你仍是须要关心结果的,但可能不是当时的时间点关心,能够用轮询或者回调等方式处理结果;同步是须要当时关心
的结果的;而oneway是发出去就无论死活的方式,这种对于某些彻底对可靠性没有要求的场景仍是适用的,但不是咱们重点讨论的范畴。
回归来看,任何的RPC都是存在客户端异步与服务端异步的,并且是能够任意组合的:客户端同步对服务端异步,客户端异步对服务端异步,客户端同步对服务端同步,客户端异步对服务端同步。
对于客户端来讲,同步与异步主要是拿到一个Result,仍是Future
(Listenable)的区别。实现方式能够是线程池,NIO或者其余事件机制,这里先不展开讲。
服务端异步可能稍微难理解一点,这个是须要RPC协议支持的。参考servlet 3.0规范,服务端能够吐一个future给客户端,而且在future done的时候通知客户端。
整个过程能够参考下面的代码:
客户端同步服务端异步。
Future<Result> future = request(server);//server马上返回future synchronized(future){ while(!future.isDone()){ future.wait();//server处理结束后会notify这个future,并修改isdone标志 } } return future.get();
客户端同步服务端同步。
Result result = request(server);
客户端异步服务端同步(这里用线程池的方式)。
Future<Result> future = executor.submit(new Callable(){public void call<Result>(){ result = request(server); }}) return future;
客户端异步服务端异步。
Future<Result> future = request(server);//server马上返回future return future
上面说了这么多,实际上是想让你们脱离两个误区:
那么,服务端使用异步最大的好处是什么呢?说到底,是解放了线程和I/O。试想服务端有一堆I/O等待处理,若是每一个请求都须要同步响应,每条消息都须要结果马上返回,那么就几乎无法作I/O合并
(固然接口能够设计成batch的,但可能batch发过来的仍然数量较少)。而若是用异步的方式返回给客户端future,就能够有机会进行I/O的合并,把几个批次发过来的消息一块儿落地(这种合并对于MySQL等容许batch insert的数据库效果尤为明显),而且完全释放了线程。不至于说来多少请求开多少线程,可以支持的并发量直线提升。
来看第二个误区,返回future的方式不必定只有线程池。换句话说,能够在线程池里面进行同步操做,也能够进行异步操做,也能够不使用线程池使用异步操做(NIO、事件)。
回到消息队列的议题上,咱们固然不但愿消息的发送阻塞主流程(前面提到了,server端若是使用异步模型,则可能因消息合并带来必定程度上的消息延迟),因此能够先使用线程池提交一个发送请求,主流程继续往下走。
可是线程池中的请求关心结果吗?Of course,必须等待服务端消息成功落地,才算是消息发送成功。因此这里的模型,准确地说事客户端半同步半异步(使用线程池不阻塞主流程,但线程池中的任务须要等待server端的返回),server端是纯异步。客户端的线程池wait在server端吐回的future上,直到server端处理完毕,才解除阻塞继续进行。
总结一句,同步可以保证结果,异步可以保证效率,要合理的结合才能作到最好的效率。
批量
谈到批量就不得不提生产者消费者模型。但生产者消费者模型中最大的痛点是:消费者到底应该什么时候进行消费。大处着眼来看,消费动做都是事件驱动的。主要事件包括:
对于及时性要求高的数据,可用采用方式3来完成,好比客户端向服务端投递数据。只要队列有数据,就把队列中的全部数据刷出,不然将本身挂起,等待新数据的到来。在第一次把队列数据往外刷的过程当中,又积攒了一部分数据,第二次又能够造成一个批量。伪代码以下:
Executor executor = Executors.newFixedThreadPool(4); final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(); private Runnable task = new Runnable({//这里因为共享队列,Runnable能够复用,故作成全局的 public void run(){ List<Message> messages = new ArrayList<>(20); queue.drainTo(messages,20); doSend(messages);//阻塞,在这个过程当中会有新的消息到来,若是4个线程都占满,队列就有机会囤新的消息 } }); public void send(Message message){ queue.offer(message); executor.submit(task) }
这种方式是消息延迟和批量的一个比较好的平衡,但优先响应低延迟。延迟的最高程度由上一次发送的等待时间决定。但可能形成的问题是发送过快的话批量的大小不够知足性能的极致。
Executor executor = Executors.newFixedThreadPool(4); final BlockingQueue<Message> queue = new ArrayBlockingQueue<>(); volatile long last = System.currentMills(); Executors.newSingleThreadScheduledExecutor().submit(new Runnable(){ flush(); },500,500,TimeUnits.MILLS); private Runnable task = new Runnable({//这里因为共享队列,Runnable能够复用,顾作成全局的。 public void run(){ List<Message> messages = new ArrayList<>(20); queue.drainTo(messages,20); doSend(messages);//阻塞,在这个过程当中会有新的消息到来,若是4个线程都占满,队列就有机会屯新的消息。 } }); public void send(Message message){ last = System.currentMills(); queue.offer(message); flush(); } private void flush(){ if(queue.size>200||System.currentMills()-last>200){ executor.submit(task) } }
相反对于能够用适量的延迟来换取高性能的场景来讲,用定时/定量二选一的方式可能会更为理想,既到达必定数量才发送,但若是数量一直达不到,也不能干等,有一个时间上限。具体说来,在上文的submit以前,多判断一个时间和数量,而且Runnable内部维护一个定时器,避免没有新任务到来时旧的任务永远没有机会触发发送条件。对于server端的数据落地,使用这种方式就很是方便。
最后啰嗦几句,曾经有人问我,为何网络请求小包合并成大包会提升性能?主要缘由有两个:
上文提到的消息队列,大可能是针对push模型的设计。如今市面上有不少经典的也比较成熟的pull模型的消息队列,如Kafka、MetaQ等。这跟JMS中传统的push方式有很大的区别,可谓另辟蹊径。咱们简要分析下push和pull模型各自存在的利弊。
慢消费无疑是push模型最大的致命伤,穿成流水线来看,若是消费者的速度比发送者的速度慢不少,势必形成消息在broker的堆积。假设这些消息都是有用的没法丢弃的,消息就要一直在broker端保存。固然这还不是最致命的,最致命的是broker给consumer推送一堆consumer没法处理的消息,consumer不是reject就是error,而后来回踢皮球。反观pull模式,consumer能够按需消费,不用担忧本身处理不了的消息来骚扰本身,而broker堆积消息也会相对简单,无需记录每个要发送消息的状态,只须要维护全部消息的队列和偏移量就能够了。因此对于创建索引等慢消费,消息量有限且到来的速度不均匀的状况,pull模式比较合适。
这是pull模式最大的短板。因为主动权在消费方,消费方没法准确地决定什么时候去拉取最新的消息。若是一次pull取到消息了还能够继续去pull,若是没有pull取到则须要等待一段时间从新pull。
但等待多久就很难断定了。你可能会说,我能够有xx动态pull取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,而后半个小时没有新消息产生,
可能你的算法算出下次最有可能到来的时间点是31分钟以后,或者60分钟以后,结果下条消息10分钟后到了,是否是很让人沮丧?
固然也不是说延迟就没有解决方案了,业界较成熟的作法是从短期开始(不会对broker有太大负担),而后指数级增加等待。好比开始等5ms,而后10ms,而后20ms,而后40ms……直到有消息到来,而后再回到5ms。
即便这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,并且对于半个小时来一次的消息,这些开销就是白白浪费的。
在阿里的RocketMq里,有一种优化的作法-长轮询,来平衡推拉模型各自的缺点。基本思路是:消费者若是尝试拉取失败,不是直接return,而是把链接挂在那里wait,服务端若是有新的消息到来,把链接notify起来,这也是不错的思路。但海量的长链接block对系统的开销仍是不容小觑的,仍是要合理的评估时间间隔,给wait加一个时间上限比较好~
若是push模式的消息队列,支持分区,单分区只支持一个消费者消费,而且消费者只有确认一个消息消费后才能push送另一个消息,还要发送者保证全局顺序惟一,听起来也能作顺序消息,但成本过高了,尤为是必须每一个消息消费确认后才能发下一条消息,这对于自己堆积能力和慢消费就是瓶颈的push模式的消息队列,简直是一场灾难。反观pull模式,若是想作到全局顺序消息,就相对容易不少:
因此对于日志push送这种最好全局有序,但容许出现小偏差的场景,pull模式很是合适。若是你不想看到通篇乱套的日志~~Anyway,须要顺序消息的场景仍是比较有限的并且成本过高,请慎重考虑。
本文从为什么使用消息队列开始讲起,而后主要介绍了如何从零开始设计一个消息队列,包括RPC、事务、最终一致性、广播、消息确认等关键问题。并对消息队列的push、pull模型作了简要分析,最后从批量和异步角度,分析了消息队列性能优化的思路。下篇会着重介绍一些高级话题,如存储系统的设计、流控和错峰的设计、公平调度等。但愿经过这些,让你们对消息队列有个提纲挈领的总体认识,并给自主开发消息队列提供思路。另外,本文主要是源自本身在开发消息队列中的思考和读源码时的体会,比较不"官方",也不免会存在一些漏洞,欢迎你们多多交流。
后续咱们还会推出消息队列设计高级篇,内容会涵盖如下方面:
敬请期待哦~
王烨,如今是美团旅游后台研发组的程序猿,以前曾经在百度、去哪和优酷工做过,专一Java后台开发。对于网络编程和并发编程具备浓厚的兴趣,曾经作过一些基础组件,也翻过一些源码,属于比较典型的宅男技术控。期待可以与更多知己,在coding的路上并肩前行~