使用消息中间件时,如何保证消息仅仅被消费一次?

消息中间件使用普遍,经常使用来削峰填谷、系统解耦、异步处理。异步处理多是使用的最多的场景了,好比如今的技术博客网站,都采用积分制,用户发表一篇文章后,能够获取想要的积分,为了提高系统的性能,给用户加积分的操做能够异步处理,并不须要放在同步流程中。html

咱们能够把用户ID,须要增长的积分封装成一条消息投递到消息系统中,异步处理加积分操做,因为这是发生在不一样服务器之间,消息有可能投递失败、处理失败等问题,从而致使用户加积分失败,还有一种多是消息重复投递,那么用户就有可能重复加积分,无论出现那种状况,都是不正常的状况。java

要避免上面的两种状况,就须要咱们尽可能保证消息不丢失和消息只被消费一次,这篇文章抛开具体的消息中间件,从消息系统的通用层面上,谈谈如何避免这两种状况。git

一、保证消息不丢失

一条消息从生产到消费这条链路中,有三个地方可能会形成消息丢失,分别以下:github

  • 消息从生产者写入到消息队列的过程投递失败。
  • 消息在消息队列中,持久化失败。
  • 消息被消费者消费的过程出现异常。

1.1 在消息生产的过程当中投递失败

消息生产者和消息系统通常都是独立部署在不一样的服务器上,两台服务器之间要通讯就要经过网络来完成,网络是不稳定,可能会发生抖动,那么数据就有可能丢失。网络发生抖动会有如下两种状况。算法

在消息生产的过程当中丢失消息

情景一:消息在传送给消息系统的过程当中发生网络抖动,数据直接丢失。 情景二:消息已经到达消息系统,可是在消息系统给生产者服务器返回信息时,网络发生抖动,此时的数据不必定真正的丢失,极可能只是生产者认为数据丢失。sql

针对消息在消息生产时丢失,能够采起重投机制,当程序检测到网络异常时,将消息再次投递到消息系统。可是从新投递在情景二状况下,可能形成数据重复,如何解决这个问题,在后面会提到。数据库

1.2 在消息队列中持久化失败

消息系统是能够对消息进行持久化,通常都是将消息存储到本地磁盘中,固然也有少数消息中间件支持将数据持久化到数据库中,那么消息系统的性能可能就会降低。服务器

若是你对 Redis 的持久化有必定的了解话,你会发现 Redis 在持久化数据时并非每新增一条就当即存入到本地磁盘,而是会将数据先写入到操做系统的 Page Cache 中,当知足必定条件时,再将 Page Cache 中的数据刷入磁盘,由于这样能够减小对磁盘的随机 I/O 操做,咱们知道随机 I/O 是很是耗时的,这样也提升了系统性能,消息中间件也不例外,在持久化时也是采用这种方式。网络

在某些极端状况下,可能会形成 Page Cache 中的数据丢失,好比忽然停电或者机器异常重启操做。要解决 Page Cache 中数据丢失问题,能够采用集群部署的方式,来尽可能保证数据不丢失。异步

1.3 在消费的过程当中存在消息丢失

消息在消费过程当中也是会发生丢失的,并且在消费过程当中丢失的几率要比前两种状况大不少。一条消息消费过程大概分红三步:消费者拉取消息,消费者处理消息,消息系统更新消费进度。

图片描述

第一步在拉取消息的时候可能发生网络抖动异常,第二步在处理消息的时候可能发生一些业务异常,而致使流程并无走完,若是在第一步、第二步发生异常的状况下,通知消息系统更新消费进度,那么这条失败的消息就永远不会在被处理了,天然就丢失了,其实咱们的业务并无跑完。

要避免消息在消费时丢失的状况,能够在消息接收和处理完成以后才更新消费进度,可是在极端的状况下,会出现消息重复消费的问题,好比某一条消息在处理完成以后,消费者宕机了,这时尚未更新消费进度,消费者重启后,这条消息仍是会被消费到。

二、如何保证消息只被消费一次

消息系统自己不能保证消息仅被消费一次,由于消费自己可能重复、下游系统启动拉取重复、失败重试带来的重复、补偿逻辑致使的重复都有可能造重复消息,要保证消息仅被消费一次能够利用等幂性来实现

等幂是数学上的一个概念,就是屡次执行同一个操做和执行一次操做,最终获得的结果是相同的。

从等幂的概念上就能够看出来,就算消息执行屡次也不会对系统形成影响,那么在使用消息系统时如何保证等幂性呢?由于生产者和消费者都有可能产生重复消息,因此要在生产者和消费者两端都保证等幂性。

保证生产者等幂性,在生产消息的时候,利用雪花算法给消息生成一个全局 ID,在消息系统中维护消息已 ID 映射关系,若是在映射表中已经存在相同 ID,这丢弃这条消息,虽然消息被投递了两次,可是实际上就保存了一条,避免了消息重复问题。

生产者等幂性跟所选者的消息中间件有关系,由于绝大数状况下消息系统不须要咱们本身实现,因此等幂性是不太好控制的,消费者等幂性才是咱们开发人员控制的重点方向

在消费者端能够从通用层和业务层两个方面来作等幂操做,取决于咱们的业务要求。

在通用层面中,利用好消息生成是产生的全局惟一ID,消息被处理成功后,把这个全局 ID 存入到数据中,在处理下一条消息以前,先从数据库中查询这个全局 ID 是否存在,若是已经存在,则直接放弃该消息。

利用这个全局惟一ID就实现了消息等幂性,伪代码以下:

boolean isIDExisted = selectByID(ID); // 判断ID是否存在
if(isIDExisted) {
  return; //存在则直接返回
} else {
  process(message); //不存在,则处理消息
  saveID(ID);   //存储ID
}

可是在极端状况下,这种方式仍是会出问题,若是消息在处理以后,还没来得及保存到数据库,消费者就宕机重启了,重启以后还会再次获取该消息,执行时查询该消息并未被消费过,仍是会执行两次消费。能够引入数据库事务来解决这个问题,可是会下降系统性能。若是对消息重复消费没有特别严格要求的话,直接使用这种没有引入事务的通用方案就行了,毕竟这也是极小几率的事情。

在业务层面上,咱们可选择性就变多了,好比乐观锁、悲观锁、内存去重(https://github.com/RoaringBitmap/RoaringBitmap)等方法。

咱们拿乐观锁来举例,好比咱们要给一个用户加积分,由于加积分操做并不须要放在主业务中,因此就可使用消息系统来异步通知,要使用乐观锁,就须要给积分表添加一个版本号字段。而且在生产消息的时候先查询这个帐号的版本号而且连同消息一块儿发送到消息系统中。

图片描述

消费者拿到消息和版本号后,在执行更新积分操做的 SQL 时带上版本号,相似于:

update score set score = score + 20, version=version+1 where userId=1 and version=1;

这条消息消费成功后,version 就变成了 2,那么若是有重复的 version=1 的消息再次被消费者拉取到,SQL 语句并不会执行成功,从而保证了消息的幂等性。

要保证消息仅被消费一次,咱们须要把重点放在消费者这一段,利用等幂性来保证消息被消费一次。

今天站在消息中间件的通用层面上,聊了聊如何保证数据不丢失和仅被消费一次,但愿今天的文章对您的学习或者工做有所帮助,若是您认为文章有价值,欢迎点个赞,谢谢。

最后

目前互联网上不少大佬都有消息中间件相关文章,若有雷同,请多多包涵了。原创不易,码字不易,还但愿你们多多支持。若文中有所错误之处,还望提出,谢谢。

互联网平头哥

原文出处:https://www.cnblogs.com/jamaler/p/12467206.html

相关文章
相关标签/搜索