文章摘要:MQ分布式消息队列大体流程在于消息的一发一收一存,本篇将为你们主要介绍下RocketMQ存储部分的架构
消息存储是MQ消息队列中最为复杂和最为重要的一部分,因此小编也就放在RocketMQ系列篇幅中最后一部分来进行阐述和介绍。本文先从目前几种比较经常使用的MQ消息队列存储方式出发,为你们介绍RocketMQ选择磁盘文件存储的缘由。而后,本文分别从RocketMQ的消息存储总体架构和RocketMQ文件存储模型层次结构两方面进行深刻分析介绍。使得你们读完本文后对RocketMQ消息存储部分有一个大体的了解和认识。
这里先回顾往期RocketMQ技术分享的篇幅(若是有童鞋没有读过以前的文章,建议先好好读下以前小编写的篇幅或者其余网上相关的博客,把RocketMQ消息发送和消费部分的流程先大体搞明白):
(1)消息中间件—RocketMQ的RPC通讯(一)
(2)消息中间件—RocketMQ的RPC通讯(二)
(3)消息中间件—RocketMQ消息发送
(4)消息中间件—RocketMQ消息消费(一)
(5)消息中间件—RocketMQ消息消费(二)(push模式实现)
(6)消息中间件—RocketMQ消息消费(三)(消息消费重试)linux
当前业界几款主流的MQ消息队列采用的存储方式主要有如下三种方式:
(1)分布式KV存储:这类MQ通常会采用诸如levelDB、RocksDB和Redis来做为消息持久化的方式,因为分布式缓存的读写能力要优于DB,因此在对消息的读写能力要求都不是比较高的状况下,采用这种方式倒也不失为一种能够替代的设计方案。消息存储于分布式KV须要解决的问题在于如何保证MQ总体的可靠性?
(2)文件系统:目前业界较为经常使用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来作持久化(刷盘通常能够分为异步刷盘和同步刷盘两种模式)。小编认为,消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器自己或是本地磁盘挂了,不然通常是不会出现没法持久化的故障问题。
(3)关系型数据库DB:Apache下开源的另一款MQ—ActiveMQ(默认采用的KahaDB作消息存储)可选用JDBC的方式来作消息持久化,经过简单的xml配置信息便可实现JDBC消息存储。因为,普通关系型数据库(如Mysql)在单表数据量达到千万级别的状况下,其IO读写性能每每会出现瓶颈。所以,若是要选型或者自研一款性能强劲、吞吐量大、消息堆积能力突出的MQ消息队列,那么小编并不推荐采用关系型数据库做为消息持久化的方案。在可靠性方面,该种方案很是依赖DB,若是一旦DB出现故障,则MQ的消息就没法落盘存储会致使线上故障;
所以,综合上所述从存储效率来讲, 文件系统>分布式KV存储>关系型数据库DB,直接操做文件系统确定是最快和最高效的,而关系型数据库TPS通常相比于分布式KV系统会更低一些(简略地说,关系型数据库自己也是一个须要读写文件server,这时MQ做为client与其创建链接并发送待持久化的消息数据,同时又须要依赖DB的事务等,这一系列操做都比较消耗性能),因此若是追求高效的IO读写,那么选择操做文件系统会更加合适一些。可是若是从易于实现和快速集成来看,关系型数据库DB>分布式KV存储>文件系统,可是性能会降低不少。
另外,从消息中间件的自己定义来考虑,应该尽可能减小对于外部第三方中间件的依赖。通常来讲依赖的外部系统越多,也会使得自己的设计越复杂,因此小编我的的理解是采用文件系统做为消息存储的方式,更贴近消息中间件自己的定义。算法
RokcetMQ存储设计架构.jpgsql
上图即为RocketMQ的消息存储总体架构,RocketMQ采用的是混合型的存储结构,即为Broker单个实例下全部的队列共用一个日志数据文件(即为CommitLog)来存储。而Kafka采用的是独立型的存储结构,每一个队列一个文件。这里小编认为,RocketMQ采用混合型存储结构的缺点在于,会存在较多的随机读操做,所以读的效率偏低。同时消费消息须要依赖ConsumeQueue,构建该逻辑消费队列须要必定开销。数据库
从上面的总体架构图中可见,RocketMQ的混合型存储结构针对Producer和Consumer分别采用了数据和索引部分相分离的存储结构,Producer发送消息至Broker端,而后Broker端使用同步或者异步的方式对消息刷盘持久化,保存至CommitLog中。只要消息被刷盘持久化至磁盘文件CommitLog中,那么Producer发送的消息就不会丢失。正由于如此,Consumer也就确定有机会去消费这条消息,至于消费的时间能够稍微滞后一些也没有太大的关系。退一步地讲,即便Consumer端第一次无法拉取到待消费的消息,Broker服务端也可以经过长轮询机制等待必定时间延迟后再次发起拉取消息的请求。
这里,RocketMQ的具体作法是,使用Broker端的后台服务线程—ReputMessageService不停地分发请求并异步构建ConsumeQueue(逻辑消费队列)和IndexFile(索引文件)数据(ps:对于该服务线程在消息消费篇幅也有过介绍,不清楚的童鞋能够跳至消息消费篇幅再理解下)。而后,Consumer便可根据ConsumerQueue来查找待消费的消息了。其中,ConsumeQueue(逻辑消费队列)做为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。而IndexFile(索引文件)则只是为了消息查询提供了一种经过key或时间区间来查询消息的方法(ps:这种经过IndexFile来查找消息的方法不影响发送与消费消息的主流程)。缓存
这里有必要先稍微简单地介绍下page cache的概念。系统的全部文件I/O请求,操做系统都是经过page cache机制实现的。对于操做系统来讲,磁盘文件都是由一系列的数据块顺序组成,数据块的大小由操做系统自己而决定,x86的linux中一个标准页面大小是4KB。
操做系统内核在处理文件I/O请求时,首先到page cache中查找(page cache中的每个数据块都设置了文件以及偏移量地址信息),若是未命中,则启动磁盘I/O,将磁盘文件中的数据块加载到page cache中的一个空闲块,而后再copy到用户缓冲区中。
page cache自己也会对数据文件进行预读取,对于每一个文件的第一个读请求操做,系统在读入所请求页面的同时会读入紧随其后的少数几个页面。所以,想要提升page cache的命中率(尽可能让访问的页在物理内存中),从硬件的角度来讲确定是物理内存越大越好。从操做系统层面来讲,访问page cache时,即便只访问1k的消息,系统也会提早预读取更多的数据,在下次读取消息时, 就极可能能够命中内存。
在RocketMQ中,ConsumeQueue逻辑消费队列存储的数据较少,而且是顺序读取,在page cache机制的预读取做用下,Consume Queue的读性能会比较高近乎内存,即便在有消息堆积状况下也不会影响性能。而对于CommitLog消息存储的日志数据文件来讲,读取消息内容时候会产生较多的随机访问读取,严重影响性能。若是选择合适的系统IO调度算法,好比设置调度算法为“Noop”(此时块存储采用SSD的话),随机读的性能也会有所提高。
另外,RocketMQ主要经过MappedByteBuffer对文件进行读写操做。其中,利用了NIO中的FileChannel模型直接将磁盘上的物理文件直接映射到用户态的内存地址中(这种Mmap的方式减小了传统IO将磁盘文件数据在操做系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销),将对文件的操做转化为直接对内存地址进行操做,从而极大地提升了文件的读写效率(这里须要注意的是,采用MappedByteBuffer这种内存映射的方式有几个限制,其中之一是一次只能映射1.5~2G 的文件至用户态的虚拟内存,这也是为什么RocketMQ默认设置单个CommitLog日志数据文件为1G的缘由了)。服务器
RocketMQ文件存储模型结构.jpg架构
RocketMQ文件存储模型层次结构如上图所示,根据类别和做用从概念模型上大体能够划分为5层,下面将从各个层次分别进行分析和阐述:
(1)RocketMQ业务处理器层:Broker端对消息进行读取和写入的业务逻辑入口,这一层主要包含了业务逻辑相关处理操做(根据解析RemotingCommand中的RequestCode来区分具体的业务操做类型,进而执行不一样的业务处理流程),好比前置的检查和校验步骤、构造MessageExtBrokerInner对象、decode反序列化、构造Response返回对象等;
(2)RocketMQ数据存储组件层;该层主要是RocketMQ的存储核心类—DefaultMessageStore,其为RocketMQ消息数据文件的访问入口,经过该类的“putMessage()”和“getMessage()”方法完成对CommitLog消息存储的日志数据文件进行读写操做(具体的读写访问操做仍是依赖下一层中CommitLog对象模型提供的方法);另外,在该组件初始化时候,还会启动不少存储相关的后台服务线程,包括AllocateMappedFileService(MappedFile预分配服务线程)、ReputMessageService(回放存储消息服务线程)、HAService(Broker主从同步高可用服务线程)、StoreStatsService(消息存储统计服务线程)、IndexService(索引文件服务线程)等;
(3)RocketMQ存储逻辑对象层:该层主要包含了RocketMQ数据文件存储直接相关的三个模型类IndexFile、ConsumerQueue和CommitLog。IndexFile为索引数据文件提供访问服务,ConsumerQueue为逻辑消息队列提供访问服务,CommitLog则为消息存储的日志数据文件提供访问服务。这三个模型类也是构成了RocketMQ存储层的总体结构(对于这三个模型类的深刻分析将放在后续篇幅中);
(4)封装的文件内存映射层:RocketMQ主要采用JDK NIO中的MappedByteBuffer和FileChannel两种方式完成数据文件的读写。其中,采用MappedByteBuffer这种内存映射磁盘文件的方式完成对大文件的读写,在RocketMQ中将该类封装成MappedFile类。这里限制的问题在上面已经讲过;对于每类大文件(IndexFile/ConsumerQueue/CommitLog),在存储时分隔成多个固定大小的文件(单个IndexFile文件大小约为400M、单个ConsumerQueue文件大小约5.72M、单个CommitLog文件大小为1G),其中每一个分隔文件的文件名为前面全部文件的字节大小数+1,即为文件的起始偏移量,从而实现了整个大文件的串联。这里,每一种类的单个文件均由MappedFile类提供读写操做服务(其中,MappedFile类提供了顺序写/随机读、内存数据刷盘、内存清理等和文件相关的服务);
(5)磁盘存储层:主要指的是部署RocketMQ服务器所用的磁盘。这里,须要考虑不一样磁盘类型(如SSD或者普通的HDD)特性以及磁盘的性能参数(如IOPS、吞吐量和访问时延等指标)对顺序写/随机读操做带来的影响(ps:小编建议在正式业务上线以前作好多轮的性能压测,具体用压测的结果来评测);并发
RocketMQ的RocketMQ消息存储(一)篇幅就先分析到这儿了。RocketMQ消息存储部分的内容与其余全部篇幅(RocketMQ的Remoting通讯、普通消息发送和消息消费部分)相比是最为复杂的,须要读者反复多看源码并屡次对消息读和写进行Debug(能够经过在Broker端的SendMessageProcessor/PullMessageProcesssor/QueryMessaageProcessor几个业务处理器入口,在其重要方法中打印相关重要属性值的方式或者一步步地Debug代码,来仔细研究下其中的存储过程),反复几回后才能够对消息存储这部分有一个较为深入的理解,同时也有助于提升对RocketMQ的总体理解。限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,若有阐述不合理之处还望留言一块儿探讨。app
做者:癫狂侠
连接:https://www.jianshu.com/p/b73fdd893f98
來源:简书
简书著做权归做者全部,任何形式的转载都请联系做者得到受权并注明出处。异步