在使用消息机制时,咱们一般须要考虑如下几个问题:数据库
本文以RabbitMQ为例,讨论如何解决以上问题。网络
若是但愿RabbitMQ重启以后消息不丢失,那么须要对如下3种实体均配置持久化:异步
声明exchange时设置持久化(durable = true)而且不自动删除(autoDelete = false):性能
声明queue时设置持久化(durable = true)而且不自动删除(autoDelete = false):fetch
发送消息时经过设置deliveryMode=2持久化消息:设计
有时,业务处理成功,消息也发了,可是咱们并不知道消息是否成功到达了rabbitmq,若是因为网络等缘由致使业务成功而消息发送失败,那么发送方将出现不一致的问题,此时可使用rabbitmq的发送确认功能,即要求rabbitmq显式告知咱们消息是否已成功发送。3d
首先须要在channel上设置ConfirmListener:code
而后在发送消息直线须要开启发送确认模式:blog
//开启发送者确认channel.confirmSelect();
而后发送消息:rabbitmq
channel.basicPublish("order", "order.created", false, properties, "sample-data".getBytes());
当消息正常投递时,rabbitmq客户端将异步调用handleAck()表示消息已经成功投递,此时程序能够自行处理投递成功以后的逻辑,好比在数据库中将消息设置为已发送。当消息投递出现异常时,handleNack()将被调用。
一般来说,发送端只须要保证消息可以发送到exchange便可,而无需关注消息是否被正确地投递到了某个queue,这个是rabbitmq和消息的接收方须要考虑的事情。基于此,若是rabbitmq找不到任何须要投递的queue,那么rabbitmq依然会ack给发送方,此时发送方能够认为消息已经正确投递,而很差用关系消息没有queue接收的问题。可是,对于rabbitmq而言,这种消息是须要记录下来的,不然rabbitmq将直接丢弃该消息。此时能够为exchange设置alternate-exchange,即表示rabbitmq将把没法投递到任何queue的消息发送到alternate-exchange指定的exchange中,一般来讲能够设置一个死信交换(DLX)。
事实上,对于exchange存在可是却找不到任何接收queue时,若是发送是设置了mandatory=true,那么在消息被ack前将return给客户端,此时客户端能够建立一个ReturnListener用于接收返回的消息:
可是须要注意的是,在return以后,消息依然会被ack而不是nack,还不如不设置madatory呢,所以return有时并不见得有用。
须要注意的是,在发送消息时若是exchange不存在,rabbitmq直接丢弃该消息,而且不会ack或者nack操做,可是在Spring中,会nack。
综合起来,要完成发送方确认,须要作如下几个点:
有时,消息被正确投递到消费方,可是消费方处理失败,那么便会出现消费方的不一致问题。好比订单已建立的消息发送到用户积分子系统中用于增长用户积分,可是积分消费法处理却都失败了,用户就会问:我购买了东西为何积分并无增长呢?
要解决这个问题,须要引入消费方确认,即只有消息被成功处理以后才告知rabbitmq以ack,不然告知rabbitmq以nack,此时的处理流程以下:
在rabbitmq中,消息默认是字段ack的,即消息到达消费方当即ack,而无论消费方业务处理是否成功,为此能够开启手动确认模式,即有消费方自行决定什么时候应该ack,经过设置autoAck=false开启手动确认模式:
能够看到,在autoAck=false状况下,经过业务处理的是否成功(success())来判断应该ack仍是nack。
另外,为了不消息反复requeue的状况,若是消息第一次消费不成功,则在nack时设置requeue=true,表示告知rabbitmq将reject的消息从新投递,若是第二次消费依然不成功,那么nack时设置requeue=false,告知rabbitmq不要从新投递了,此时rabbitmq将根据本身的配置要么直接扔掉消息,要么将消息发送到DLX中,具体配置请参考“设置死信交换(DLX)和死信队列(DLQ)”。
一般来讲,程序中会先完成写数据库的操做,而后发送消息,此时一个重要的点是保证这二者的一致性,即一旦数据库保存成功消息必须也可以发送成功。要保证发送发一致性,一种作法是使用全局事务,即将数据库操做和消息发送放到一个事务中,好比JTA,可是全局事务是很重的,而且rabbitmq目前并不支持全局事务。
要解决发送发的一致性问题,能够实现将消息保存到数据库的事件表中,此时业务处理的数据库操做和保存消息到数据库属于同一个本地数据库事务,那么到此能够保证业务处理和消息产生的原子性,而后有一个异步的后台任务从数据库的事件表中一次读取未发送的消息发送至rabbitmq,发送成功后更新消息的状态为已发布。
然而,此时咱们依然没法保证发送消息和更新消息状态之间的原子性,由于可能发生消息发送成功可是数据库状态更新不成功的状况,为了解决这种极端状况,能够屡次重试消息发送,步骤以下:
不断重试,总有一个可以达到发送消息和状态更新的原子性。
那么问题也来了:rabbitmq中可能出现多条重复消息,此时消费端就懵了。为了解决这个问题,消费方应该设计为幂等的,即对相同消息的屡次消费与单次消费结果相同。有些消费方的业务逻辑自己即是幂等的,而对于自己不幂等的消费方,须要在数据库中记录已经被正确消费的消息,当重复消息来时,判断该消息是否已经被消费,若是没有则执行消费逻辑,若是已经消费则直接忽略。此时消费方的处理步骤以下:
为了保证消息的时效性,能够设置队列中消息的TTL(x-message-ttl),而为了保证消息队列不至于太大而影响性能,能够设置队列的最大消息数(x-max-length)。在建立队列时设置以下:
对于没法投递的消息,咱们须要将其记录下来便于后续跟踪排查,此时能够将这样的消息放入DLX和DLQ中。默认状况下,queue中被抛弃的消息将被直接丢掉,可是能够经过设置queue的x-dead-letter-exchange参数,将被抛弃的消息发送到x-dead-letter-exchange作指定的exchange中,这样的exchange成为DLX。
设置了x-dead-letter-exchange以后,在如下三种状况下消息将被扔到DLX中:
在声明queue时定义x-dead-letter-exchange:
须要注意的是,在发送消息时,当已经达到queue的上限,而当queue定义为x-overflow=reject-publish时,rabbitmq将nack。当有多个queue同时绑定到exchange时,若是有些queue设置了reject-publish,而有些却没有,那么依然会nack,这对发送方来讲很差处理。所以,仍是那句话,发送方只须要保证正确地投递到了exchange便可,而不用关系exchange后面有哪些queue。
Prefetch count表示消费方一次性从rabbitmq读取的消息数量,若是设置过大,那么消费方可能始终处于高负荷运转状态,而若是过小又会增长网络开销,一般设置为20-50。另外,有时为了保证多个消费方均衡地分摊消息处理任务,一般设置prefetch count为1。
在以上设置的状况下,咱们来看看当各类异常发生时,rabbitmq是如何运做的:
系统中每每会发布多种类型的消息,在发送时有几种路由策略:
笔者建议采用最后一种,而且结合DDD中的聚合划分,路由策略建议以下:
每个聚合根下发布的全部类型的事件对应一个exchange,exchange设置为topic,queue能够配置接收某一种类型的事件,也能够配置接收全部某种聚合相关的事件,还能够配置接收全部事件。