RocketMQ底层原理和源码解析

本文的内容主要包括:MQ的对比和选型、RocketMQ原理、RocketMQ源码解析、事务消息。java

如发现本文中所写有错误或者不许确之处,很是欢迎指出,我会及时调研修正!c++

ps:转载的同窗请标出原文连接,尊重别人的劳动成果,谢谢?!git

 

Rocketmq性能压测客户端:https://github.com/Force-King/rocketmq-testgithub

 

1、MQ的对比和选型

目前业界主要的消息中间件有:ActiveMQ、RabbitMQ、ZeroMQ、Kafka、MetaMQ、RocketMQ。其中开源的,社区比较活跃的是ActiveMQ、RabbitMQ、Kafka、RocketMQ。因此,咱们作技术选型时,须要考虑从以上4种开源的MQ中选取。如下是这4种mq的特性:数据库

RabbitMQ、Kafka、RocketMQ性能测试对比:apache

在同步发送场景中,三个消息中间件的表现区分明显:缓存

Kafka的吞吐量高达17.3w/s,不愧是高吞吐量消息中间件的行业老大。这主要取决于它的队列模式保证了写磁盘的过程是线性IO。此时broker磁盘IO已达瓶颈。服务器

RocketMQ也表现不俗,吞吐量在11.6w/s,磁盘IO %util已接近100%。RocketMQ的消息写入内存后即返回ack,由单独的线程专门作刷盘的操做,全部的消息均是顺序写文件。网络

RabbitMQ的吞吐量5.95w/s,CPU资源消耗较高。它支持AMQP协议,实现很是重量级,为了保证消息的可靠性在吞吐量上作了取舍。RabbitMQ在消息持久化场景下,吞吐量在2.6w/s左右。多线程

 

Redis(补充)

是一个Key-Value的NoSQL数据库,开发维护很活跃,同时它也支持MQ功能,因此彻底能够当作一个轻量级的队列服务来使用。对于RabbitMQ和Redis的入队和出队操做,各执行100万次,每10万次记录一次执行时间。测试数据分为128Bytes、512Bytes、1K和10K四个不一样大小的数据。实验代表:入队时,当数据比较小时Redis的性能要高于RabbitMQ,而若是数据大小超过了10K,Redis则慢的没法忍受;出队时,不管数据大小,Redis都表现出很是好的性能,而RabbitMQ的出队性能则远低于Redis。

 

综合以上对比,能够得出,RocketMQ 和 Kafka 的吞吐量最高,在高吞吐量的业务要求下,首选这两个mq。

Kafka 在topic 达到必定量以上时,吞吐量急剧降低;而RocketMQ影响较小。以下图:

 

从以上几个图能够看出,随着业务增多,topic 增多,Kafka的稳定性比RocketMQ要差很多。

因此综合来看,做为互联网行业,确定会涉及资金转帐等要求事务一致的场景, Kafka 不支持分布式事务,RocketMQ 支持分布式事务,并且理论上不会丢消息,总体对比来看,仍是RocketMQ更加优越。并且RocketMQ是经历了屡次双十一考验的,可用性是毋庸置疑的!

 

2、RocketMQ原理

1. RocketMQ的发展历史

        在2007年的时候,淘宝实施了“五彩石”项目,“五彩石”用于将交易系统从单机变成分布式,也是在这个过程当中产生了阿里巴巴第一代消息引擎——Notify。在2010年的时候,阿里巴巴B2B部门基于ActiveMQ的5.1版本也开发了本身的一款消息引擎,称为Napoli,这款消息引擎在B2B里面普遍地被使用,不只仅是在交易领域,在不少的后台异步解耦等方面也获得了普遍的应用。在2011年的时候,业界出现了如今被不少大数据领域所推崇的Kafka消息引擎,阿里在研究了Kafka的总体机制和架构设计以后,基于Kafka的设计使用Java进行了彻底重写并推出了MetaQ 1.0版本,主要是用于解决顺序消息和海量堆积的问题。而在2012年,阿里对于MetaQ进行了架构重组升级,开发出了MetaQ 2.0,这时就发现MetaQ本来基于Kafka的架构在阿里巴巴如此庞大的体系下很难进行水平扩展,因此在2012年的时候就开发了RocketMQ 3.0版本。不少人会问到RocketMQ 3.0和MetaQ 3.0的区别,其实这二者是等价的版本,只不过阿里内部使用的称为MetaQ 3.0,外部开源称之为RocketMQ 3.0。在2015年,又基于RocketMQ开发了阿里云上的Aliware MQ和Notify 3.0。在2016年的时候,阿里巴巴将RocketMQ的内核引擎捐赠给了Apache基金会。

        以上就是RocketMQ的总体发展历史,其实在阿里巴巴内部围绕着RocketMQ内核打造了三款产品,分别是MetaQ、Notify和Aliware MQ。这三者分别采用了不一样的模型,MetaQ主要使用了拉模型,解决了顺序消息和海量堆积问题;Notify主要使用了推模型,解决了事务消息;而云产品Aliware MQ则是提供了商业化的版本。如图:

下图是RocketMQ在历年双11的消息流转体量:

单位:十亿

 

2. RocketMQ的原理和架构

RocketMQ消息队列的总体部署架构以下图所示:

RocketMQ消息队列集群中的几个角色:

  • NameServer:命名发现服务,更新和路由发现broker;其在RocketMQ中起着中转承接的做用,是一个无状态的服务,多个NameServer之间不通讯。
  • Broker-Master:broker 消息主机服务器。
  • Broker-Slave:broker 消息从机服务器。
  • Producer:消息生产者。
  • Consumer:消息消费者。

RocketMQ集群的一部分通讯以下:

  • Broker启动后须要完成一次将本身注册至NameServer的操做;随后每隔30s时间按期向NameServer上报Topic路由信息;
  • 消息生产者Producer做为客户端发送消息时候,须要根据Msg的Topic从本地缓存的TopicPublishInfoTable获取路由信息。若是没有则更新路由信息会从 NameServer上从新拉取;
  • 消息生产者Producer根据所获取的路由信息选择一个队列(MessageQueue)进行消息发送;Broker做为消息的接收者接收消息并落盘存储。

从上面能够看出在消息生产者,在Broker和NameServer间都会发生通讯(这里只说了MQ的部分通讯),所以如何设计一个良好的网络通讯模块在MQ中相当重要,它将决定RocketMQ集群总体的消息传输能力与最终性能。

rocketmq-remoting 模块是 RocketMQ消息队列中负责网络通讯的模块,它几乎被其余全部须要网络通讯的模块(诸如rocketmq-client、rocketmq-broker、rocketmq-namesrv)所依赖和引用。

为了实现客户端与服务器之间高效的数据请求与接收,RocketMQ消息队列自定义了通讯协议并在Netty的基础之上扩展了通讯模块。

 

鉴于RocketMQ通讯模块的底层源码是Netty实现的,在阅读RocketMQ的源码以前,建议读者先对Netty的多线程模型、JAVA NIO模型均有必定的了解,以便快速理解RocketMQ源码。

 

3、RocketMQ源码解析

1. 源码目录结构介绍

本文使用的RocketMQ版本是4.4.0, 依赖的netty版本是4.0.42.Final.

RocketMQ的代码结构图以下:

RocketMQ源码主要分为如下几个package:

rocketmq-broker:mq的核心,它能接收producer和consumer的请求,并调用store层服务对消息进行处理。HA服务的基本单元,支持同步双写,异步双写等模式。

rocketmq-client:mq客户端实现,目前官方仅仅开源了java版本的mq客户端,c++,go客户端有社区开源贡献。

rocketmq-common:一些模块间通用的功能类,好比一些配置文件、常量。

rocketmq-example:官方提供的例子,对典型的功能好比order message,push consumer,pull consumer的用法进行了示范。

rocketmq-filter:消息过滤服务,至关于在broker和consumer中间加入了一个filter代理。

rocketmq-namesrv:命名服务,更新和路由发现 broker服务。

rocketmq-remoting:基于netty的底层通讯实现,全部服务间的交互都基于此模块。

rocketmq-srvutil:解析命令行的工具类ServerUtil。

rocketmq-store:存储层实现,同时包括了索引服务,高可用HA服务实现。

rocketmq-tools:mq集群管理工具,提供了消息查询等功能。

 

RocketMQ 的逻辑部署结构:

RocketMQ主要的功能集中在rocketmq-broker、rocketmq-remoting、rocketmq-store 三个模块中,因此咱们接下来主要分析这三个模块的源码。

先主要讲解rocketmq 启动时,都作了什么。

 

2. RocketMQ 服务启动

启动类:org.apache.rocketmq.broker.BrokerController

这个类是服务启动时执行,初始化了发送消息、消费消息、清理过时请求等各类线程池和监听事件。

了解了mq服务启动的过程,接下来,咱们按照一条消息从客户端发出,最终到服务端的存储层并如何落盘,这一条调用链来分析源码,了解一条消息是怎么处理的。

 

3. Client

一般为了避免影响业务层性能,都是使用异步发送,因此接下来,咱们分析消息异步发送的接口代码。

消息由客户端MQProducer发出

 

 

sendKernelImpl方法调用了MQClientAPIImpl类的sendMessage()。以下图:

 

sendMessage() 经过判断发送类型,最终调用了MQClientAPIImpl类的sendMessageAsync()。以下图:

 

可看出,该方法调用了通讯层RemotingClient接口(实现类是NettyRemotingClient)的invokeAsync方法。

 

4. Remoting通讯层

Remoting模块类结构图:

消息从Client调入通讯层的NettyRemotingClient:

 

NettyRemotingAbstract类的invokeAsyncImpl():

 

写入通道后,就等待Netty的Selector轮询出来,调用后续broker处理任务。

 

5. Broke

发送的消息到达broker,调用 org.apache.rocketmq.broker.processor.SendMessageProcessor 类的processRequest()方法,processRequest()调用sendMessag(),以下图:

下图是sendMessag()方法:

 

该方法调用了MessageStore接口的putMessage()方法,而MessageStore的实现类是DefaultMessageStore,该类是位于存储层store,在后续的存储层会讲解。

 

 

6. Store存储层

存储层时rocketmq的重中之重,也是决定mq性能的主要之处,这里咱们将详细分析。

(1) 存储层总体结构和源码

业务层都是经过DefaultMessageStore类作为操做入口。

DefaultMessageStore.putMessage()方法:

 

能够看出,前面的都是一些验证,后边最终调用了CommitLog的putMessage()方法。

CommitLog.putMessage():

RocketMQ下主要有6类文件,分别是三类大文件:Index文件,consumequeue文件,commitlog文件。三类小文件:checkpoint文件,config目录下的配置文件.和abort。

而对于三类大文件,使用的就是NIO的MappedByteBuffer类来提升读写性能。这个类是文件内存映射的相关类,支持随机读和顺序写。在RocketMQ中,被封装成了MappedFile类。

RocketMQ对于每类大文件,在存储时候分割成了多个固定大小的文件,每一个文件名为前面全部的文件大小加1(也就是偏移量)。从而实现对整个大文件的串联拼接。接下来就须要看看MappedFIle这个封装类了。

 

MappedFile.appendMessagesInner():

 

这里能够看到,调用了AppendMessageCallback.doAppend()方法,而AppendMessageCallback是个接口,它的实现类DefaultAppendMessageCallback就在CommitLog类中,是个内部类。

DefaultAppendMessageCallback..doAppend():

方法前面是内存写入位置和偏移量的计算,以及一些消息数据的封装准备。后边最终调用byteBuffer.put()方法写入内存,并计算了耗时,将消息写入的内存的位置信息、写入耗时封装为AppendMessageResult对象返回。

到这里,消息写入内存就算完成了,以后就是经过判断配置文件的主从同步类型和刷盘类型,进行刷盘和HA主从同步。

(2) 刷盘

刷盘类型:

刷盘方式有三种:

其中CommitRealTimeService是老一些版本中没有的,它为开启内存字节缓存的刷盘服务。

CommitRealTimeService和FlushRealTimeService都继承了FlushCommitLogService。

这三个类都在CommitLog类中,是内部类。

FlushRealTimeService

使用配置参数:

flushCommitLogTimed=false // 刷新策略(默认是实时刷盘)

flushIntervalCommitLog=500 // 刷盘时间间隔(默认500ms)

flushCommitLogLeastPages=4 // 每次刷盘至少须要多少个page(默认是4个)

flushCommitLogThoroughInterval = 1000 * 10 // 完全刷盘间隔时间(默认10s)

FlushRealTimeService的run方法:

大体逻辑:

  • 若是 当前时间 >(最后一次刷盘时间 + 完全刷盘间隔时间(10s)),则将最新一次刷盘时间更新为当前时间
  • 若是是实时刷盘,每隔必定时间间隔,该线程休眠500毫秒

若是不是实时刷盘,则调用waitForRunning,即每隔500毫秒或该刷盘服务线程调用了wakeup()方法以后结束阻塞。最后调用 CommitLog.this.mappedFileQueue.flush(flushPhysicQueueLeastPages)进行刷盘。

继续深刻MappedFileQueue.flush()方法:

能够看到这里调用了mappedFile.flush(flushLeastPages):

这里最终调用了NIO 的MappedByteBuffer.force();经过使用MappedByteBuffer,提升大文件磁盘读写性能。

CommitRealTimeService

包含提交和异步刷盘逻辑,专门为开启内存字节缓冲区的刷盘服务。transientStorePoolEnable=true 时使用这种方式。

使用配置参数:

commitIntervalCommitLog = 200 // 提交到FileChannel的时间间隔,默认0.2s

commitCommitLogLeastPages=4 //每次提交到File至少须要多少个page(默认是4个)

commitCommitLogThoroughInterval = 200 //提交完成间隔时间(默认0.2s)

代码逻辑与FlushRealTimeService基本同样,这里就不贴出来了。

消息追加时,先直接写入堆内内存writeBuffer,而后定时commit到堆外内存FileChannel,再定时flush。

总结:

一、同步刷盘

每次发送消息,消息都直接存储在FileChannel中,使用的是(MapFile的MappdByteBuffer),而后直接调用force()方法刷写到磁盘,等到force刷盘成功后,再返回给调用方,(GroupCommitRequest.waitForFlush)就是其同步调用的实现。

二、异步刷盘

分为两种状况,是否开启堆内存缓存池,具体配置参数:MessageStoreConfig.transientStorePoolEnable

transientStorePoolEnable=true

先写入内存字节缓冲区(writeBuffer) ----> 从内存字节缓冲区(write buffer)提交(commit)到文件通道(fileChannel) ----> 文件通道(fileChannel)定时flush到磁盘

transientStorePoolEnable=false(默认)

写入映射文件字节缓冲区(mappedByteBuffer) ----> 映射文件字节缓冲区(mappedByteBuffer)定时flush

(3) 主从同步HA

有两种类型:

同步双写 SYNC_MASTER

异步复制 ASYNC_MASTER

若是是SYNC_MASTER模式,消息发送者将消息刷写到磁盘后,须要继续等待新数据被传输到从服务器,从服务器数据的复制是在另一个线程HAConnection中去拉取,因此消息发送者在这里须要等待数据传输的结果,GroupTransferService就是实现该功能。而ASYNC_MASTER模式,消息在master写入成功,即会返回成功,无需等待slave。

因此,异步复制性能高于同步双写,业务没有主从强一致要求的话,推荐使用ASYNC_MASTER模式。

如下是异步复制的源码:

 

7. RocketMQ 事务消息

在RocketMQ中生产者有三种角色NormalProducer(普通)、OrderProducer(顺序)、TransactionProducer(事务),咱们这里用TransactionProducer(事务)来解决分布式事务问题。

说到分布式事务,就会谈到经典的”帐户转帐”问题:2个帐户,分布处于2个不一样的DB,或者说2个不一样的系统里面,A要扣钱,B要加钱,如何保证原子性?

通常的思路都是经过消息中间件来实现“最终一致性”:A系统扣钱,而后发条消息给中间件,B系统接收此消息,进行加钱。

但这里面有个问题:A是先update DB,后发送消息呢?仍是先发送消息,后update DB?

假设先update DB成功,发送消息网络失败,重发又失败,怎么办? 

假设先发送消息成功,update DB失败。消息已经发出去了,又不能撤回,怎么办?

因此,这里下个结论:只要发送消息和update DB这2个操做不是原子的,不管谁先谁后,都是有问题的。

那这个问题怎么解决呢??

为了能解决该问题,同时又不和业务耦合,RocketMQ提出了“事务消息”的概念。

 

rocketmq实现分布式事务:

经过消息的异步事务,能够保证本地事务和消息发送同时执行成功或失败,从而保证了数据的最终一致性。

  • 发送prepare消息,该消息对Consumer不可见
  • 执行本地事务
  • 若本地事务执行成功,则向MQ提交消息确认发送指令;若本地事务执行失败,则向MQ发送取消指令
  • 若MQ长时间未收到确认发送或取消发送的指令,则向业务系统询问(回查)本地事务状态,并作补偿处理

具体来讲,就是把消息的发送分红了2个阶段:Prepare阶段和确认阶段。

流程图以下:

源码位于broker.transaction下:

注意:3.2.6以后的版本事务回查接口被删除了,阿里不对外开源了,事务回查接口须要使用者本身编写。