跟我学RocketMQ之消息幂等

在上篇中,咱们了解了RocketMQ中的消息重试机制以及如何在Producer、Consumer两端对重试消息进行处理。redis

RocketMQ会在消息消费时,按照必定规则推送消息到消费者端进行消息重试。这里涉及到了消息幂等的概念。数据库

首先咱们了解一下什么是幂等,以及何为消息幂等。apache

什么是幂等

百度对 “幂等” 解释以下缓存

设f为一由X映射至X的一元运算,则f为幂等的,当对于全部在X内的x,
f(f(x)) = f(x).
特别的是,恒等函数必定是幂等的,且任一常数函数也都是幂等的。网络

这里的关键是 f(f(x)) = f(x), 翻译成通俗的解释就是:并发

若是有一个操做,屡次执行与一次执行所产生的影响是相同的,咱们就称这个操做是幂等的。异步

关于消息幂等

基于上述的概念,结合消息消费的场景,咱们可以很容易的总结出消息幂等的概念:分布式

即:函数

若是消息重试屡次,消费者端对该重复消息消费屡次与消费一次的结果是相同的,而且屡次消费没有对系统产生反作用,那么咱们就称这个过程是消息幂等的。高并发

例如:

支付场景下,消费者消费扣款消息,对一笔订单进行扣款操做,该扣款操做须要扣除10元。

这个扣款操做重复屡次与执行一次的效果相同,只进行一次真实扣款,用户的扣款记录中对应该笔订单的只有一条扣款流水。不会多扣。那么咱们就说这个扣款操做是符合要求的,这个消费过程是消息幂等的。

须要进行消息幂等的场景

首先咱们回顾一下须要进行消息幂等的场景,也就是上一篇文章提到的消息重复的场景。

  1. 发送时重复:

生产者发送消息时,消息成功投递到broker,但此时发生网络闪断或者生产者down掉,致使broker发送ACK失败。此时生产者因为未能收到消息发送响应,认为发送失败,所以尝试从新发送消息到broker。当消息发送成功后,在broker中就会存在两条相同内容的消息,最终消费者会拉取到两条内容同样而且Message ID也相同的消息。所以形成了消息的重复。

  1. 消费时重复:

消费消息时一样会出现重复消费的状况。当消费者在处理业务完成返回消费状态给broker时,因为网络闪断等异常状况致使未能将消费完成的CONSUME_SUCCESS状态返回给broker。broker为了保证消息被至少消费一次的语义,会在网络环境恢复以后再次投递该条被处理的消息,最终形成消费者屡次收到内容同样而且Message ID也相同的消息,形成了消息的重复。

能够看到,不管是发送时重复仍是消费时重复,最终的效果均为消费者消费时收到了重复的消息,那么咱们就知道:只须要在消费者端统一进行幂等处理就可以实现消息幂等。

实现消息幂等

那么如何才能实现消息幂等呢?

首先咱们要定义消息幂等的两要素:

  1. 幂等令牌
  2. 处理惟一性的确保

咱们必须保证存在幂等令牌的状况下保证业务处理结果的惟一性,才认为幂等实现是成功的。

接下来分别解释这两个要素

幂等令牌

幂等令牌是生产者和消费者二者中的既定协议,在业务中一般是具有惟一业务标识的字符串,如:订单号、流水号等。且通常由生产者端生成并传递给消费者端。

处理惟一性的确保

即服务端应当采用必定的策略保证同一个业务逻辑必定不会重复执行成功屡次。如:使用支付宝进行支付,买一个产品支付屡次只会成功一笔。

较为经常使用的方式是采用缓存去重而且经过对业务标识添加数据库的惟一索引实现幂等。

具体的思路为:如支付场景下,支付的发起端生成了一个支付流水号,服务端处理该支付请求成功后,数据持久化成功。因为表中对支付流水添加了惟一索引,所以当重复支付时会由于惟一索引的存在报错 duplicate entry,服务端的业务逻辑捕获该异常并返回调用侧“重复支付”提示。这样就不会重复扣款。

在上面场景的基础上,咱们还能够引入Redis等缓存组件实现去重:当支付请求打到服务端,首先去缓存进行判断,根据key=“支付流水号”去get存储的值,若是返回为空,代表是首次进行支付操做同时将当前的支付流水号做为key、value能够为任意字符串经过set(key,value,expireTime)存储在redis中。

当重复的支付请求到来时,尝试进行get(支付流水号)操做,这个操做会命中缓存,所以咱们能够认为该请求是重复的支付请求,服务端业务将重复支付的业务提示返回给请求方。

因为咱们通常都会在缓存使用过程当中设置过时时间,缓存可能会失效从而致使请求穿透到持久化存储中(如:MySQL)。所以不能由于引入缓存而放弃使用惟一索引,将两者结合在一块儿是一个比较好的方案。

RocketMQ场景下如何处理消息幂等

了解了两个要素及典型案例以后,咱们回到消息消费的场景。

做为一款高性能的消息中间件,RocketMQ可以保证消息不丢失但不保证消息不重复。若是在RocketMQ中实现消息去重实际也是能够的,可是考虑到高可用以及高性能的需求,若是作了服务端的消息去重,RocketMQ就须要对消息作额外的rehash、排序等操做,这会花费较大的时间和空间等资源代价,收益并不明显。RocketMQ考虑到正常状况下出现重复消息的几率实际上是很小的,所以RocketMQ将消息幂等操做交给了业务方处理。

实际上上述问题的本质在于:网络调用自己存在不肯定性,也就是既不成功也不失败的第三种状态,即所谓的 处理中 状态,所以会有重复的状况发生。这个问题是不少其余的MQ产品一样会遇到的,一般的方法就是要求消费方在消费消息时进行去重,也就是本文咱们说的消费幂等性。

对RocketMQ有必定使用经验的读者可能注意到,每条消息都有一个MessageID,那么咱们可否使用该ID做为去重依据,也就是上面提到的幂等令牌呢?

答案是否认的,由于MessageID可能出现冲突的状况,所以不建议经过MessageID做为处理依据而应当使用业务惟一标识如:订单号、流水号等做为幂等处理的关键依据。

上面也提到了,幂等依据应当由消息生产者生成,在发送消息时候,咱们可以经过消息的key设置该id,对应的API为 org.apache.rocketmq.common.message.setKeys(String keys) 代码以下:

Message sendMessage = new Message(
                    MessageProtocolConst.WALLET_PAY_TOPIC.getTopic(),
                    message.getBytes());复制代码
sendMessage.setKeys("OD0000000001");复制代码

当消息消费者收到该消息时,根据该消息的key作幂等处理,API为 org.apache.rocketmq.common.message.getKeys() 代码以下:

(msgs, context) -> {
        try {
            // 默认msgs只有一条消息
            for (MessageExt msg : msgs) {
                String key = msg.getKeys();
                return walletCharge(msg);
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        } catch (Exception e) {
            LOGGER.error("钱包扣款消费异常,e={}", e);
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }
    }复制代码

消费者经过getKeys()可以读取到生产者设置的幂等依据(如:订单号等),而后业务逻辑围绕该id进行幂等处理便可。

若是你以为每次都须要在生产者侧setkey,在消费者侧getkey,有点繁琐。也能够将该幂等依据设置在消息协议中,消费者接收到消息后解析该id进行幂等操做也是能够的。只须要消息的生产者和消费者约定好如何解析id的协议便可。

具体的幂等逻辑视使用的场景而定,我在这里尝试从个人经验进行一些总结。

消费端常见的幂等操做

  1. 业务操做以前进行状态查询

消费端开始执行业务操做时,经过幂等id首先进行业务状态的查询,如:修改订单状态环节,当订单状态为成功/失败则不须要再进行处理。那么咱们只须要在消费逻辑执行以前经过订单号进行订单状态查询,一旦获取到肯定的订单状态则对消息进行提交,通知broker消息状态为:ConsumeConcurrentlyStatus.CONSUME_SUCCESS

  1. 业务操做前进行数据的检索

逻辑和第一点类似,即消费以前进行数据的检索,若是可以经过业务惟一id查询到对应的数据则不须要进行再后续的业务逻辑。如:下单环节中,在消费者执行异步下单以前首先经过订单号查询订单是否已经存在,这里能够查库也能够查缓存。若是存在则直接返回消费成功,不然进行下单操做。

  1. 惟一性约束保证最后一道防线
上述第二点操做并不能保证必定不出现重复的数据,如:并发插入的场景下,若是没有乐观锁、分布式锁做为保证的前提下,颇有可能出现数据的重复插入操做,所以咱们务必要对幂等id添加惟一性索引,这样就可以保证在并发场景下也能保证数据的惟一性。复制代码
  1. 引入锁机制
上述的第一点中,若是是并发更新的状况,没有使用悲观锁、乐观锁、分布式锁等机制的前提下,进行更新,极可能会出现屡次更新致使状态的不许确。如:对订单状态的更新,业务要求订单只能从初始化->处理中,处理中->成功,处理中->失败,不容许跨状态更新。若是没有锁机制,极可能会将初始化的订单更新为成功,成功订单更新为失败等异常的状况。
    高并发下,建议经过状态机的方式定义好业务状态的变迁,经过乐观锁、分布式锁机制保证屡次更新的结果是肯定的,悲观锁在并发环境不利于业务吞吐量的提升所以不建议使用。复制代码
  1. 消息记录表
这种方案和业务层作的幂等操做相似,因为咱们的消息id是惟一的,能够借助该id进行消息的去重操做,间接实现消费的幂等。
    首先准备一个消息记录表,在消费成功的同时插入一条已经处理成功的消息id记录到该表中,注意必定要 **与业务操做处于同一个事物** 中,当新的消息到达的时候,根据新消息的id在该表中查询是否已经存在该id,若是存在则代表消息已经被消费过,那么丢弃该消息再也不进行业务操做便可。
.....复制代码

确定还有更多的场景我没有涉及到,这里说到的操做均是互相之间有关联的,将他们配合使用更可以保证消费业务的幂等性。

不论怎样,请牢记一个原则:缓存是不可靠的,查询是不可靠的

在高并发的场景下,必定要经过持久化存储的惟一索引以及引入锁机制做为共同保障数据准确性和完整性的最后一道防线!

总结

本文主要讲解了何为幂等及消息消费场景下如何传递惟一幂等id,并进一步分析了如何保证消息幂等的思路以及总结了常见的消息幂等处理方式。

套路是多变的,关键是掌握思路和方法,咱们的原则就是 无论执行多少次,业务表现出来的行为是统一的 , 在这个前提下,咱们引入了操做前查库、操做前查缓存、乐观锁/分布式锁机制、加入惟一索引等多重防重放策略,经过这些策略的综合做用,最终达到了消息幂等的目的。

最后有句话分享,有道无术术可求。有术无道止于术。相信聪明的你必定会在技术的道路上结合实际场景将各类技术手段融会贯通,从而走的愈来愈远。

相关文章
相关标签/搜索