「查缺补漏」巩固你的RocketMQ知识体系

Windows安装部署

下载

地址:[https://www.apache.org/dyn/cl...]java

选择‘Binary’进行下载git

解压已下载工程github

配置

新增系统变量
ROCKETMQ_HOME -> F:RocketMQrocketmq-4.5.2

JAVA_HOME -> F:Java_JDKJDK1.8算法

Path 系统变量新增:Maven/bin目录数据库

PS:RocketMQ 消息存储在C:UsersAdministratorstore store目录中 文件占用较大,注意删除没必要要的内容apache

启动

start mqnamesrv.cmd

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true缓存

Rocket集成可视化监控插件

  1. 任意目录(拉取项目,随便哪里都行)git clone https://github.com/apache/roc...
  2. 进入‘rocketmq-externalsrocketmq-consolesrcmainresources’文件夹,打开‘application.properties’进行配置
  3. 其实就是一个SpringBoot服务,肯定好端口,别重复便可

    server.port=8100服务器

    rocketmq.config.namesrvAddr=127.0.0.1:9876网络

  4. 进入‘rocketmq-externalsrocketmq-console’文件夹

    执行‘mvn clean package -Dmaven.test.skip=true’,编译生成target数据结构

    java -jar rocketmq-console-ng-1.0.1.jar

  5. 根据配置地址访问: http://127.0.0.1:8100

Rocket可视化监控插件 增长Topic | 自动增长Topic(4.5.2版本)

4.5.2 版本 支持自动建立Topic

4.3.0 版本 必须经过监控程序配置Topic,不然执行程序报错,没有此路由

SpringBoot集成 RocketMQ

<!--RocketMQ-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

RocketMQ基本概念

<br/>

概览

基于RocketMQ的分布式系统,通常能够分为四个集群:Name server、broker、producer、consumer

  1. name server

    • 提供轻量级的服务发现和路由服务;
    • 每一个节点都存放了所有的路由信息和对应的读写服务;
    • 存储支持水平扩展
  2. broker

    • 提供知足TOPIC和QUEUE机制的消息存储服务;
    • 有推和拉两种模式;
    • 经过2或3拷贝实现高可用;
    • 提供上亿消息的堆积能力;
    • 提供故障恢复、统计功能和告警功能;
  3. producer

    • 支持分布式部署,经过负载平衡模块给broker发消息
    • 支持快速失败
    • 低延迟
  4. consumer

    1. 支持推和拉两种模式
    2. 支持集群消费和广播消费

<br/>

核心模块

<br/>

Name Server

提供Broker管理;Routing管理(路由管理)

NameServer,不少时候称为命名发现服务,其在RocketMQ中起着中转承接的做用,是一个无状态的服务,多个NameServer之间不通讯。任何Producer,Consumer,Broker与全部NameServer通讯,向NameServer请求或者发送数据。并且都是单向的,Producer和Consumer请求数据,Broker发送数据。正是由于这种单向的通讯,RocketMQ水平扩容变得很容易

  • 提供轻量级的服务发现和路由服务;
  • 每一个节点都存放了所有的路由信息和对应的读写服务;
  • 存储支持水平扩展

总结:相比于ZooKeeper提供的分布式锁,发布和订阅,数据一致性,选举等,在RocketMQ是不适用的,所以重写了一套更加轻量级的发现服务,主要用以存储 Broker相关信息以及当前Broker上的topic信息,路由信息等

Broker Server

提供Remoting Module、客户端管理、存储服务、HA服务(主从)、索引服务
  • 提供知足TOPIC和QUEUE机制的消息存储服务;
  • 有推和拉两种模式;
  • 经过2或3拷贝实现高可用;
  • 提供上亿消息的堆积能力;
  • 提供故障恢复、统计功能和告警功能;

producer

  • 支持分布式部署,经过负载平衡模块给broker发消息
  • 支持快速失败
  • 低延迟

consumer

  • 支持推和拉两种模式
  • 支持集群消费和广播消费

<br/>

核心角色介绍

<br/>

生产者

生产者发送业务系统产生的消息给broker, RocketMQ提供了多种发送方式:同步的、异步的、单向的

<br/>

生产者组

具备相同角色的生产者被分到一组, 假如原始的生产者在事务后崩溃,broker会联系 同一辈子产者组中的不一样生产者实例,继续提交或回滚事务

<br/>

消费者

一个消费者从broker拉取信息,并将信息返还给应用。为了咱们应用的正确性,提供了两种消费者类型:

拉式消费者:拉式消费者从broker拉取消息,一旦一批消息被拉取,用户应用系统将发起消费过程。

推式消费者:推式消费者,从另外一方面讲,囊括了消息的拉取、消费过程,并保持了内部的其余工做,留下了一个回调 接口给终端用户去实现,实如今消息到达时要执行的内容。

<br/>

消费者组

具备相同角色的消费者被组在一块儿,称为消费者组,它完成了负载均衡和容错的目标

一个消费组中的消费者实例必须有肯定的相同的订阅topic

<br/>

Topic(主题)

Topic是一个消息的目录,在这个目录中,生产者传送消息,消费者拉取消息,能够多个消费者订阅同一个topic,一个生产者也能够发送多个topic

PS:RocketMQ 基于发布订阅模式,发布订阅的核心即 Topic 主题

<br/>

Message(消息)

消息是被传递的信息。一个消息必须有一个Topic,它能够理解为信件上的地址。一个消息也能够有一个可选的tag,和额外的key-value对。 例如:你能够设置业务中的键到你的消息中,在broker服务中查找消息,以便在开发期间诊断问题

<br/>

消息队列

Topic被分割成一个或多个消息队列。队列分为3中角色:异步主、同步主、从。若是你不能容忍消息丢失,咱们建议你部署同步主,并加一个从队列。 若是你容忍丢失,但你但愿队列老是可用,你能够部署异步主和从队列。若是你想最简单,你只须要一个异步主,不须要从队列。 消息保存磁盘的方式也有两种,推荐使用的是异步保存,同步保存是昂贵的并会致使性能损失,若是你想要可靠性,咱们推荐你使用同步主+从的方式。

<br/>

Tag(标签)

标签,用另一个词来讲,就是子主题,为用户提供额外的灵活性。具备相同Topic的消息能够有不一样的tag。

<br/>

Broker(队列)

Broker是RocketMQ的一个主要组件,它接收生产者发送的消息,存储它们并准备处理消费者的拉取请求。它也存储消息相关的元数据, 包括消费组,消费成功的偏移量,主题、队列的信息。

<br/>

名称服务

名称服务主要提供路由信息。生产者/消费者客户端寻找topic,并找到通讯的队列列表。

<br/>

消息的顺序

DefaultMQPushConsumer被使用,你就要决定消费消息时,是顺序消费仍是同时消费

  • 顺序消费

  顺序消费消息的意思是 消息将按照生产者发送到队列时的顺序被消费掉。若是你被强制要求使用全局的顺序,你要确保你的topic只有一个消息队列。

若是指定顺序消费,消息被同时消费的数量就是订阅这个topic的消费组的数量。

  • 同时消费

  当同时消费消息时,消息同时消费的最大数量取决于消费客户端指定的线程池的大小。

<br/>

最佳实践

Producer最佳实践
  1. 一个应用尽量用一个 Topic,消息子类型用 tags 来标识,tags 能够由应用自由设置。只有发送消息设置了tags,消费方在订阅消息时,才能够利用 tags 在 broker 作消息过滤。
  2. 每一个消息在业务层面的惟一标识码,要设置到 keys 字段,方便未来定位消息丢失问题。因为是哈希索引,请务必保证 key 尽量惟一,这样能够避免潜在的哈希冲突。

    消息发送成功或者失败,要打印消息日志,务必要打印 sendresult 和 key 字段。

  3. 对于消息不可丢失应用,务必要有消息重发机制。例如:消息发送失败,存储到数据库,能有定时程序尝试重发或者人工触发重发。
  4. 某些应用若是不关注消息是否发送成功,请直接使用sendOneWay方法发送消息。
Consumer最佳实践
  1. 消费过程要作到幂等(即消费端去重)
  2. 尽可能使用批量方式消费方式,能够很大程度上提升消费吞吐量。
  3. 优化每条消息消费过程

MQ核心问题

<br/>

1.消息队列适合解决的问题

解决的核心问题主要是:异步、解耦、削峰

可是引入消息队列也会有不少额外的问题,好比系统复杂性会大大增长,同时须要解决重复下发,重复消费,消费顺序,消息丢失,重试机制等等问题,所以不能滥用,合适的场景用合适的技术

<br/>

2.消息模型:主题和队列的区别

1、消息队列的演进

一、初始阶段

最初的消息队列,就是一个严格意义上的队列。队列是一种数据结构,先进先出,在消息入队出队过程当中,保证这些消息严格有序。早期的消息队列就是按照“队列”的数据结构设计的

队列模型:

生产者(Producer)发消息就是入队操做,消费者(Consumer)收消息就是出队也就是删除操做,服务端存放消息的容器天然就称为“队列”。

  • 若是有多个生产者往同一个队列里面发送消息,这个队列中能够消费到的消息,就是这些生产者生产的全部消息的合集。消息的顺序就是这些生产者发送消息的天然顺序
  • 若是有多个消费者接收同一个队列的消息,这些消费者之间其实是竞争的关系,每一个消费者只能收到队列中的一部分消息,也就是说任何一条消息只能被其中的一个消费者收到

二、发布 - 订阅模型阶段

若是须要将一份消息数据分发给多个消费者,要求每一个消费者都能收到全量的消息,例如,对于一份订单数据,风控系统、分析系统、支付系统等都须要接收消息。

这个时候,单个队列就知足不了需求,一个可行的解决方式是,为每一个消费者建立一个单独的队列,让生产者发送多份。可是一样的一份消息数据被复制到多个队列中会浪费资源,更重要的是,生产者必须知道有多少个消费者。为每一个消费者单独发送一份消息,这实际上违背了消息队列“解耦”这个设计初衷。

为了解决这个问题,演化出了另一种消息模型:发布 - 订阅模型(Publish-Subscribe Pattern)

消息的发送方称为发布者(Publisher),消息的接收方称为订阅者(Subscriber),服务端存放消息的容器称为主题(Topic)。

  • 发布者将消息发送到主题中,订阅者在接收消息以前须要先“订阅主题”。
  • 每份订阅中,订阅者均可以接收到主题的全部消息。

三、总结:

  • 在很长的一段时间,队列模式和发布 - 订阅模式是并存的。
  • 有些消息队列同时支持这两种消息模型,好比 ActiveMQ。
  • 对比这两种模型,生产者就是发布者,消费者就是订阅者,队列就是主题,并无本质的区别。它们最大的区别是:一份消息数据能不能被消费屡次的问题
  • 实际上,在这种发布 - 订阅模型中,若是只有一个订阅者,那它和队列模型就基本是同样的了。也就是说,发布 - 订阅模型在功能层面上是能够兼容队列模型的。

2、RabbitMQ 的消息模型

少数依然坚持使用队列模型的产品之一。

RabbitMQ 使用 Exchange 模块解决多个消费者的问题。Exchange 位于生产者和队列之间,生产者并不关心将消息发送给哪一个队列,而是将消息发送给 Exchange,由 Exchange 上配置的策略来决定将消息投递到哪些队列中。

  • 同一份消息若是须要被多个消费者来消费,须要配置 Exchange 将消息发送到多个队列,每一个队列中都存放一份完整的消息数据,能够为一个消费者提供消费服务。

3、RocketMQ 的消息模型

RocketMQ 使用的消息模型是标准的发布 - 订阅模型。在 RocketMQ 也有队列(Queue)这个概念。

消息队列的消费机制:

几乎全部的消息队列产品都使用一种很是朴素的“请求 - 确认”机制,确保消息不会在传递过程当中因为网络或服务器故障丢失。

在生产端,生产者先将消息发送给服务端,也就是 Broker,服务端在收到消息并将消息写入主题或者队列中后,会给生产者发送确认的响应。若是生产者没有收到服务端的确认或者收到失败的响应,则会从新发送消息

在消费端,消费者在收到消息并完成本身的消费业务逻辑(好比,将数据保存到数据库中)后,也会给服务端发送消费成功的确认,服务端只有收到消费确认后,才认为一条消息被成功消费,不然它会给消费者从新发送这条消息,直到收到对应的消费成功确认。

这个确认机制很好地保证了消息传递过程当中的可靠性,可是,引入这个机制在消费端带来了一个问题:为了确保消息的有序性,在某一条消息被成功消费以前,下一条消息是不能被消费的,也就是说,每一个主题在任意时刻,至多只能有一个消费者实例在进行消费,那就无法经过水平扩展消费者的数量来提高消费端整体的消费性能

为了解决这个问题,RocketMQ 在主题下面增长了队列的概念:

  • 每一个主题包含多个队列,经过多个队列来实现多实例并行生产和消费。须要注意的是,RocketMQ 只在队列上保证消息的有序性,主题层面是没法保证消息的严格顺序的。
  • 生产者会往全部队列发消息,但不是“同一条消息每一个队列都发一次”,每条消息只会往某个队列里面发送一次
  • 一个消费组,每一个队列上只能串行消费,多个队列加一块儿就是并行消费了,并行度就是队列数量,队列数量越多并行度越大,因此水平扩展能够提高消费性能。
  • 每队列每消费组维护一个消费位置(offset),记录这个消费组在这个队列上消费到哪儿了。
  • 订阅者是经过消费组(Consumer Group)来体现的。每一个消费组都消费主题中一份完整的消息,不一样消费组之间消费进度彼此不受影响,也就是说,一条消息被 Consumer Group1 消费过,也会再给 Consumer Group2 消费。
  • 消费组中包含多个消费者,同一个组内的消费者是竞争消费的关系,每一个消费者负责消费组内的一部分消息。若是一条消息被消费者 Consumer1 消费了,那同组的其余消费者就不会再收到这条消息。
  • 因为消息须要被不一样的组进行屡次消费,因此消费完的消息并不会当即被删除,这就须要 RocketMQ 为每一个消费组在每一个队列上维护一个消费位置(Consumer Offset),这个位置以前的消息都被消费过,以后的消息都没有被消费过,每成功消费一条消息,消费位置就加一。咱们在使用消息队列的时候,丢消息的缘由大可能是因为消费位置处理不当致使的

4、Kafka 的消息模型

Kafka 的消息模型和 RocketMQ 是彻底同样的,惟一的区别是,在 Kafka 中,队列这个概念的名称不同,Kafka 中对应的名称是“分区(Partition)”,含义和功能是没有任何区别的。

5、总结

  • 经常使用的消息队列中,RabbitMQ 采用的是队列模型,可是它同样能够实现发布 - 订阅的功能。RocketMQ 和 Kafka 采用的是发布 - 订阅模型,而且两者的消息模型是基本一致的。

<br/>

3.消息丢失怎么办? 如何保证消息的可靠性传输?

首先如何验证消息是否丢失?

  • 若是是 IT 基础设施比较完善的公司,通常都有分布式链路追踪系统,使用相似的追踪系统能够很方便地追踪每一条消息。
  • 若是没有这样的追踪系统,咱们能够利用消息队列的有序性来验证是否有消息丢失

即保证消息消费顺序的状况下,根据消息的序号,在消费段判断是否连续

解决方案:

消息从生产到消费的过程当中,能够划分三个阶段:

一、生产阶段

消息队列经过最经常使用的请求确认机制,来保证消息的可靠传递:当你代码调用发消息方法时,消息队列客户端会把消息发送到Broker,Broker收到消息后,会给客户端返回一个确认响应,代表消息已收到。客户端收到响应后,完成了一次正常消息的发送。

有些消息队列在长时间没收到发送确认响应后,会自动重试,若是重试失败,就会以返回值或者异常的方式告知用户。在编写发送消息的代码时,须要注意,正确处理返回值或者捕获异常,就能够保证这个阶段的消息不会丢失。

同步发送时,只要注意捕获异常便可。

异步发送时,则须要在回调方法里进行检查。这个地方须要特别注意,不少丢消息的缘由就是,咱们使用了异步发送,却没有在回调中检查发送结果。

二、存储阶段

在存储阶段正常状况下,只要Broker在正常运行,就不会出现丢消息的问题;可是若是Broker出现故障,好比进程死掉或者服务器宕机,仍是可能会丢失消息的。

若是对消息的可靠性要求很是高,能够经过配置Broker参数来避免由于宕机丢消息:

  • 对于单个节点的 Broker,须要配置 Broker 参数,在收到消息后,将消息写入磁盘后再给 Producer 返回确认响应,这样即便发生宕机,因为消息已经被写入磁盘,就不会丢失消息,恢复后还能够继续消费。例如,在 RocketMQ 中,须要将刷盘方式 flushDiskType 配置为 SYNC_FLUSH 同步刷盘。
  • 对于 Broker 是由多个节点组成的集群,须要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其余的 Broker 能够替代宕机的 Broker,也不会发生消息丢失。

三、消息阶段

消费阶段采用和生产阶段相似的确认机制来保证消息的可靠传递,客户端从 Broker 拉取消息后,执行用户的消费业务逻辑,成功后,才会给 Broker 发送消费确认响应。若是 Broker 没有收到消费确认响应,下次拉消息的时候还会返回同一条消息,确保消息不会在网络传输过程当中丢失,也不会由于客户端在执行消费逻辑中出错致使丢失。

在编写消费代码时须要注意的是:不要在收到消息后就当即发送消费确认,而是应该在执行完全部消费业务逻辑以后,再发送消费确认

<br/>

4.处理消费过程当中的重复消息

在消息传递过程当中,若是出现传递失败的状况,发送方会执行重试,重试过程当中就有可能产生重复的消息。若是没有对重复消息进行处理,就可能致使系统的数据出现错误。

好比,一个消费订单消息,统计下单金额的微服务,若是没有正确处理重复消息,那就会出现重复统计,致使统计结果错误。

1、消息重复的状况必然存在

在MQTT协议中,给出了三种传递消息时可以提供的服务质量标准:

  • At most once:至多一次。最多会被送达一次,也就是说没有消息可靠性保证,容许丢消息。通常都是一些对消息可靠性要求不高的监控场景使用,好比每分钟上报一次机房温度数据,能够接受数据少许丢失。
  • At least once:至少一次。至少会被送达一次,也就是说不容许丢消息,可是容许有少许重复消息出现
  • Exactly once:刚好一次。只会被送达一次,不容许丢失也不容许重复,这个是最高等级。

这个服务质量标准不只适用于 MQTT,对全部的消息队列都是适用的。经常使用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 。也就是说,消息队列很难保证消息不重复。

注意:Kafka 支持的“Exactly once”和咱们刚刚提到的消息传递的服务质量标准“Exactly once”是不同的,它是 Kafka 提供的另一个特性,Kafka 中支持的事务也和咱们一般意义理解的事务有必定的差别。在 Kafka 中,事务和 Excactly once 主要是为了配合流计算使用的特性。

2、用幂等性解决重复消息问题

幂等原本是一个数学上的概念,它的定义是:若是一个函数f(x)知足:f(f(x)) = f(x),则函数f(x)知足米幂等性。扩展到计算机领域,被用来描述一个操做、方法或者服务。

  • 一个幂等操做的特色是,其任意屡次执行所产生的影响均与一次执行的影响相同
  • 一个幂等方法,使用一样的参数,对它进行屡次调用和一次调用,对系统产生的影响是同样的。因此不用担忧重复执行会对系统形成任何改变。

举例:

一、在不考虑并发的状况下,“将帐户 X 的余额设置为 100 元”,执行一次后对系统的影响是,帐户 X 的余额变成了 100 元。只要提供的参数 100 元不变,那即便再执行多少次,帐户 X 的余额始终都是 100 元,不会变化,这个操做就是一个幂等的操做。

二、“将帐户 X 的余额加 100 元”,这个操做它就不是幂等的,每执行一次,帐户余额就会增长 100 元,执行屡次和执行一次对系统的影响(也就是帐户的余额)是不同的。

若是消费消息的业务逻辑具有幂等性,那就不用担忧消息重复的问题,由于同一条消息,消费一次和消费屡次对系统的影响是彻底同样的。消费屡次等于消费一次。从对系统的影响结果来讲:At least once + 幂等消费 = Exactly once。

实现幂等操做最好的方式是,从业务逻辑设计上入手,将消费的业务逻辑设计成具有幂等性的操做

经常使用的设计幂等操做的方法

(1)利用数据库的惟一约束实现幂等

上面提到的那个不具有幂等特性的转帐的例子:将帐户 X 的余额加 100 元。在这个例子中,咱们能够经过改造业务逻辑,让它具有幂等性。

首先,咱们能够限定,对于每一个转帐单每一个帐户只能够执行一次变动操做,在分布式系统中,这个限制实现的方法很是多,最简单的是咱们在数据库中建一张转帐流水表,这个表有三个字段:转帐单 ID、帐户 ID 和变动金额,而后给转帐单 ID 和帐户 ID 这两个字段联合起来建立一个惟一约束,这样对于相同的转帐单 ID 和帐户 ID,表里至多只能存在一条记录。

这样,咱们消费消息的逻辑能够变为:“在转帐流水表中增长一条转帐记录,而后再根据转帐记录,异步操做更新用户余额便可。”在转帐流水表增长一条转帐记录这个操做中,因为咱们在这个表中预先定义了“帐户 ID 转帐单 ID”的惟一约束,对于同一个转帐单同一个帐户只能插入一条记录,后续重复的插入操做都会失败,这样就实现了一个幂等的操做。

基于这个思路,不光是可使用关系型数据库,只要是支持相似“INSERT IF NOT EXIST”语义的存储类系统均可以用于实现幂等,好比,你能够用 Redis 的 SETNX 命令来替代数据库中的惟一约束,来实现幂等消费。

(2)为更新的数据设置前置条件

给数据变动设置一个前置条件,若是知足条件就更新数据,不然拒绝更新数据,在更新数据的时候,同时变动前置条件中须要判断的数据。这样,重复执行这个操做时,因为第一次更新数据的时候已经变动了前置条件中须要判断的数据,不知足前置条件,则不会重复执行更新数据操做。

好比,“将帐户 X 的余额增长 100 元”这个操做并不知足幂等性,咱们能够把这个操做加上一个前置条件,变为:“若是帐户 X 当前的余额为 500 元,将余额加 100 元”,这个操做就具有了幂等性。对应到消息队列中的使用时,能够在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变动操做。

可是,若是咱们要更新的数据不是数值,或者咱们要作一个比较复杂的更新操做怎么办?用什么做为前置判断条件呢?更加通用的方法是,给你的数据增长一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,若是不一致就拒绝更新数据,更新数据的同时将版本号 +1,同样能够实现幂等更新。

(3)记录并检查操做

若是上面提到的两种实现幂等方法都不能适用于你的场景,还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操做,也称为“Token 机制或者 GUID(全局惟一 ID)机制”,实现的思路特别简单:在执行数据更新操做以前,先检查一下是否执行过这个更新操做。这种方法适用范围最广,可是实现难度和复杂度也比较高,通常不推荐使用

具体的实现方法是,在发送消息时,给每条消息指定一个全局惟一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,若是没有消费过,才更新数据,而后将消费状态置为已消费

在分布式系统中,这个方法实际上是很是难实现的。首先,给每一个消息指定一个全局惟一的 ID 就是一件不那么简单的事儿,方法有不少,但都不太好同时知足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,而后更新数据而且设置消费状态”中,三个操做必须做为一组操做保证原子性,才能真正实现幂等,不然就会出现 Bug。

好比说,对于同一条消息:“全局 ID 为 8,操做为:给 ID 为 666 帐户增长 100 元”,有可能出现这样的状况:

  • t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“帐户增长 100 元”;
  • t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,由于这个时刻,Consumer A 还将来得及更新消息执行状态。

这样就会致使帐户被错误地增长了两次 100 元,这是一个在分布式系统中很是容易犯的错误,必定要引觉得戒。对于这个问题,固然咱们能够用事务来实现,也能够用锁来实现,可是在分布式系统中,不管是分布式事务仍是分布式锁都是比较难解决问题。

<br/>

5.利用事务消息实现分布式事务

1、消息事务

其实不少场景下,咱们“发消息”这个过程,目的每每是通知另一个系统或者模块去更新数据,消息队列中的“事务”,主要解决消息生产者和消息消费者的数据一致性问题

用户在电商APP上购物时,先把商品加到购物车里,而后几件商品一块儿下单,最后支付,完成购物流程。

这个过程当中有一个须要用到消息队列的步骤,订单系统建立订单后,发消息给购物车系统,将已下单的商品从购物车中删除。由于从购物车删除已下单商品这个步骤,并非用户下单支付这个主要流程中必要的步骤,使用消息队列来异步清理购物车是更加合理。

对于订单系统,它建立订单的过程实际执行了2个步骤的操做:

  • 在订单库中插入一条订单数据,建立订单;
  • 发消息给消息队列,消息的内容就是刚刚建立的订单

对于购物车系统:

  • 订阅相应的主题,接收订单建立的消息,而后清理购物车,在购物车中删除订单的商品。

在分布式系统中,上面提到的步骤,任何一个都有可能失败,若是不作任何处理,那就有可能出现订单数据与购物车数据不一致的状况,好比:

  • 建立了订单,没有清理购物车;
  • 订单没建立成功,购物车里面的商品却被清掉了。

因此咱们须要解决的问题为:在上述任意步骤都有可能失败的状况下,还要保证订单库和购物车库这两个库的数据一致性。

2、分布式事务

分布式事务就是要在分布式系统中实现事务。在分布式系统中,在保证可用性和不严重牺牲性能的前提下,光是要实现数据的一致性就已经很是困难了,显然实现严格的分布式事务是更加不可能完成的任务。因此目前你们所说的分布式事务,更多状况下,是在分布式系统中事务的不完整实现,在不一样的应用场景中,有不一样的实现,目的都是经过一些妥协来解决实际问题。

常见的分布式事务实现:

  • 2PC(Two-phase Commit,也叫二阶段提交)
  • TCC(Try-Confirm-Cancel)
  • 事务消息

每一种实现都有其特定的使用场景,也有各自的问题,都不是完美的解决方案。

事务消息适用的场景主要是那些须要异步更新数据,而且对数据实时性要求不过高的场景。好比在建立订单后,若是出现短暂的几秒,购物车里的商品没有被及时状况,也不是彻底不可接受的,只要最终购物车的数据和订单数据保持一致就可。

3、消息队列实现分布式事务

事务消息须要消息队列提供相应的功能才能实现,kafka和RocketMQ都提供了事务相关功能。

对于订单系统:

  • 首先,订单系统在消息队列上开启一个事务。
  • 而后订单系统给消息服务器发送一个“半消息”,这个半消息不是说消息内容不完整,它包含的内容就是完整的消息内容,半消息和普通消息的惟一区别是,在事务提交以前,对于消费者来讲,这个消息是不可见的。
  • 半消息发送成功后,订单系统就能够执行本地事务了,在订单库中建立一条订单记录,并提交订单库的数据库事务。
  • 而后根据本地事务的执行结果决定提交或者回滚事务消息。若是订单建立成功,那就提交事务消息,购物车系统就能够消费到这条消息继续后续的流程。若是订单建立失败,那就回滚事务消息,购物车系统就不会收到这条消息。这样就基本实现了“要么都成功,要么都失败”的一致性要求。

对于购物车系统:

  • 对于购物车系统收到订单建立成功消息清理购物车这个操做来讲,失败的处理比较简单,只要成功执行购物车清理后再提交消费确认便可,若是失败,因为没有提交消费确认,消息队列会自动重试

若是在第四步提交事务消息时失败了怎么办?Kafka 和 RocketMQ 给出了 2 种不一样的解决方案:

一、Kafka 的解决方案:

直接抛出异常,让用户自行处理。咱们能够在业务代码中反复重试提交,直到提交成功,或者删除以前建立的订单进行补偿。

二、RocketMQ 的解决方案:

在 RocketMQ 中的事务实现中,增长了事务反查的机制来解决事务消息提交失败的问题。若是 Producer 也就是订单系统,在提交或者回滚事务消息时发生网络异常,RocketMQ 的 Broker 没有收到提交或者回滚的请求,Broker 会按期去 Producer 上反查这个事务对应的本地事务的状态,而后根据反查结果决定提交或者回滚这个事务。为了支撑这个事务反查机制,咱们的业务代码须要实现一个反查本地事务状态的接口,告知 RocketMQ 本地事务是成功仍是失败。

综合上面讲的通用事务消息的实现和 RocketMQ 的事务反查机制,使用 RocketMQ 事务消息功能实现分布式事务的流程以下图:

<br/>

6.消息队列中的顺序问题

当咱们说顺序时,咱们在说什么?

平常思惟中,顺序大部分状况会和时间关联起来,即时间的前后表示事件的顺序关系。

好比事件A发生在下午3点一刻,而事件B发生在下午4点,那么咱们认为事件A发生在事件B以前,他们的顺序关系为先A后B。

上面的例子之因此成立是由于他们有相同的参考系,即他们的时间是对应的同一个物理时钟的时间。若是A发生的时间是北京时间,而B依赖的时间是东京时间,那么先A后B的顺序关系还成立吗?

若是没有一个绝对的时间参考,那么A和B之间还有顺序吗,或者说怎么判定A和B的顺序?

显而易见的,若是A、B两个事件之间若是是有因果关系的,那么A必定发生在B以前(来龙去脉,有因才有果)。相反,在没有一个绝对的时间的参考的状况下,若A、B之间没有因果关系,那么A、B之间就没有顺序关系。

那么,咱们在说顺序时,其实说的是:

  • 有绝对时间参考的状况下,事件的发生时间的关系;
  • 和没有时间参考下的,一种由因果关系推断出来的happening before的关系;

在分布式环境中讨论顺序

当把顺序放到分布式环境(多线程、多进程均可以认为是一个分布式的环境)中去讨论时:

  • 同一线程上的事件顺序是肯定的,能够认为他们有相同的时间做为参考
  • 不一样线程间的顺序只能经过因果关系去推断

(点表示事件,波浪线箭头表示事件间的消息)

上图中,进程P中的事件顺序为p1->p2->p3->p4(时间推断)。而由于p1给进程Q的q2发了消息,那么p1必定在q2以前(因果推断)。可是没法肯定p1和q1之间的顺序关系。

推荐阅读《Time, Clocks, and the Ordering of Events in a Distributed System》,会透彻的分析分布式系统中的顺序问题。

消息中间件中的顺序消息

什么是顺序消息

有了上述的基础以后,咱们回到本篇文章的主题中,聊一聊消息中间件中的顺序消息。

顺序消息(FIFO 消息)是 MQ 提供的一种严格按照顺序进行发布和消费的消息类型。顺序消息由两个部分组成:顺序发布和顺序消费。

顺序消息包含两种类型:

分区顺序:一个Partition内全部的消息按照先进先出的顺序进行发布和消费

全局顺序:一个Topic内全部的消息按照先进先出的顺序进行发布和消费

这是阿里云上对顺序消息的定义,把顺序消息拆分红了顺序发布和顺序消费。那么多线程中发送消息算不算顺序发布?

如上一部分介绍的,多线程中若没有因果关系则没有顺序。那么用户在多线程中去发消息就意味着用户不关心那些在不一样线程中被发送的消息的顺序。即多线程发送的消息,不一样线程间的消息不是顺序发布的,同一线程的消息是顺序发布的。这是须要用户本身去保障的。

而对于顺序消费,则须要保证哪些来自同一个发送线程的消息在消费时是按照相同的顺序被处理的(为何不说他们应该在一个线程中被消费呢?)。

全局顺序实际上是分区顺序的一个特例,即便Topic只有一个分区(如下不在讨论全局顺序,由于全局顺序将面临性能的问题,并且绝大多数场景都不须要全局顺序)。

如何保证顺序

在MQ的模型中,顺序须要由3个阶段去保障:

  1. 消息被发送时保持顺序
  2. 消息被存储时保持和发送的顺序一致
  3. 消息被消费时保持和存储的顺序一致

发送时保持顺序意味着对于有顺序要求的消息,用户应该在同一个线程中采用同步的方式发送。存储保持和发送的顺序一致则要求在同一线程中被发送出来的消息A和B,存储时在空间上A必定在B以前。而消费保持和存储一致则要求消息A、B到达Consumer以后必须按照先A后B的顺序被处理。

以下图所示:

对于两个订单的消息的原始数据:a一、b一、b二、a二、a三、b3(绝对时间下发生的顺序):

  • 在发送时,a订单的消息须要保持a一、a二、a3的顺序,b订单的消息也相同,可是a、b订单之间的消息没有顺序关系,这意味着a、b订单的消息能够在不一样的线程中被发送出去
  • 在存储时,须要分别保证a、b订单的消息的顺序,可是a、b订单之间的消息的顺序能够不保证
    • a一、b一、b二、a二、a三、b3是能够接受的
    • a一、a二、b一、b二、a三、b3也是能够接受的
    • a一、a三、b一、b二、a二、b3是不能接受的
  • 消费时保证顺序的简单方式就是“什么都不作”,不对收到的消息的顺序进行调整,即只要一个分区的消息只由一个线程处理便可;固然,若是a、b在一个分区中,在收到消息后也能够将他们拆分到不一样线程中处理,不过要权衡一下收益

开源RocketMQ中顺序的实现

上图是RocketMQ顺序消息原理的介绍,将不一样订单的消息路由到不一样的分区中。文档只是给出了Producer顺序的处理,Consumer消费时经过一个分区只能有一个线程消费的方式来保证消息顺序,具体实现以下。

Producer端

Producer端确保消息顺序惟一要作的事情就是将消息路由到特定的分区,在RocketMQ中,经过MessageQueueSelector来实现分区的选择。

  • List<MessageQueue> mqs:消息要发送的Topic下全部的分区
  • Message msg:消息对象
  • 额外的参数:用户能够传递本身的参数

好比以下实现就能够保证相同的订单的消息被路由到相同的分区:

long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

Consumer端

RocketMQ消费端有两种类型:MQPullConsumer和MQPushConsumer。

MQPullConsumer由用户控制线程,主动从服务端获取消息,每次获取到的是一个MessageQueue中的消息。PullResult中的List msgFoundList天然和存储顺序一致,用户须要再拿到这批消息后本身保证消费的顺序。

对于PushConsumer,由用户注册MessageListener来消费消息,在客户端中须要保证调用MessageListener时消息的顺序性。RocketMQ中的实现以下:

  1. PullMessageService单线程的从Broker获取消息
  2. PullMessageService将消息添加到ProcessQueue中(ProcessMessage是一个消息的缓存),以后提交一个消费任务到ConsumeMessageOrderService
  3. ConsumeMessageOrderService多线程执行,每一个线程在消费消息时须要拿到MessageQueue的锁
  4. 拿到锁以后从ProcessQueue中获取消息

保证消费顺序的核心思想是:

  • 获取到消息后添加到ProcessQueue中,单线程执行,因此ProcessQueue中的消息是顺序的
  • 提交的消费任务时提交的是“对某个MQ进行一次消费”,此次消费请求是从ProcessQueue中获取消息消费,因此也是顺序的(不管哪一个线程获取到锁,都是按照ProcessQueue中消息的顺序进行消费)

顺序和异常的关系

顺序消息须要Producer和Consumer都保证顺序。Producer须要保证消息被路由到正确的分区,消息须要保证每一个分区的数据只有一个线程消息,那么就会有一些缺陷:

  • 发送顺序消息没法利用集群的Failover特性,由于不能更换MessageQueue进行重试
  • 由于发送的路由策略致使的热点问题,可能某一些MessageQueue的数据量特别大
  • 消费的并行读依赖于分区数量
  • 消费失败时没法跳过

不能更换MessageQueue重试就须要MessageQueue有本身的副本,经过Raft、Paxos之类的算法保证有可用的副本,或者经过其余高可用的存储设备来存储MessageQueue。

热点问题好像没有什么好的解决办法,只能经过拆分MessageQueue和优化路由方法来尽可能均衡的将消息分配到不一样的MessageQueue。

消费并行度理论上不会有太大问题,由于MessageQueue的数量能够调整。

消费失败的没法跳过是不可避免的,由于跳过可能致使后续的数据处理都是错误的。不过能够提供一些策略,由用户根据错误类型来决定是否跳过,而且提供重试队列之类的功能,在跳过以后用户能够在“其余”地方从新消费到这条消息。

<br/>

鸣谢

感谢极客时间所属的《消息队列高手课连接

<br/>

最后

本篇是一篇大合集,中间确定参考了许多其余人的文章内容或图片,但因为时间比较久远,当时并无一一记录,为此表示歉意,若是有做者发现了本身的文章或图片,能够私聊我,我会进行补充。

若是你发现写的还不错,能够搜索公众号「是Kerwin啊」,一块儿进步!

也能够查看Kerwin的GitHub主页

相关文章
相关标签/搜索