RocketMQ做为一款纯java、分布式、队列模型的开源消息中间件,支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。java
由开源社区killme2008维护,开源社区很是活跃。https://github.com/killme2008/Metamorphosisgit
于2012年10月份上线,在淘宝内部被普遍使用。github
Metaq 3.0发布时,产品名称改成RocketMQ。基于公司内部开源共建原则,RocketMQ项目只维护核心功能,且去除了全部其余运行时的依赖,核心功能最简化。每一个BU的个性化需求都在RocketMQ项目之上进行深度定制。RocketMQ向其余BU提供的仅仅是jar包,例如要定制一个Broker,那么只须要依赖rocketmq-broker这个jar包便可,可经过API进行交互,若是定制client,则依赖rocketmq-client这个jar包,对其提供的api进行再封装。docker
在Metaq1.x/2.x的版本中,分布式协调采用的是Zookeeper,而RocketMQ本身实现了一个NameServer,更加轻量级,性能更好!数据库
消息生产者,生产者的做用就是将消息发送到 MQ,生产者自己既能够产生消息,如读取文本信息等。也能够对外提供接口,由外部应用来调用接口,再由生产者将收到的消息发送到 MQ。api
生产者组,简单来讲就是多个发送同一类消息的生产者称之为一个生产者组。在这里能够不用关心,只要知道有这么一个概念便可。缓存
消息消费者,简单来讲,消费 MQ 上的消息的应用程序就是消费者,至于消息是否进行逻辑处理,仍是直接存储到数据库等取决于业务须要。网络
消费者组,和生产者相似,消费同一类消息的多个 consumer 实例组成一个消费者组。数据结构
Topic 是一种消息的逻辑分类,好比说你有订单类的消息,也有库存类的消息,那么就须要进行分类,一个是订单 Topic 存放订单相关的消息,一个是库存 Topic 存储库存相关的消息。架构
Message 是消息的载体。一个 Message 必须指定 topic,至关于寄信的地址。Message 还有一个可选的 tag 设置,以便消费端能够基于 tag 进行过滤消息。也能够添加额外的键值对,例如你须要一个业务 key 来查找 broker 上的消息,方便在开发过程当中诊断问题。
标签能够被认为是对 Topic 进一步细化。通常在相同业务模块中经过引入标签来标记不一样用途的消息。
Broker 是 RocketMQ 系统的主要角色,其实就是前面一直说的 MQ。Broker 接收来自生产者的消息,储存以及为消费者拉取消息的请求作好准备。
Name Server 为 producer 和 consumer 提供路由信息。
RocketMQ是一个分布式消息中间件,并支持事务消息、顺序消息、批量消息、定时消息、消息回溯等。它里面有几个区别于标准消息中件间的概念,如Group、Topic、Queue等。系统组成则由Producer、Consumer、Broker、NameServer等。
RocketMQ以Topic来管理不一样应用的消息,对于生产者(producer)而言,发送消息时须要指定消息的Topic,对于消费者(consumer)而言,在启动后须要订阅相应的Topic,而后能够消费相应的消息。Topic是逻辑上的概念,在物理实现上,一个Topic由多个Queue组成,采用多个Queue的好处是能够将Broker存储分布式化,提升系统性能。
NameServer的功能,在RocketMQ的前身是使用ZooKeeper。NameServer用于管理全部Broker节点信息,接收Broker的注册/注销请求,此外还记录了Topic与Broker、Queue的对应关系,Broker主备信息。BrokerId为0表明是MasterBroker,不然BrokerId大于0的表示为SlaveBroker,Master和Slave组成一个Broker,具备相同的brokerName。Broker在启动的时候会去NameServer进行注册,会维护Broker的存活状态,Broker每次发送心跳过来的时候都会把Topic信息带上。NamesrvStartUp为启动类、NamesrvController为控制类、RouteInfoManager存放了Topic队列信息以及地址列表等一系列重要数据结构并提供了对应的数据变动接口、DefaultRequestProcessor负责处理所broker发过来的全部网络消息。各NameServer之间是相互独立且没有通讯的,经过给Broker的namesrvAddr配置多个NameServer地址,同时向多个NameServer注册信息来实现NameServer集群。由于NameServer读写压力比较小,因此稳定性较高。相应的生产者/消费者中的namesrvAddr也是配置多个。
Broker,每一个Broker都会和NameServer创建一个长链接保持心跳。一个Topic分布在多个Broker上,一个Broker能够配置多个Topic。因为消息分布在各个Broker上,一旦某个Broker宕机,则该Broker上的消息读写都会受到影响。因此须要HA机制,RocketMQ的实现方式是master/slave,salve定时从master同步数据,若是master宕机,则slave提供消费服务,可是不能写入消息。一旦某个broker master宕机,生产者和消费者多久才能发现?受限于rocketmq的网络链接机制,默认状况下,最多须要30秒,但这个时间可由应用设定参数来缩短期。这个时间段内,发往该broker的消息都是失败的,并且该broker的消息没法消费,由于此时消费者不知道该broker已经挂掉。消费者获得master宕机通知后,转向slave消费,可是slave不能保证master的消息100%都同步过来了,所以会有少许的消息丢失。可是消息最终不会丢的,一旦master恢复,未同步过去的消息会被消费掉。
SendMessageProcessor处理全部发往broker的消息,由BrokerController调用DefaultMessageStore来保存消息(processRequest -> sendMessage/sendBatchMessage -> getMessageStore().putMessage)。消息体由CommitLog记录,首先会判断是否为延迟消息,若是是则会改写Topic,并保存好真实的Topic信息,而后写入对应的MappedFile(MappedByteBuffer)。若是是异步刷盘,异步同步Slave则消息到这里就算是记录完了,直接返回producer成功。异步刷盘时,只有机器宕机,才会产生消息丢失,broker挂掉可能会发生,可是机器宕机崩溃是不多发生的,除非忽然断电。若是是同步刷盘,消息写入物理文件才会返回成功,刷盘本质其实就是调用MappedByteBuffer.force。HA是由master/slave实现,这个也分同步仍是异步。而后还有后台线程异步的把CommitLog文件同步到ConsumeQueue中,ConsumeQueue是CommitLog的索引,它记录了消息在CommitLog中的位置。Producer对应CommitLog,发送的消息写入CommitLog,Consumer对应ConsumeQueue,消费对应的ConsumeQueue队列。
不管CommitLog,仍是ConsumeQueue,都有一个对应的MappedFileQueue,也就是对应的内存映射文件的链表,对外提供一个逻辑上的文件。MapedFileQueue包含了不少MapedFile(AllocateMappedFileService负责建立MappedFile),以及MapedFile的真实大小,MapedFile包含了具体的文件信息,包括文件路径、文件名、文件起始偏移、写位移、读位移,刷盘位移等等信息,同时使用了虚拟内存映射来提升IO效率(MappedByteBuffer)。MapedFile的文件名就是消息在此文件的中初始偏移量(文件的起始偏移量),MapedFile链表逻辑上是连续的,就是靠这个机制实现。一个PageSize默认为4k,对应Linux的PageCache缓存大小,一个MapedFile默认最大为1G(因此一个消息最大也是1G,在MessageStoreConfig中配置),异步刷盘线程默认1s触发一次,可是要满4页(16k)才会刷盘,或10s作一次强制刷盘(FlushRealTimeService异步CommitLog刷盘,GroupCommitService同步CommitLog刷盘)。读写时根据offset定位到链表中,对应的MappedFile进行读写。经过MappedFile,就很好的解决了大文件随机读的性能问题。MappedFile继承自ReferenceResource,它里面实现了一个引用计数,获取和释放都要增减这个计数,当引用计数为0的时候就会回调cleanup方法。MappedFile的cleanup实现就是经过反射调用cleaner().clean()以释放映射内存,cleaner方法是在DirectByteBuffer里,MappedByteBuffer实现类就为DirectByteBuffer,但DirectByteBuffer是package的,外面并不能访问到。
一台Broker上全部消息(不论是什么Topic)都是记录在一个CommitLog上,CommitLog里面记录了每条消息的消费状况,是否被消费,由谁消费(queueid),该消息是否持久化等信息,每条消息的长度是不同的。同步Slave是由一个单独的线程顺序的同步CommitLog文件,所谓的等待同步Slave成功后才返回,实质是等待同步线程下标到了指定下标而已,因此Master和Slave的CommitLog文件内容及顺序都是一致的。CommitLog中存储的消息格式已经指定好该消息对应的topic及存到consumeQueue中对应的topic的那个队列(queueid),究竟写入那个consumequeue的那个queueid,这是由客户端投递消息的时候指定的,客户端作的负载均衡,选择不一样queueid投递。通常来讲客户端是轮询queue投递消息,但若是要保证消息顺序,原理就是客户端把相关消息投递到同一个queueid,这样消费者消费的时候就是顺序读取了。只要消息到了CommitLog,发送的消息也就不会丢,有后台线程异步的同步到ConsumeQueue,再由Consumer进行消费。ConsumeQueue是消息的逻辑队列,至关于字典的目录,用来指定消息在物理文件CommitLog上的位置。也就是说CommitLog只有一个(顺序写),但ConsumeQueue确有多个(随机读),ConsumeQueue与Topic对应。
CommitLog只有一个,写入消息体的时候为保证消息顺序写入是会加锁的,加锁有两种方式,一种是ReentrantLock,一种是自旋compareAndSet,加锁的范围只限定在写入MappedFile中,刷盘及同步并不在加锁范围。
消息写入CommitLog以后,会有单独的线程任务(ReputMessageService)每隔1毫秒读取CommitLog文件,把新的消息信息分别调用CommitLogDispatcherBuildConsumeQueue及CommitLogDispatcherBuildIndex的dispatch方法加入到ConsumeQueue及IndexService中去。IndexService以Message Key构建了索引,能够经过Key来过滤查询消息。ConsumeQueuek中的消息都是定长的(20字节),消息数量也是固定的(也就是物理文件是固定大小),物理文件名字和CommitLog同样都是开始偏移量。一个ConsumeQueue只对应一个Topic,包含了消息在CommitLog的开始位置、大小以及tagsCode,只因此使用tag的HashCode就是为了保持长度固定,为保证准确在后面使用tag过滤的时候还会再进行一次字符串比较。消费者消费的时候是先从指定的ConsumeQueue中拉取消息ID以及进行一次简单的Tag过滤(若是须要的话),而后再一次的读取CommitLog文件获取真正的消息体。
若是消息是事务消息且状态是PREPARED或ROLLBACK的,则不会同步到ConsumeQueue中,见CommitLogDispatcherBuildConsumeQueue.dispatch,只有当消息不是事务消息或事务状态为COMMIT时才会同步到ConsumeQueue。事务消息是基于二阶段提交:一阶段,向Broker发送一条PREPARED,记录在CommitLog中并返回消息偏移位置,本地事务须要提供一个RocketMQ的回调(LocalTransactionExecuter),用于回调执行本地事务。二阶段,处理完本地事务后,返回本地事务状态,根据状态(COMMIT或ROLLBACK)去设置消息,而后添加到CommitLog中,最后同步到ConsumeQueue进行消费(事务消息记录了两次CommitLog一次ConsumeQueue)。
若是消息是延迟消息,则消息会先投递到SCHEDULE_TOPIC_XXXX中(消息内容也同样是记录在CommitLog),这个topic有若干队列, 每个队列对应了一个延迟level(延迟时间并非精准的),会有一个任务(ScheduleMessageService)去轮询这些队列,等时间到了则把消息从新写入到原来的Topic,而后同步到ConsumeQueue中(延迟消息投递了两次,即两次CommitLog两次ConsumeQueue)。
PullMessageProcessor处理消费者的请求,由BrokerController调用DefaultMessageStore.getMessage来从指定的Topic下的QueueId队列的QueueOffset下标开始拉取一批消息。首先根据Topic及QueueId定位到ConsumeQueue,而后根据QueueOffset获取到MappedFile而且返回指定位置开始的内存映射对象SelectMappedBufferResult。而后开始从指定位置遍历ConsumeQueue,通过滤Tags后,从CommitLog指定位置获取消息体,若是有过滤表达式则过滤,经过后把消息加入到结果列表。若是消息体过大,Master剩余物理内存不够,或者开启Slave读取消息,则会设置让客户端从Slave拉取消息。若是须要在发送消费消息前进行什么处理,能够注册ConsumeMessageHook,默认没有。最后向客户端写入消息内容,写入消息内容有两种方式:一种是把消息内容读取出来返回,还有一种是使用Netty的FileRegion领拷贝机制直接把内容从堆外内存中返回,默认为第一种读取到堆内存返回,这里是否是由于节省的大量小消息的复制还不如堆外内存建立的开销。RocketMQ拉取消息是长轮询,若是没有查询到消息,条件知足的话会挂起请求。