RocketMQ(三)如何解决消息的顺序&重复两大硬伤?

简述

分布式消息系统做为实现分布式系统可扩展、可伸缩性的关键组件,须要具备高吞吐量、高可用等特色。而谈到消息系统的设计,就回避不了两个问题:算法

  • 消息的顺序问题
  • 消息的重复问题

RocketMQ做为阿里开源的一款高性能、高吞吐量的消息中间件,它是怎样来解决这两个问题的?RocketMQ 有哪些关键特性?其实现原理是怎样的?数据库

顺序消息

消息有序指的是一类消息消费时,能按照发送的顺序来消费。例如:一个订单产生了 3 条消息,分别是订单建立、订单付款、订单完成。消费时,要按照这个顺序消费才有意义。但同时订单之间又是能够并行消费的。缓存

假如生产者产生了2条消息:M一、M2,要保证这两条消息的顺序,应该怎样作?你脑中想到的多是这样: 输入图片说明 M1发送到S1后,M2发送到S2,若是要保证M1先于M2被消费,那么须要M1到达消费端后,通知S2,而后S2再将M2发送到消费端。安全

这个模型存在的问题是,若是M1和M2分别发送到两台Server上,就不能保证M1先达到,也就不能保证M1被先消费,那么就须要在MQ Server集群维护消息的顺序。那么如何解决?一种简单的方式就是将M一、M2发送到同一个Server上: 输入图片说明服务器

只要将消息从一台服务器发往另外一台服务器,就会存在网络延迟问题。如上图所示,若是发送M1耗时大于发送M2的耗时,那么M2就先被消费,仍然不能保证消息的顺序。即便M1和M2同时到达消费端,因为不清楚消费端1和消费端2的负载状况,仍然有可能出现M2先于M1被消费。如何解决这个问题?将M1和M2发往同一个消费者便可,且发送M1后,须要消费端响应成功后才能发送M2。网络

但又会引入另一个问题,若是发送M1后,消费端1没有响应,那是继续发送M2呢,仍是从新发送M1?通常为了保证消息必定被消费,确定会选择重发M1到另一个消费端2,就以下图所示。app

这样的模型就严格保证消息的顺序,细心的你仍然会发现问题,消费端1没有响应Server时有两种状况,一种是M1确实没有到达,另一种状况是消费端1已经响应,可是Server端没有收到。若是是第二种状况,重发M1,就会形成M1被重复消费。也就是咱们后面要说的第二个问题,消息重复问题。负载均衡

回过头来看消息顺序问题,严格的顺序消息很是容易理解,并且处理问题也比较容易,要实现严格的顺序消息,简单且可行的办法就是:异步

  • 保证生产者 - MQServer - 消费者是一对一对一的关系

可是这样设计,并行度就成为了消息系统的瓶颈(吞吐量不够),也会致使更多的异常处理,好比:只要消费端出现问题,就会致使整个处理流程阻塞,咱们不得不花费更多的精力来解决阻塞的问题。分布式

但咱们的最终目标是要集群的高容错性和高吞吐量。这彷佛是一对不可调和的矛盾,那么阿里是如何解决的?

  • 世界上解决一个计算机问题最简单的方法:“刚好”不须要解决它!—— 沈询

有些问题,看起来很重要,但实际上咱们能够经过合理的设计或者将问题分解来规避。若是硬要把时间花在解决它们身上,其实是浪费的,效率低下的。从这个角度来看消息的顺序问题,咱们能够得出两个结论:

    1. 不关注乱序的应用实际大量存在
    1. 队列无序并不意味着消息无序

通常消息是经过轮询全部队列来发送的(负载均衡策略),顺序消息能够根据业务,好比说订单号相同的消息发送到同一个队列。下面的示例中,OrderId相同的消息,会发送到同一个队列:

// RocketMQ默认提供了两种MessageQueueSelector实现:随机/Hash
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
    @Override
    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
        Integer id = (Integer) arg;
        int index = id % mqs.size();
        return mqs.get(index);
    }
}, orderId);

在获取到路由信息之后,会根据MessageQueueSelector实现的算法来选择一个队列,同一个OrderId获取到的队列是同一个队列。

private SendResult send()  {
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    if (topicPublishInfo != null && topicPublishInfo.ok()) {
        MessageQueue mq = null;
        // 根据咱们的算法,选择一个发送队列
        // 这里的arg = orderId
        mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
        if (mq != null) {
            return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
        }
    }
}

2、消息重复

上面在解决消息顺序问题时,引入了一个新的问题,就是消息重复。那么RocketMQ是怎样解决消息重复的问题呢?仍是“刚好”不解决。 形成消息的重复的根本缘由是:网络不可达。只要经过网络交换数据,就没法避免这个问题。因此解决这个问题的办法就是不解决,转而绕过这个问题。那么问题就变成了:若是消费端收到两条同样的消息,应该怎样处理?

    1. 消费端处理消息的业务逻辑保持幂等性
    1. 保证每条消息都有惟一编号且保证消息处理成功与去重表的日志同时出现 第1条很好理解,只要保持幂等性,无论来多少条重复消息,最后处理的结果都同样。第2条原理就是利用一张日志表来记录已经处理成功的消息的ID,若是新到的消息ID已经在日志表中,那么就再也不处理这条消息。

咱们能够看到第1条的解决方式,很明显应该在消费端实现,不属于消息系统要实现的功能。第2条能够消息系统实现,也能够业务端实现。正常状况下出现重复消息的几率不必定大,且由消息系统实现的话,确定会对消息系统的吞吐量和高可用有影响,因此最好仍是由业务端本身处理消息重复的问题,这也是RocketMQ不解决消息重复的问题的缘由。

RocketMQ不保证消息不重复,若是你的业务须要保证严格的不重复消息,须要你本身在业务端去重。

3、事务消息

RocketMQ除了支持普通消息,顺序消息,另外还支持事务消息。首先讨论一下什么是事务消息以及支持事务消息的必要性。咱们以一个转账的场景为例来讲明这个问题:Bob向Smith转帐100块。

在单机环境下,执行事务的状况,大概是下面这个样子: 输入图片说明 当用户增加到必定程度,Bob和Smith的帐户及余额信息已经不在同一台服务器上了,那么上面的流程就变成了这样: 这时候你会发现,一样是一个转帐的业务,在集群环境下,耗时竟然成倍的增加,这显然是不可以接受的。那咱们如何来规避这个问题?

  • 大事务 = 小事务 + 异步

将大事务拆分红多个小事务异步执行。这样基本上可以将跨机事务的执行效率优化到与单机一致。转帐的事务就能够分解成以下两个小事务:

输入图片说明 图中执行本地事务(Bob帐户扣款)和发送异步消息应该保持同时成功或者失败中,也就是扣款成功了,发送消息必定要成功,若是扣款失败了,就不能再发送消息。那问题是:咱们是先扣款仍是先发送消息呢? 首先咱们看下,先发送消息,大体的示意图以下: 输入图片说明 存在的问题是:若是消息发送成功,可是扣款失败,消费端就会消费此消息,进而向Smith帐户加钱。 先发消息不行,那咱们就先扣款呗,大体的示意图以下: 输入图片说明

存在的问题跟上面相似:若是扣款成功,发送消息失败,就会出现Bob扣钱了,可是Smith帐户未加钱。

可能你们会有不少的方法来解决这个问题,好比:直接将发消息放到Bob扣款的事务中去,若是发送失败,抛出异常,事务回滚。这样的处理方式也符合“刚好”不须要解决的原则。RocketMQ支持事务消息,下面咱们来看看RocketMQ是怎样来实现的。

输入图片说明

RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事物,第三阶段经过第一阶段拿到的地址去访问消息,并修改状态。细心的你可能又发现问题了,若是确认消息发送失败了怎么办?RocketMQ会按期扫描消息集群中的事物消息,这时候发现了Prepared消息,它会向消息发送者确认,Bob的钱究竟是减了仍是没减呢?若是减了是回滚仍是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚仍是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。

那咱们来看下RocketMQ源码,是否是这样来处理事务消息的。客户端发送事务消息的部分(完整代码请查看:rocketmq-example工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer):

// 未决事务,MQ服务器回查客户端
// 也就是上文所说的,当RocketMQ发现`Prepared消息`时,会根据这个Listener实现的策略来决断事务
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 构造事务消息的生产者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 设置事务决断处理类
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事务的处理逻辑,至关于示例中检查Bob帐户并扣钱的逻辑
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 构造MSG,省略构造参数
Message msg = new Message(......);
// 发送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();

接着查看sendMessageInTransaction方法的源码,总共分为3个阶段:发送Prepared消息、执行本地事务、发送确认消息。

public TransactionSendResult sendMessageInTransaction(.....)  {
    // 逻辑代码,非实际代码
    // 1.发送消息
    sendResult = this.send(msg);
    // sendResult.getSendStatus() == SEND_OK
    // 2.若是消息发送成功,处理与消息关联的本地事务单元
    LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
    // 3.结束事务
    this.endTransaction(sendResult, localTransactionState, localException);
}

endTransaction方法会将请求发往broker(mq server)去更新事物消息的最终状态:

    1. 根据sendResult找到Prepared消息
    1. 根据localTransaction更新消息的最终状态

若是endTransaction方法执行失败,致使数据没有发送到broker,broker会有回查线程定时(默认1分钟)扫描每一个存储事务状态的表格文件,若是是已经提交或者回滚的消息直接跳过,若是是prepared状态则会向Producer发起CheckTransaction请求,Producer会调用DefaultMQProducerImpl.checkTransactionState()方法来处理broker的定时回调请求,而checkTransactionState会调用咱们的事务设置的决断方法,最后调用endTransactionOneway让broker来更新消息的最终状态。

再回到转帐的例子,若是Bob的帐户的余额已经减小,且消息已经发送成功,Smith端开始消费这条消息,这个时候就会出现消费失败和消费超时两个问题?解决超时问题的思路就是一直重试,直到消费端消费消息成功,整个过程当中有可能会出现消息重复的问题,按照前面的思路解决便可。

输入图片说明

这样基本上能够解决超时问题,可是若是消费失败怎么办?阿里提供给咱们的解决方法是:人工解决。你们能够考虑一下,按照事务的流程,由于某种缘由Smith加款失败,须要回滚整个流程。若是消息系统要实现这个回滚流程的话,系统复杂度将大大提高,且很容易出现Bug,估计出现Bug的几率会比消费失败的几率大不少。咱们须要衡量是否值得花这么大的代价来解决这样一个出现几率很是小的问题,这也是你们在解决疑难问题时须要多多思考的地方。

4、Producer如何发送消息

Producer轮询某topic下的全部队列的方式来实现发送方的负载均衡,以下图所示: 输入图片说明

首先分析一下RocketMQ的客户端发送消息的源码:

// 构造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整个应用生命周期内,只须要初始化1次
producer.start();
// 构造Message
Message msg = new Message("TopicTest1",// topic
                        "TagA",// tag:给消息打标签,用于区分一类消息,可为null
                        "OrderID188",// key:自定义Key,能够用于去重,可为null
                        ("Hello MetaQ").getBytes());// body:消息内容
// 发送消息并返回结果
SendResult sendResult = producer.send(msg);
// 清理资源,关闭网络链接,注销本身
producer.shutdown();

在整个应用生命周期内,生产者须要调用一次start方法来初始化,初始化主要完成的任务有:

    1. 若是没有指定namesrv地址,将会自动寻址
    1. 启动定时任务:更新namesrv地址、从namsrv更新topic路由信息、清理已经挂掉的broker、向全部broker发送心跳…
    1. 启动负载均衡的服务

初始化完成后,开始发送消息,发送消息的主要代码以下:

private SendResult sendDefaultImpl(Message msg,......) {
    // 检查Producer的状态是不是RUNNING
    this.makeSureStateOK();
    // 检查msg是否合法:是否为null、topic,body是否为空、body是否超长
    Validators.checkMessage(msg, this.defaultMQProducer);
    // 获取topic路由信息
    TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
    // 从路由信息中选择一个消息队列
    MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
    // 将消息发送到该队列上去
    sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}

代码中须要关注的两个方法tryToFindTopicPublishInfo和selectOneMessageQueue。前面说过在producer初始化时,会启动定时任务获取路由信息并更新到本地缓存,因此tryToFindTopicPublishInfo会首先从缓存中获取topic路由信息,若是没有获取到,则会本身去namesrv获取路由信息。selectOneMessageQueue方法经过轮询的方式,返回一个队列,以达到负载均衡的目的。

若是Producer发送消息失败,会自动重试,重试的策略:

  • 重试次数 < retryTimesWhenSendFailed(可配置)
  • 总的耗时(包含重试n次的耗时) < sendMsgTimeout(发送消息时传入的参数)
  • 同时知足上面两个条件后,Producer会选择另一个队列发送消息

5、消息存储

RocketMQ的消息存储是由consume queue和commit log配合完成的。

一、Consume Queue

consume queue是消息的逻辑队列,至关于字典的目录,用来指定消息在物理文件commit log上的位置。

咱们能够在配置中指定consumequeue与commitlog存储的目录 每一个topic下的每一个queue都有一个对应的consumequeue文件,好比:

${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}

Consume Queue文件组织,如图所示:

输入图片说明

-1. 根据topic和queueId来组织文件,图中TopicA有两个队列0,1,那么TopicA和QueueId=0组成一个ConsumeQueue,TopicA和QueueId=1组成另外一个ConsumeQueue。

  • 2.按照消费端的GroupName来分组重试队列,若是消费端消费失败,消息将被发往重试队列中,好比图中的%RETRY%ConsumerGroupA。
  • 3.按照消费端的GroupName来分组死信队列,若是消费端消费失败,并重试指定次数后,仍然失败,则发往死信队列,好比图中的%DLQ%ConsumerGroupA。

死信队列(Dead Letter Queue)通常用于存放因为某种缘由没法传递的消息,好比处理失败或者已通过期的消息。

Consume Queue中存储单元是一个20字节定长的二进制数据,顺序写顺序读,以下图所示:

输入图片说明

CommitLog Offset是指这条消息在Commit Log文件中的实际偏移量

Size存储中消息的大小

Message Tag HashCode存储消息的Tag的哈希值:主要用于订阅时消息过滤(订阅时若是指定了Tag,会根据HashCode来快速查找到订阅的消息)

二、Commit Log

CommitLog:消息存放的物理文件,每台broker上的commitlog被本机全部的queue共享,不作任何区分。 文件的默认位置以下,仍然可经过配置文件修改:

${user.home} \store${commitlog}${fileName}

CommitLog的消息存储单元长度不固定,文件顺序写,随机读。消息的存储结构以下表所示,按照编号顺序以及编号对应的内容依次存储。 输入图片说明

三、消息存储实现

消息存储实现,比较复杂,也值得你们深刻了解,后面会单独成文来分析,这小节只以代码说明一下具体的流程。

// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
    long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
    // Here settings are stored timestamp, in order to ensure an orderly global
    msg.setStoreTimestamp(beginLockTimestamp);
    // MapedFile:操做物理文件在内存中的映射以及将内存数据持久化到物理文件中
    MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
    // 将Message追加到文件commitlog
    result = mapedFile.appendMessage(msg, this.appendMessageCallback);
    switch (result.getStatus()) {
    case PUT_OK:break;
    case END_OF_FILE:
         // Create a new file, re-write the message
         mapedFile = this.mapedFileQueue.getLastMapedFile();
         result = mapedFile.appendMessage(msg, this.appendMessageCallback);
     break;
     DispatchRequest dispatchRequest = new DispatchRequest(
                topic,// 1
                queueId,// 2
                result.getWroteOffset(),// 3
                result.getWroteBytes(),// 4
                tagsCode,// 5
                msg.getStoreTimestamp(),// 6
                result.getLogicsOffset(),// 7
                msg.getKeys(),// 8
                /**
                 * Transaction
                 */
                msg.getSysFlag(),// 9
                msg.getPreparedTransactionOffset());// 10
    // 1.分发消息位置到ConsumeQueue
    // 2.分发到IndexService创建索引
    this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}

四、消息的索引文件

若是一个消息包含key值的话,会使用IndexFile存储消息索引,文件的内容结构如图:

输入图片说明

输入图片说明

索引文件主要用于根据key来查询消息的,流程主要是:

    1. 根据查询的 key 的 hashcode%slotNum 获得具体的槽的位置(slotNum 是一个索引文件里面包含的最大槽的数目,例如图中所示 slotNum=5000000)
    1. 根据 slotValue(slot 位置对应的值)查找到索引项列表的最后一项(倒序排列,slotValue 老是指向最新的一个索引项)
    1. 遍历索引项列表返回查询时间范围内的结果集(默认一次最大返回的 32 条记录)。

6、消息订阅

RocketMQ消息订阅有两种模式,一种是Push模式,即MQServer主动向消费端推送;另一种是Pull模式,即消费端在须要时,主动到MQServer拉取。但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式。

首先看下消费端的负载均衡: 输入图片说明

消费端会经过RebalanceService线程,10秒钟作一次基于topic下的全部队列负载:

    1. 遍历Consumer下的全部topic,而后根据topic订阅全部的消息
    1. 获取同一topic和Consumer Group下的全部Consumer
    1. 而后根据具体的分配策略来分配消费队列,分配的策略包含:平均分配、消费端配置等

如同上图所示:若是有 5 个队列,2 个 consumer,那么第一个 Consumer 消费 3 个队列,第二 consumer 消费 2 个队列。这里采用的就是平均分配策略,它相似于咱们的分页,TOPIC下面的全部queue就是记录,Consumer的个数就至关于总的页数,那么每页有多少条记录,就相似于某个Consumer会消费哪些队列。

经过这样的策略来达到大致上的平均消费,这样的设计也能够很方面的水平扩展Consumer来提升消费能力。

消费端的Push模式是经过长轮询的模式来实现的,就如同下图:

输入图片说明

Consumer端每隔一段时间主动向broker发送拉消息请求,broker在收到Pull请求后,若是有消息就当即返回数据,Consumer端收到返回的消息后,再回调消费者设置的Listener方法。若是broker在收到Pull请求时,消息队列里没有数据,broker端会阻塞请求直到有数据传递或超时才返回。

固然,Consumer端是经过一个线程将阻塞队列LinkedBlockingQueue中的PullRequest发送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest时,若是发现没有消息,就会把PullRequest扔到ConcurrentHashMap中缓存起来。broker在启动时,会启动一个线程不停的从ConcurrentHashMap取出PullRequest检查,直到有数据返回。

7、RocketMQ的其余特性

前面的6个特性都是基本上都是点到为止,想要深刻了解,还须要你们多多查看源码,多多在实际中运用。固然除了已经提到的特性外,RocketMQ还支持:

  1. 定时消息
  1. 消息的刷盘策略
  2. 主动同步策略:同步双写、异步复制
  3. 海量消息堆积能力
  4. 高效通讯 .
  5. ……

其中涉及到的不少设计思路和解决方法都值得咱们深刻研究:

  1. 消息的存储设计:既要知足海量消息的堆积能力,又要知足极快的查询效率,还要保证写入的效率。
  1. 高效的通讯组件设计:高吞吐量,毫秒级的消息投递能力都离不开高效的通讯。
  2. …….

RocketMQ最佳实践

1、Producer最佳实践

一、一个应用尽量用一个 Topic,消息子类型用 tags 来标识,tags 能够由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才能够利用 tags 在 broker 作消息过滤。

二、每一个消息在业务层面的惟一标识码,要设置到 keys 字段,方便未来定位消息丢失问题。因为是哈希索引,请务必保证 key 尽量惟一,这样能够避免潜在的哈希冲突。

三、消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

四、对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。

五、某些应用若是不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。

2、Consumer最佳实践

消费过程要作到幂等(即消费端去重)

尽可能使用批量方式消费方式,能够很大程度上提升消费吞吐量

优化每条消息消费过程

3、其余配置

线上应该关闭autoCreateTopicEnable,即在配置文件中将其设置为false。

RocketMQ在发送消息时,会首先获取路由信息。若是是新的消息,因为MQServer上面尚未建立对应的Topic,这个时候,若是上面的配置打开的话,会返回默认TOPIC的(RocketMQ会在每台broker上面建立名为TBW102的TOPIC)路由信息,而后Producer会选择一台Broker发送消息,选中的broker在存储消息时,发现消息的topic尚未建立,就会自动建立topic。后果就是:之后全部该TOPIC的消息,都将发送到这台broker上,达不到负载均衡的目的。

因此基于目前RocketMQ的设计,建议关闭自动建立TOPIC的功能,而后根据消息量的大小,手动建立TOPIC。

RocketMQ设计相关 RocketMQ的设计假定:

每台PC机器均可能宕机不可服务

任意集群都有可能处理能力不足

最坏的状况必定会发生

内网环境须要低延迟来提供最佳用户体验

RocketMQ的关键设计:

分布式集群化

强数据安全

海量数据堆积

毫秒级投递延迟(推拉模式)
相关文章
相关标签/搜索