你应该知道的RocketMQ

1.概述

在好久以前写过一篇Kafka相关的文章,你须要知道的Kafka,那个时候在业务上更多的是使用的是Kafka,而如今换了公司以后,更多的使用的是Rocketmq,本篇文章会尽力全面的介绍RocketMQ和Kafka各个关键点的比较,但愿你们读完能有所收获。java

RocketMQ前身叫作MetaQ, 在MeataQ发布3.0版本的时候更名为RocketMQ,其本质上的设计思路和Kafka相似,可是和Kafka不一样的是其使用Java进行开发,因为在国内的Java受众群体远远多于Scala,因此RocketMQ是不少以Java语言为主的公司的首选。一样的RocketMQ和Kafka都是Apache基金会中的顶级项目,他们社区的活跃度都很是高,项目更新迭代也很是快。mysql

2.入门实例

2.1 生产者

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
        producer.start();

        for (int i = 0; i < 128; i++)
            try {
                {
                    Message msg = new Message("TopicTest",
                        "TagA",
                        "OrderID188",
                        "Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
                    SendResult sendResult = producer.send(msg);
                    System.out.printf("%s%n", sendResult);
                }

            } catch (Exception e) {
                e.printStackTrace();
            }

        producer.shutdown();
    }
}

直接定义好一个producer,建立好Message,调用send方法便可。算法

2.2 消费者

public class PushConsumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("CID_JODIE_1");
        consumer.subscribe("TopicTest", "*");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //wrong time format 2017_0422_221800
        consumer.setConsumeTimestamp("20181109221800");
        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

3.RocketMQ架构原理

对于RocketMQ先抛出几个问题:sql

  • RocketMQ的topic和队列是什么样的,和Kafka的分区有什么不一样?
  • RocketMQ网络模型是什么样的,和Kafka对好比何?
  • RocketMQ消息存储模型是什么样的,如何保证高可靠的存储,和Kafka对好比何?

3.1 RocketMQ架构图

对于RocketMQ的架构图,在大致上来看和Kafka并无太多的差异,可是在不少细节上是有不少差异的,接下来会一一进行讲述。微信

3.2 RocketMQ名词解释

在3.1的架构中咱们有多个Producer,多个主Broker,多个从Broker,每一个Producer能够对应多个Topic,每一个Consumer也能够消费多个Topic。网络

Broker信息会上报至NameServer,Consumer会从NameServer中拉取Broker和Topic的信息。架构

  • Producer:消息生产者,向Broker发送消息的客户端
  • Consumer:消息消费者,从Broker读取消息的客户端
  • Broker:消息中间的处理节点,这里和kafka不一样,kafka的Broker没有主从的概念,均可以写入请求以及备份其余节点数据,RocketMQ只有主Broker节点才能写,通常也经过主节点读,当主节点有故障或者一些其余特殊状况才会使用从节点读,有点相似- 于mysql的主从架构。
  • Topic:消息主题,一级消息类型,生产者向其发送消息, 消费者读取其消息。
  • Group:分为ProducerGroup,ConsumerGroup,表明某一类的生产者和消费者,通常来讲同一个服务能够做为Group,同一个Group通常来讲发送和消费的消息都是同样的。
  • Tag:Kafka中没有这个概念,Tag是属于二级消息类型,通常来讲业务有关联的可使用同一个Tag,好比订单消息队列,使用Topic_Order,Tag能够分为Tag_食品订单,Tag_服装订单等等。
  • Queue: 在kafka中叫Partition,每一个Queue内部是有序的,在RocketMQ中分为读和写两种队列,通常来讲读写队列数量一致,若是不一致就会出现不少问题。
  • NameServer:Kafka中使用的是ZooKeeper保存Broker的地址信息,以及Broker的Leader的选举,在RocketMQ中并无采用选举Broker的策略,因此采用了无状态的NameServer来存储,因为NameServer是无状态的,集群节点之间并不会通讯,因此上传数据的时候都须要向全部节点进行发送。

不少朋友都在问什么是无状态呢?状态的有无实际上就是数据是否会作存储,有状态的话数据会被持久化,无状态的服务能够理解就是一个内存服务,NameServer自己也是一个内存服务,全部数据都存储在内存中,重启以后都会丢失。并发

3.3 Topic和Queue

在RocketMQ中的每一条消息,都有一个Topic,用来区分不一样的消息。一个主题通常会有多个消息的订阅者,当生产者发布消息到某个主题时,订阅了这个主题的消费者均可以接收到生产者写入的新消息。框架

在Topic中有分为了多个Queue,这实际上是咱们发送/读取消息通道的最小单位,咱们发送消息都须要指定某个写入某个Queue,拉取消息的时候也须要指定拉取某个Queue,因此咱们的顺序消息能够基于咱们的Queue维度保持队列有序,若是想作到全局有序那么须要将Queue大小设置为1,这样全部的数据都会在Queue中有序。异步

在上图中咱们的Producer会经过一些策略进行Queue的选择:

  • 非顺序消息:非顺序消息通常直接采用轮训发送的方式进行发送。
  • 顺序消息:根据某个Key好比咱们常见的订单Id,用户Id,进行Hash,将同一类数据放在同一个队列中,保证咱们的顺序性。

咱们同一组Consumer也会根据一些策略来选Queue,常见的好比平均分配或者一致性Hash分配。

要注意的是当Consumer出现下线或者上线的时候,这里须要作重平衡,也就是Rebalance,RocketMQ的重平衡机制以下:

  • 定时拉取broker,topic的最新信息
  • 每隔20s作重平衡
  • 随机选取当前Topic的一个主Broker,这里要注意的是否是每次重平衡全部主Broker都会被选中,由于会存在一个Broker再多个Broker的状况。
  • 获取当前Broker,当前ConsumerGroup的全部机器ID。
  • 而后进行策略分配。

因为重平衡是定时作的,因此这里有可能会出现某个Queue同时被两个Consumer消费,因此会出现消息重复投递。

Kafka的重平衡机制和RocketMQ不一样,Kafka的重平衡是经过Consumer和Coordinator联系来完成的,当Coordinator感知到消费组的变化,会在心跳过程当中发送重平衡的信号,而后由一个ConsumerLeader进行重平衡选择,而后再由Coordinator将结果通知给全部的消费者。

3.3.1 Queue读写数量不一致

在RocketMQ中Queue被分为读和写两种,在最开始接触RocketMQ的时候一直觉得读写队列数量配置不一致不会出现什么问题的,好比当消费者机器不少的时候咱们配置不少读的队列,可是实际过程当中发现会出现消息没法消费和根本没有消息消费的状况。

  • 当写的队列数量大于读的队列的数量,当大于读队列这部分ID的写队列的数据会没法消费,由于不会将其分配给消费者。
  • 当读的队列数量大于写的队列数量,那么多的队列数量就不会有消息被投递进来。

这个功能在RocketMQ在我看来明显没什么用,由于基本上都会设置为读写队列大小同样,这个为啥不直接将其进行统一,反而容易让用户配置不同出现错误。

这个问题在RocketMQ的Issue里也没有收到好的答案。

3.4 消费模型

通常来讲消息队列的消费模型分为两种,基于推送的消息(push)模型和基于拉取(poll)的消息模型。

基于推送模型的消息系统,由消息代理记录消费状态。消息代理将消息推送到消费者后,标记这条消息为已经被消费,可是这种方式没法很好地保证消费的处理语义。好比当咱们把已经把消息发送给消费者以后,因为消费进程挂掉或者因为网络缘由没有收到这条消息,若是咱们在消费代理将其标记为已消费,这个消息就永久丢失了。若是咱们利用生产者收到消息后回复这种方法,消息代理须要记录消费状态,这种不可取。

用过RocketMQ的同窗确定不由会想到,在RocketMQ中不是提供了两种消费者吗? MQPullConsumerMQPushConsumer,其中MQPushConsumer不就是咱们的推模型吗?其实这两种模型都是客户端主动去拉消息,其中的实现区别以下:

  • MQPullConsumer:每次拉取消息须要传入拉取消息的offset和每次拉取多少消息量,具体拉取哪里的消息,拉取多少是由客户端控制。
  • MQPushConsumer:一样也是客户端主动拉取消息,可是消息进度是由服务端保存,Consumer会定时上报本身消费到哪里,因此Consumer下次消费的时候是能够找到上次消费的点,通常来讲使用PushConsumer咱们不须要关心offset和拉取多少数据,直接使用便可。

3.4.1 集群消费和广播消费

消费模式咱们分为两种,集群消费,广播消费:

  • 集群消费: 同一个GroupId都属于一个集群,通常来讲一条消息只会被任意一个消费者处理。
  • 广播消费:广播消费的消息会被集群中全部消费者进行消息,可是要注意一下由于广播消费的offset在服务端保存成本过高,因此客户端每一次重启都会从最新消息消费,而不是上次保存的offset。

3.5 网络模型

在Kafka中使用的原生的socket实现网络通讯,而RocketMQ使用的是Netty网络框架,如今愈来愈多的中间件都不会直接选择原生的socket,而是使用的Netty框架,主要得益于下面几个缘由:

  • API使用简单,不须要关心过多的网络细节,更专一于中间件逻辑。
  • 性能高。
  • 成熟稳定,jdk nio的bug都被修复了。

选择框架是一方面,而想要保证网络通讯的高效,网络线程模型也是一方面,咱们常见的有1+N(1个Acceptor线程,N个IO线程),1+N+M(1个acceptor线程,N个IO线程,M个worker线程)等模型,RocketMQ使用的是1+N1+N2+M的模型,以下图所示:

1个acceptor线程,N1个IO线程,N2个线程用来作Shake-hand,SSL验证,编解码;M个线程用来作业务处理。这样的好处将编解码,和SSL验证等一些可能耗时的操做放在了一个单独的线程池,不会占据咱们业务线程和IO线程。

3.6 高可靠的分布式存储模型

作为一个好的消息系统,高性能的存储,高可用都不可少。

3.6.1 高性能日志存储

RocketMQ和Kafka的存储核心设计有很大的不一样,因此其在写入性能方面也有很大的差异,这是16年阿里中间件团队对RocketMQ和Kafka不一样Topic下作的性能测试:

从图上能够看出:

  • Kafka在Topic数量由64增加到256时,吞吐量降低了98.37%。
  • RocketMQ在Topic数量由64增加到256时,吞吐量只降低了16%。 这是为何呢?kafka一个topic下面的全部消息都是以partition的方式分布式的存储在多个节点上。同时在kafka的机器上,每一个Partition其实都会对应一个日志目录,在目录下面会对应多个日志分段。因此若是Topic不少的时候Kafka虽然写文件是顺序写,但实际上文件过多,会形成磁盘IO竞争很是激烈。

那RocketMQ为何在多Topic的状况下,依然还能很好的保持较多的吞吐量呢?咱们首先来看一下RocketMQ中比较关键的文件:

这里有四个目录(这里的解释就直接用RocketMQ官方的了):

  • commitLog:消息主体以及元数据的存储主体,存储Producer端写入的消息主体内容,消息内容不是定长的。单个文件大小默认1G ,文件名长度为20位,左边补零,剩余为起始偏移量,好比00000000000000000000表明了第一个文件,起始偏移量为0,文件大小为1G=1073741824;当第一个文件写满了,第二个文件为00000000001073741824,起始偏移量为1073741824,以此类推。消息主要是顺序写入日志文件,当文件满了,写入下一个文件;
  • config:保存一些配置信息,包括一些Group,Topic以及Consumer消费offset等信息。
  • consumeQueue:消息消费队列,引入的目的主要是提升消息消费的性能,因为RocketMQ是基于主题topic的订阅模式,消息消费是针对主题进行的,若是要遍历commitlog文件中根据topic检索消息是很是低效的。Consumer便可根据ConsumeQueue来查找待消费的消息。其中,ConsumeQueue(逻辑消费队列)做为消费消息的索引,保存了指定Topic下的队列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件能够当作是基于topic的commitlog索引文件,故consumequeue文件夹的组织方式以下:topic/queue/file三层组织结构,具体存储路径为:

HOME \store\index\${fileName},文件名fileName是以建立时的时间戳命名的,固定的单个IndexFile文件大小约为400M,一个IndexFile能够保存 2000W个索引,IndexFile的底层存储设计为在文件系统中实现HashMap结构,故rocketmq的索引文件其底层实现为hash索引。

咱们发现咱们的消息主体数据并无像Kafka同样写入多个文件,而是写入一个文件,这样咱们的写入IO竞争就很是小,能够在不少Topic的时候依然保持很高的吞吐量。有同窗说这里的ConsumeQueue写是在不停的写入呢,而且ConsumeQueue是以Queue维度来建立文件,那么文件数量依然不少,在这里ConsumeQueue的写入的数据量很小,每条消息只有20个字节,30W条数据也才6M左右,因此其实对咱们的影响相对Kafka的Topic之间影响是要小不少的。咱们整个的逻辑能够以下:

Producer不断的再往CommitLog添加新的消息,有一个定时任务ReputService会不断的扫描新添加进来的CommitLog,而后不断的去构建ConsumerQueue和Index。

注意:这里指的都是普通的硬盘,在SSD上面多个文件并发写入和单个文件写入影响不大。 读取消息 Kafka中每一个Partition都会是一个单独的文件,因此当消费某个消息的时候,会很好的出现顺序读,咱们知道OS从物理磁盘上访问读取文件的同时,会顺序对其余相邻块的数据文件进行预读取,将数据放入PageCache,因此Kafka的读取消息性能比较好。

RocketMQ读取流程以下:

  • 先读取ConsumerQueue中的offset对应CommitLog物理的offset
  • 根据offset读取CommitLog

ConsumerQueue也是每一个Queue一个单独的文件,而且其文件体积小,因此很容易利用PageCache提升性能。而CommitLog,因为同一个Queue的连续消息在CommitLog实际上是不连续的,因此会形成随机读,RocketMQ对此作了几个优化:

  • Mmap映射读取,Mmap的方式减小了传统IO将磁盘文件数据在操做系统内核地址空间的缓冲区和用户应用程序地址空间的缓冲区之间来回进行拷贝的性能开销
  • 使用DeadLine调度算法+SSD存储盘
  • 因为Mmap映射受到内存限制,当不在Mmmap映射这部分数据的时候(也就是消息堆积过多),默认是内存的40%,会将请求发送到SLAVE,减缓Master的压力

3.6.2 可用性

3.6.2.1 集群模式

咱们首先须要选择一种集群模式,来适应咱们可忍耐的可用程度,通常来讲分为三种:

  • 单Master:这种模式,可用性最低,可是成本也是最低,一旦宕机,全部都不可用。这种通常只适用于本地测试。
  • 单Master多SLAVE:这种模式,可用性通常,若是主宕机,那么全部写入都不可用,读取依然可用,若是master磁盘损坏,能够依赖slave的数据。
  • 多Master:这种模式,可用性通常,若是出现部分master宕机,那么这部分master上的消息都不可消费,也不可写数据,若是一个Topic的队列在多个Master上都有,那么能够保证没有宕机的那部分能够正常消费,写入。若是master的磁盘损坏会致使消息丢失。
  • 多Master多Slave:这种模式,可用性最高,可是维护成本也最高,当master宕机了以后,只会出如今这部分master上的队列不可写入,可是读取依然是能够的,而且若是master磁盘损坏,能够依赖slave的数据。

通常来讲投入生产环境的话都会选择第四种,来保证最高的可用性。

3.6.2.2 消息的可用性

当咱们选择好了集群模式以后,那么咱们须要关心的就是怎么去存储和复制这个数据,rocketMQ对消息的刷盘提供了同步和异步的策略来知足咱们的,当咱们选择同步刷盘以后,若是刷盘超时会给返回FLUSH_DISK_TIMEOUT,若是是异步刷盘不会返回刷盘相关信息,选择同步刷盘能够尽最大程度知足咱们的消息不会丢失。

除了存储有选择以后,咱们的主从同步提供了同步和异步两种模式来进行复制,固然选择同步能够提高可用性,可是消息的发送RT时间会降低10%左右。

3.6.3 Dleger

咱们上面对于master-slave部署模式已经作了不少分析,咱们发现,当master出现问题的时候,咱们的写入怎么都会不可用,除非恢复master,或者手动将咱们的slave切换成master,致使了咱们的Slave在多数状况下只有读取的做用。RocketMQ在最近的几个版本中推出了Dleger-RocketMQ,使用Raft协议复制CommitLog,而且自动进行选主,这样master宕机的时候,写入依然保持可用。

有关Dleger-RocketMQ的信息更多的能够查看这篇文章:Dledger-RocketMQ 基于Raft协议的commitlog存储库

3.7 定时/延时消息

定时消息和延时消息在实际业务场景中使用的比较多,好比下面的一些场景:

  • 订单超时未支付自动关闭,由于在不少场景中下单以后库存就被锁定了,这里须要将其进行超时关闭。
  • 须要一些延时的操做,好比一些兜底的逻辑,当作完某个逻辑以后,能够发送延时消息好比延时半个小时,进行兜底检查补偿。
  • 在某个时间给用户发送消息,一样也可使用延时消息。

在开源版本的RocketMQ中延时消息并不支持任意时间的延时,须要设置几个固定的延时等级,目前默认设置为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h,从1s到2h分别对应着等级1到18,而阿里云中的版本(要付钱)是能够支持40天内的任什么时候刻(毫秒级别)。咱们先看下在RocketMQ中定时任务原理图:

 

  • Step1:Producer在本身发送的消息上设置好须要延时的级别。
  • Step2: Broker发现此消息是延时消息,将Topic进行替换成延时Topic,每一个延时级别都会做为一个单独的queue,将本身的Topic做为额外信息存储。
  • Step3: 构建ConsumerQueue
  • Step4: 定时任务定时扫描每一个延时级别的ConsumerQueue。
  • Step5: 拿到ConsumerQueue中的CommitLog的Offset,获取消息,判断是否已经达到执行时间
  • Step6: 若是达到,那么将消息的Topic恢复,进行从新投递。若是没有达到则延迟没有达到的这段时间执行任务。

能够看见延时消息是利用新建单独的Topic和Queue来实现的,若是咱们要实现40天以内的任意时间度,基于这种方案,那么须要402460601000个queue,这样的成本是很是之高的,那阿里云上面的支持任意时间是怎么实现的呢?这里猜想是持久化二级TimeWheel时间轮,二级时间轮用于替代咱们的ConsumeQueue,保存Commitlog-Offset,而后经过时间轮不断的取出当前已经到了的时间,而后再次投递消息。具体的实现逻辑须要后续会单独写一篇文章。

3.8 事务消息

事务消息一样的也是RocketMQ中的一大特点,其能够帮助咱们完成分布式事务的最终一致性,有关分布式事务相关的能够看我之前的不少文章都有不少详细的介绍,这里直接关注公众号:咖啡拿铁。

具体使用事务消息步骤以下:

  • Step1:调用sendMessageInTransaction发送事务消息
  • Step2: 若是发送成功,则执行本地事务。
  • Step3: 若是执行本地事务成功则发送commit,若是失败则发送rollback。
  • Step4: 若是其中某个阶段好比commit发送失败,rocketMQ会进行定时从Broker回查,本地事务的状态。

事务消息的使用整个流程相对以前几种消息使用比较复杂,下面是事务消息实现的原理图:

  • Step1: 发送事务消息,这里也叫作halfMessage,会将Topic替换为HalfMessage的Topic。
  • Step2: 发送commit或者rollback,若是是commit这里会查询出以前的消息,而后将消息复原成原Topic,而且发送一个OpMessage用于记录当前消息能够删除。若是是rollback这里会直接发送一个OpMessage删除。
  • Step3: 在Broker有个处理事务消息的定时任务,定时对比halfMessage和OpMessage,若是有OpMessage且状态为删除,那么该条消息一定commit或者rollback,因此就能够删除这条消息。
  • Step4: 若是事务超时(默认是6s),尚未opMessage,那么颇有可能commit信息丢了,这里会去反查咱们的Producer本地事务状态。
  • Step5: 根据查询出来的信息作Step2。

咱们发现RocketMQ实现事务消息也是经过修改原Topic信息,和延迟消息同样,而后模拟成消费者进行消费,作一些特殊的业务逻辑。固然咱们还能够利用这种方式去作RocketMQ更多的扩展。

4.总结

这里让咱们在回到文章中提到的几个问题:

  • RocketMQ的topic和队列是什么样的,和Kafka的分区有什么不一样?
  • RocketMQ网络模型是什么样的,和Kafka对好比何?
  • RocketMQ消息存储模型是什么样的,如何保证高可靠的存储,和Kafka对好比何?

本文分享自微信公众号 - 咖啡拿铁(close_3092860495),做者:咖啡拿铁