Rabbitmq的可靠消息投递

1、背景

生产端向rabbitmq发送消息时,因为网络等缘由可能致使消息发送失败。因此,rabbitmq必须有机制确保消息能准确到达mq,若是不能到达,必须反馈给生产端进行重发。redis

RabbitMQ消息的可靠性投递主要两种实现:
一、经过实现消费的重试机制,经过@Retryable来实现重试,能够设置重试次数和重试频率;
二、生产端实现消息可靠性投递。spring

两种方法消费端均可能收到重复消息,要求消费端必须实现幂等性消费。数据库

2、消息投递到exchange的确认模式

rabbitmq的消息投递的过程为:
producer ——> rabbitmq broker cluster ——> exchange ——> queue ——> consumer缓存

一、生产端发送消息到rabbitmq broker cluster后,异步接受从rabbitmq返回的ack确认信息。安全

二、生产端收到返回的ack确认消息后,根据ack是true仍是false,调用confirmCallback接口进行处理。网络

 

在application.yml中开启生产端confirm模式并发

spring: rabbitmq: publisher-confirms: true

 

实现ConfirmCallback接口中的confirm方法,ack为true表示消息发送成功,ack为false表示消息发送失败app

@Component @Slf4j public class RabbitTemplateConfig implements ConfirmCallback{ @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this);   // 指定 ConfirmCallback
 } @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { //try to resend msg
       } else { //delete msg in db
 } } }

注意:在confirmCallback回调接口中是没有消息数据的,因此即便消息发送失败,生产端也没法在这个回调接口中直接重发,confirmCallback只能起到一个通知的做用。异步

3、消息投递失败的重发机制

若是rabbitmq返回ack失败,生产端也没法确认消息是否真的发送成功,也会形成数据丢失。最好的办法是使用rabbitmq的事务机制,可是rabbitmq的事务机制效率极低,每秒钟处理的消息仅几百条,不适合并发量大的场景。 ide

 

另一种实现思路:

一、生产端保存每次发送的消息,若是发送成功就删除消息;

二、若是发送失败就取出消息从新发送;

三、若是超时尚未收到mq返回的ack,一样取出消息从新发送。

这样就能够避免消息丢失的风险。

 

以使用redis保存消息msg为例,具体实现方案为:

一、生产端在发送消息以前,生成ack惟一确认的id;

二、以ackId为键,消息为value,保存进redis缓存,设置超时时间;

三、redis实现超时触发接口,当key过时时,重发消息并再次执行第2步;

四、生产端实现ConfirmCallback接口;

五、ConfirmCallback接口触发时,若ack为true,则直接删除这次ackId对应的msg;若ack为false,则将该ackId对应的msg取出重发;

 

网上另外的实现方案:

不经过设置redis超时时间触发超时事件进行重发,而是取出消息放入一个ackFailList中,而后开启定时任务,扫描ackFailList,重发失败的msg。

网上的这套方案思路上和上一个方案差很少,可是是采用的额外的List来保存发送失败的消息,因为List保存在内存中,不具有持久化的功能,因此这样并不安全,若是生产端程序异常退出将致使消息丢失。能够考虑保存到数据库中。

4、消息未投递到queue的退回模式

生产端经过实现ReturnCallback接口,启动消息失败返回,消息路由不到队列时会触发该回调接口。

 

在application.yml中开启return模式

spring: rabbitmq: publisher-returns: true

 

实现ReturnCallback接口,能够获取消息主体内容,实现消息重发

@Component @Slf4j public class RabbitTemplateConfig implements ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; @PostConstruct public void init() { rabbitTemplate.setReturnCallback(this);   //指定 ReturnCallback
 } @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("消息主体 message : {}", message); log.info("消息主体 message : {}", replyCode); log.info("描述:{}", replyText); log.info("消息使用的交换器 exchange : {}", exchange); log.info("消息使用的路由键 routing : {}", routingKey); } }
相关文章
相关标签/搜索