系统学习消息队列分享(七) 如何处理消费过程当中的重复消息?

在消息传递过程当中,若是出现传递失败的状况,发送方会执行重试,重试的过程当中就有可能会产生重复的消息。对使用消息队列的业务系统来讲,若是没有对重复消息进行处理,就有可能会致使系统的数据出现错误。前端

好比说,一个消费订单消息,统计下单金额的微服务,若是没有正确处理重复消息,那就会出现重复统计,致使统计结果错误。程序员

你可能会问,若是消息队列自己能保证消息不重复,那应用程序的实现不就简单了?那有没有消息队列能保证消息不重复呢?数据库

消息重复的状况必然存在

在 MQTT 协议中,给出了三种传递消息时可以提供的服务质量标准,这三种服务质量从低到高依次是:并发

  • At most once: 至多一次。消息在传递时,最多会被送达一次。换一个说法就是,没什么消息可靠性保证,容许丢消息。通常都是一些对消息可靠性要求不过高的监控场景使用,好比每分钟上报一次机房温度数据,能够接受数据少许丢失。框架

  • At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不容许丢消息,可是容许有少许重复消息出现。异步

  • Exactly once:刚好一次。消息在传递时,只会被送达一次,不容许丢失也不容许重复,这个是最高的等级。分布式

这个服务质量标准不只适用于 MQTT,对全部的消息队列都是适用的。咱们如今经常使用的绝大部分消息队列提供的服务质量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 都是这样。也就是说,消息队列很难保证消息不重复。ide

说到这儿我知道确定有的同窗会反驳我:“你说的不对,我看过 Kafka 的文档,Kafka 是支持 Exactly once 的。”我在这里跟这些同窗解释一下,你说的没错,Kafka 的确是支持 Exactly once,可是我讲的也没有问题,为何呢?函数

Kafka 支持的“Exactly once”和咱们刚刚提到的消息传递的服务质量标准“Exactly once”是不同的,它是 Kafka 提供的另一个特性,Kafka 中支持的事务也和咱们一般意义理解的事务有必定的差别。在 Kafka 中,事务和 Excactly once 主要是为了配合流计算使用的特性,咱们在专栏“进阶篇”这个模块中,会有专门的一节课来说 Kafka 的事务和它支持的 Exactly once 特性。微服务

稍微说一些题外话,Kafka 的团队是一个很是善于包装和营销的团队,你看他们很巧妙地用了两个全部人都很是熟悉的概念“事务”和“Exactly once”来包装它的新的特性,实际上它实现的这个事务和 Exactly once 并非咱们一般理解的那两个特性,可是你深刻了解 Kafka 的事务和 Exactly once 后,会发现其实它这个特性虽然和咱们一般的理解不同,但确实和事务、Exactly once 有必定关系。

这一点上,咱们都要学习 Kafka 团队。一个优秀的开发团队,不只要能写代码,更要能写文档,能写 Slide(PPT),还要能讲,会分享。对于每一个程序员来讲,也是同样的。

咱们把话题收回来,继续来讲重复消息的问题。既然消息队列没法保证消息不重复,就须要咱们的消费代码可以接受“消息是可能会重复的”这一现状,而后,经过一些方法来消除重复消息对业务的影响。

用幂等性解决重复消息问题

通常解决重复消息的办法是,在消费端,让咱们消费消息的操做具有幂等性。

幂等(Idempotence) 原本是一个数学上的概念,它是这样定义的:

若是一个函数 f(x) 知足:f(f(x)) = f(x),则函数 f(x) 知足幂等性。

这个概念被拓展到计算机领域,被用来描述一个操做、方法或者服务。一个幂等操做的特色是,其任意屡次执行所产生的影响均与一次执行的影响相同。

一个幂等的方法,使用一样的参数,对它进行屡次调用和一次调用,对系统产生的影响是同样的。因此,对于幂等的方法,不用担忧重复执行会对系统形成任何改变。

咱们举个例子来讲明一下。在不考虑并发的状况下,“将帐户 X 的余额设置为 100 元”,执行一次后对系统的影响是,帐户 X 的余额变成了 100 元。只要提供的参数 100 元不变,那即便再执行多少次,帐户 X 的余额始终都是 100 元,不会变化,这个操做就是一个幂等的操做。

再举一个例子,“将帐户 X 的余额加 100 元”,这个操做它就不是幂等的,每执行一次,帐户余额就会增长 100 元,执行屡次和执行一次对系统的影响(也就是帐户的余额)是不同的。

若是咱们系统消费消息的业务逻辑具有幂等性,那就不用担忧消息重复的问题了,由于同一条消息,消费一次和消费屡次对系统的影响是彻底同样的。也就能够认为,消费屡次等于消费一次。

从对系统的影响结果来讲:At least once + 幂等消费 = Exactly once。

那么如何实现幂等操做呢?最好的方式就是,从业务逻辑设计上入手,将消费的业务逻辑设计成具有幂等性的操做。可是,不是全部的业务都能设计整天然幂等的,这里就须要一些方法和技巧来实现幂等。

下面我给你介绍几种经常使用的设计幂等操做的方法:

1. 利用数据库的惟一约束实现幂等

例如咱们刚刚提到的那个不具有幂等特性的转帐的例子:将帐户 X 的余额加 100 元。在这个例子中,咱们能够经过改造业务逻辑,让它具有幂等性。

首先,咱们能够限定,对于每一个转帐单每一个帐户只能够执行一次变动操做,在分布式系统中,这个限制实现的方法很是多,最简单的是咱们在数据库中建一张转帐流水表,这个表有三个字段:转帐单 ID、帐户 ID 和变动金额,而后给转帐单 ID 和帐户 ID 这两个字段联合起来建立一个惟一约束,这样对于相同的转帐单 ID 和帐户 ID,表里至多只能存在一条记录。

这样,咱们消费消息的逻辑能够变为:“在转帐流水表中增长一条转帐记录,而后再根据转帐记录,异步操做更新用户余额便可。”在转帐流水表增长一条转帐记录这个操做中,因为咱们在这个表中预先定义了“帐户 ID 转帐单 ID”的惟一约束,对于同一个转帐单同一个帐户只能插入一条记录,后续重复的插入操做都会失败,这样就实现了一个幂等的操做。咱们只要写一个 SQL,正确地实现它就能够了。

基于这个思路,不光是可使用关系型数据库,只要是支持相似“INSERT IF NOT EXIST”语义的存储类系统均可以用于实现幂等,好比,你能够用 Redis 的 SETNX 命令来替代数据库中的惟一约束,来实现幂等消费。

2. 为更新的数据设置前置条件

另一种实现幂等的思路是,给数据变动设置一个前置条件,若是知足条件就更新数据,不然拒绝更新数据,在更新数据的时候,同时变动前置条件中须要判断的数据。这样,重复执行这个操做时,因为第一次更新数据的时候已经变动了前置条件中须要判断的数据,不知足前置条件,则不会重复执行更新数据操做。

好比,刚刚咱们说过,“将帐户 X 的余额增长 100 元”这个操做并不知足幂等性,咱们能够把这个操做加上一个前置条件,变为:“若是帐户 X 当前的余额为 500 元,将余额加 100 元”,这个操做就具有了幂等性。对应到消息队列中的使用时,能够在发消息时在消息体中带上当前的余额,在消费的时候进行判断数据库中,当前余额是否与消息中的余额相等,只有相等才执行变动操做。

可是,若是咱们要更新的数据不是数值,或者咱们要作一个比较复杂的更新操做怎么办?用什么做为前置判断条件呢?更加通用的方法是,给你的数据增长一个版本号属性,每次更数据前,比较当前数据的版本号是否和消息中的版本号一致,若是不一致就拒绝更新数据,更新数据的同时将版本号 +1,同样能够实现幂等更新。

3. 记录并检查操做

若是上面提到的两种实现幂等方法都不能适用于你的场景,咱们还有一种通用性最强,适用范围最广的实现幂等性方法:记录并检查操做,也称为“Token 机制或者 GUID(全局惟一 ID)机制”,实现的思路特别简单:在执行数据更新操做以前,先检查一下是否执行过这个更新操做。

具体的实现方法是,在发送消息时,给每条消息指定一个全局惟一的 ID,消费时,先根据这个 ID 检查这条消息是否有被消费过,若是没有消费过,才更新数据,而后将消费状态置为已消费。

原理和实现是否是很简单?其实一点儿都不简单,在分布式系统中,这个方法实际上是很是难实现的。首先,给每一个消息指定一个全局惟一的 ID 就是一件不那么简单的事儿,方法有不少,但都不太好同时知足简单、高可用和高性能,或多或少都要有些牺牲。更加麻烦的是,在“检查消费状态,而后更新数据而且设置消费状态”中,三个操做必须做为一组操做保证原子性,才能真正实现幂等,不然就会出现 Bug。

好比说,对于同一条消息:“全局 ID 为 8,操做为:给 ID 为 666 帐户增长 100 元”,有可能出现这样的状况:

  • t0 时刻:Consumer A 收到条消息,检查消息执行状态,发现消息未处理过,开始执行“帐户增长 100 元”;

  • t1 时刻:Consumer B 收到条消息,检查消息执行状态,发现消息未处理过,由于这个时刻,Consumer A 还将来得及更新消息执行状态。

这样就会致使帐户被错误地增长了两次 100 元,这是一个在分布式系统中很是容易犯的错误,必定要引觉得戒。

对于这个问题,固然咱们能够用事务来实现,也能够用锁来实现,可是在分布式系统中,不管是分布式事务仍是分布式锁都是比较难解决问题。

小结

这节课咱们主要介绍了经过幂等消费来解决消息重复的问题,而后我重点讲了几种实现幂等操做的方法,你能够利用数据库的约束来防止重复更新数据,也能够为数据更新设置一次性的前置条件,来防止重复消息,若是这两种方法都不适用于你的场景,还能够用“记录并检查操做”的方式来保证幂等,这种方法适用范围最广,可是实现难度和复杂度也比较高,通常不推荐使用。

这些实现幂等的方法,不只能够用于解决重复消息的问题,也一样适用于,在其余场景中来解决重复请求或者重复调用的问题。好比,咱们能够将 HTTP 服务设计成幂等的,解决前端或者 APP 重复提交表单数据的问题;也能够将一个微服务设计成幂等的,解决 RPC 框架自动重试致使的重复调用问题。这些方法都是通用的,但愿你能作到举一反三,触类旁通。

相关文章
相关标签/搜索