RabbitMQ最佳实践总结

在使用消息机制时,咱们一般须要考虑如下几个问题:数据库

  • 消息不能丢失
  • 保证消息必定能投递到目的地
  • 保证业务处理和消息发送/消费的一致性

本文以RabbitMQ为例,讨论如何解决以上问题。网络

消息持久化

若是但愿RabbitMQ重启以后消息不丢失,那么须要对如下3种实体均配置持久化:异步

  • exchange
  • queue
  • message

声明exchange时设置持久化(durable = true)而且不自动删除(autoDelete = false):性能

 

RabbitMQ最佳实践总结,帮你少踩不少坑,收藏了

 

 

声明queue时设置持久化(durable = true)而且不自动删除(autoDelete = false):fetch

 

RabbitMQ最佳实践总结,帮你少踩不少坑,收藏了

 

 

发送消息时经过设置deliveryMode=2持久化消息:设计

 

RabbitMQ最佳实践总结,帮你少踩不少坑,收藏了

 

 

发送确认

有时,业务处理成功,消息也发了,可是咱们并不知道消息是否成功到达了rabbitmq,若是因为网络等缘由致使业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可使用rabbitmq的发送确认功能,即要求rabbitmq显式告知咱们消息是否已成功发送。3d

首先须要在channel上设置ConfirmListener:code

 

RabbitMQ最佳实践总结,帮你少踩不少坑,收藏了

 

 

而后在发送消息直线须要开启发送确认模式:blog

//开启发送者确认channel.confirmSelect();

而后发送消息:rabbitmq

channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes());

当消息正常投递时,rabbitmq客户端将异步调用handleAck()表示消息已经成功投递,此时程序能够自行处理投递成功以后的逻辑,好比在数据库中将消息设置为已发送。当消息投递出现异常时,handleNack()将被调用。

一般来说,发送端只须要保证消息可以发送到exchange便可,而无需关注消息是否被正确地投递到了某个queue,这个是rabbitmq和消息的接收方须要考虑的事情。基于此,若是rabbitmq找不到任何须要投递的queue,那么rabbitmq依然会ack给发送方,此时发送方能够认为消息已经正确投递,而很差用关系消息没有queue接收的问题。可是,对于rabbitmq而言,这种消息是须要记录下来的,不然rabbitmq将直接丢弃该消息。此时能够为exchange设置alternate-exchange,即表示rabbitmq将把没法投递到任何queue的消息发送到alternate-exchange指定的exchange中,一般来讲能够设置一个死信交换(DLX)。

事实上,对于exchange存在可是却找不到任何接收queue时,若是发送是设置了mandatory=true,那么在消息被ack前将return给客户端,此时客户端能够建立一个ReturnListener用于接收返回的消息:

 

RabbitMQ最佳实践总结,帮你少踩不少坑,收藏了

 

 

可是须要注意的是,在return以后,消息依然会被ack而不是nack,还不如不设置madatory呢,所以return有时并不见得有用。

须要注意的是,在发送消息时若是exchange不存在,rabbitmq直接丢弃该消息,而且不会ack或者nack操做,可是在Spring中,会nack。

综合起来,要完成发送方确认,须要作如下几个点:

  • 设置ConfirmListener
  • 经过confirmSelect()开启
  • 为exchange设置alternate-exchange到DLX
  • 发送时没有必要设置mandotory
  • 发送方将消息记录在数据库中,收到ack时在数据库中标记消息为已发送状态
  • 若是收到reject或者因为网络缘由没有收到ack,那么消息状态不会改变,下次发送时再次发送,此时可能致使消息重复,解决重复问题请参考“保证至少一次投递,而且消费端幂”小节。

手动消费确认

有时,消息被正确投递到消费方,可是消费方处理失败,那么便会出现消费方的不一致问题。好比订单已建立的消息发送到用户积分子系统中用于增长用户积分,可是积分消费法处理却都失败了,用户就会问:我购买了东西为何积分并无增长呢?

要解决这个问题,须要引入消费方确认,即只有消息被成功处理以后才告知rabbitmq以ack,不然告知rabbitmq以nack,此时的处理流程以下:

  1. 接收消息,不做ack,处理消息成功则ack,不成功nack
  2. 对于nack的消息,能够配置rabbitmq要么从新投递,要么直接扔掉,要么传到死信交换(DLX)
  3. 若是处理成功,可是因为网络等问题致使确认(不管是ack仍是nack)不成功,那么rabbitmq会从新投递消息,可是此时因为消息已经成功,从新投递便致使了消费重复的消息,此时请参考“保证至少一次投递,而且消费端幂”小节。

在rabbitmq中,消息默认是字段ack的,即消息到达消费方当即ack,而无论消费方业务处理是否成功,为此能够开启手动确认模式,即有消费方自行决定什么时候应该ack,经过设置autoAck=false开启手动确认模式:

 

RabbitMQ最佳实践总结,帮你少踩不少坑,收藏了

 

 

能够看到,在autoAck=false状况下,经过业务处理的是否成功(success())来判断应该ack仍是nack。

另外,为了不消息反复requeue的状况,若是消息第一次消费不成功,则在nack时设置requeue=true,表示告知rabbitmq将reject的消息从新投递,若是第二次消费依然不成功,那么nack时设置requeue=false,告知rabbitmq不要从新投递了,此时rabbitmq将根据本身的配置要么直接扔掉消息,要么将消息发送到DLX中,具体配置请参考“设置死信交换(DLX)和死信队列(DLQ)”。

保证至少一次投递,而且消费端幂等

一般来讲,程序中会先完成写数据库的操做,而后发送消息,此时一个重要的点是保证这二者的一致性,即一旦数据库保存成功消息必须也可以发送成功。要保证发送发一致性,一种作法是使用全局事务,即将数据库操做和消息发送放到一个事务中,好比JTA,可是全局事务是很重的,而且rabbitmq目前并不支持全局事务。

要解决发送发的一致性问题,能够实现将消息保存到数据库的事件表中,此时业务处理的数据库操做和保存消息到数据库属于同一个本地数据库事务,那么到此能够保证业务处理和消息产生的原子性,而后有一个异步的后台任务从数据库的事件表中一次读取未发送的消息发送至rabbitmq,发送成功后更新消息的状态为已发布。

然而,此时咱们依然没法保证发送消息和更新消息状态之间的原子性,由于可能发生消息发送成功可是数据库状态更新不成功的状况,为了解决这种极端状况,能够屡次重试消息发送,步骤以下:

  1. 读取时间表中未发送消息,发送到rabbitmq
  2. 若是发送成功,事件表中消息状态也更新成功,皆大欢喜
  3. 若是消息发送不成功,那么消息状态也不做改变,下次重试
  4. 若是消息发送成功而状态更新不成功,下次重试

不断重试,总有一个可以达到发送消息和状态更新的原子性。

那么问题也来了:rabbitmq中可能出现多条重复消息,此时消费端就懵了。为了解决这个问题,消费方应该设计为幂等的,即对相同消息的屡次消费与单次消费结果相同。有些消费方的业务逻辑自己即是幂等的,而对于自己不幂等的消费方,须要在数据库中记录已经被正确消费的消息,当重复消息来时,判断该消息是否已经被消费,若是没有则执行消费逻辑,若是已经消费则直接忽略。此时消费方的处理步骤以下:

  1. 接收到消息,判断消息是否已经消费,若是是,则直接忽略,此时已然须要作消费成功确认
  2. 若是消息还未被消费,则处理业务逻辑,记录消息,业务逻辑自己和记录消息在同一个数据库事务中,若是都成功,则皆大欢喜;若是失败,那么消费方业务回滚,消息也不记录,此时reject消息,等下次重发

设置消息的TTL和消息队列的max-length

为了保证消息的时效性,能够设置队列中消息的TTL(x-message-ttl),而为了保证消息队列不至于太大而影响性能,能够设置队列的最大消息数(x-max-length)。在建立队列时设置以下:

 

RabbitMQ最佳实践总结,帮你少踩不少坑,收藏了

 

 

设置死信交换(DLX)和死信队列(DLQ)

对于没法投递的消息,咱们须要将其记录下来便于后续跟踪排查,此时能够将这样的消息放入DLX和DLQ中。默认状况下,queue中被抛弃的消息将被直接丢掉,可是能够经过设置queue的x-dead-letter-exchange参数,将被抛弃的消息发送到x-dead-letter-exchange作指定的exchange中,这样的exchange成为DLX。

设置了x-dead-letter-exchange以后,在如下三种状况下消息将被扔到DLX中:

  1. 消费方nack时指定了requeue=false
  2. 消息的TTL已到
  3. 消息队列的max-length已到

在声明queue时定义x-dead-letter-exchange:

 

RabbitMQ最佳实践总结,帮你少踩不少坑,收藏了

 

 

  • 设置DLQ为lazy,而且没有TTL,而且没有max-length
    在如下3种状况下,消息会被投递到DLX中:

须要注意的是,在发送消息时,当已经达到queue的上限,而当queue定义为x-overflow=reject-publish时,rabbitmq将nack。当有多个queue同时绑定到exchange时,若是有些queue设置了reject-publish,而有些却没有,那么依然会nack,这对发送方来讲很差处理。所以,仍是那句话,发送方只须要保证正确地投递到了exchange便可,而不用关系exchange后面有哪些queue。

设置Prefetch count

Prefetch count表示消费方一次性从rabbitmq读取的消息数量,若是设置过大,那么消费方可能始终处于高负荷运转状态,而若是过小又会增长网络开销,一般设置为20-50。另外,有时为了保证多个消费方均衡地分摊消息处理任务,一般设置prefetch count为1。

异常处理

在以上设置的状况下,咱们来看看当各类异常发生时,rabbitmq是如何运做的:

  • broker不可达:直接抛出异常;
  • 发送方本身始终发送不出去:消息状态始终处于“未发送”,不会破坏一致性,可是对于事件表中累计太多的事件须要关注;
  • exchange不存在:消息被丢掉,rabbitmq不会ack,消息状态始终处于“未发送”,下次将从新发送,不会破坏一致性,可是当exchange持续不存在下去,那么事件表中事件也会累计太多;
  • exchange存在可是没有接受queue:消息将被ack并标记为“已发送”,但因为设置了alternative exchange为dlx,那么消息将发送到dlx对应的dlq中保存以便后续处理;
  • consumer不在线,而累积消息太多:消息一致性没有问题,可是当累计到了max-length上限,消息队列头部的消息将被放置dlq中以便后续处理;
  • consumer临时性失败:经过redelivered判断是否为重复投递,若是是,则nack而且requeue=false,表示若是重复投递的一次的消息若是再失败,那么直接扔到dlx中,也即消息最多重复投递一次;
  • consumer始终失败:全部消息均被投入dlq以便后续处理,此时可能须要关注dlq的长度是否太长。

路由策略

系统中每每会发布多种类型的消息,在发送时有几种路由策略:

  • 全部类型的消息都发送到同一个exchange中
  • 每种类型的消息都单独配置一个exchange
  • 对消息类型进行归类,同一类型的消息对应一个exchange

笔者建议采用最后一种,而且结合DDD中的聚合划分,路由策略建议以下:

每个聚合根下发布的全部类型的事件对应一个exchange,exchange设置为topic,queue能够配置接收某一种类型的事件,也能够配置接收全部某种聚合相关的事件,还能够配置接收全部事件。