本文收录在我的博客: www.chengxy-nds.top,技术资源共享,一块儿进步
最近部门号召大伙多组织一些技术分享会,说是要活跃公司的技术氛围,但早就看穿一切的我知道,这 T M 就是为了刷KPI
。不过,话说回来这的确是件好事,与其开那些没味的扯皮会,多作技术交流仍是颇有助于我的成长的。javascript
因而乎我主动报名参加了分享,咳咳咳~ ,真的不是为了那点KPI
,就是想和大伙一块儿学习学习!java
此次我分享的是 springboot
+ rabbitmq
如何实现消息确认机制,以及在实际开发中的一点踩坑经验,其实总体的内容比较简单,有时候事情就是这么神奇,越是简单的东西就越容易出错。git
能够看到使用了 RabbitMQ
之后,咱们的业务链路明显变长了,虽然作到了系统间的解耦,但可能形成消息丢失的场景也增长了。例如:程序员
因此说能不使用中间件就尽可能不要用,若是为了用而用只会徒增烦恼。开启消息确认机制之后,尽管很大程度上保证了消息的准确送达,但因为频繁的确认交互,rabbitmq
总体效率变低,吞吐量降低严重,不是很是重要的消息真心不建议你用消息确认机制。github
下边咱们先来实现springboot
+ rabbitmq
消息确认机制,再对遇到的问题作具体分析。redis
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置中须要开启 发送端
和 消费端
的消息确认。spring
spring.rabbitmq.host=127.0.0.1 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 发送者开启 confirm 确认机制 spring.rabbitmq.publisher-confirms=true # 发送者开启 return 确认机制 spring.rabbitmq.publisher-returns=true #################################################### # 设置消费端手动 ack spring.rabbitmq.listener.simple.acknowledge-mode=manual # 是否支持重试 spring.rabbitmq.listener.simple.retry.enabled=true
定义交换机 confirmTestExchange
和队列 confirm_test_queue
,并将队列绑定在交换机上。springboot
@Configuration public class QueueConfig { @Bean(name = "confirmTestQueue") public Queue confirmTestQueue() { return new Queue("confirm_test_queue", true, false, false); } @Bean(name = "confirmTestExchange") public FanoutExchange confirmTestExchange() { return new FanoutExchange("confirmTestExchange"); } @Bean public Binding confirmTestFanoutExchangeAndQueue( @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange, @Qualifier("confirmTestQueue") Queue confirmTestQueue) { return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange); } }
rabbitmq
的消息确认分为两部分:发送消息确认 和 消息接收确认。
发送消息确认:用来确认生产者 producer
将消息发送到 broker
,broker
上的交换机 exchange
再投递给队列 queue
的过程当中,消息是否成功投递。服务器
消息从 producer
到 rabbitmq broker
有一个 confirmCallback
确认模式。app
消息从 exchange
到 queue
投递失败有一个 returnCallback
退回模式。
咱们能够利用这两个Callback
来确保消的100%送达。
消息只要被 rabbitmq broker
接收到就会触发 confirmCallback
回调 。
@Slf4j @Component public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback { @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (!ack) { log.error("消息发送异常!"); } else { log.info("发送者爸爸已经收到确认,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause); } } }
实现接口 ConfirmCallback
,重写其confirm()
方法,方法内有三个参数correlationData
、ack
、cause
。
correlationData
:对象内部只有一个 id
属性,用来表示当前消息的惟一性。ack
:消息投递到broker
的状态,true
表示成功。cause
:表示投递失败的缘由。但消息被 broker
接收到只能表示已经到达 MQ服务器,并不能保证消息必定会被投递到目标 queue
里。因此接下来须要用到 returnCallback
。
若是消息未能投递到目标 queue
里将触发回调 returnCallback
,一旦向 queue
投递消息未成功,这里通常会记录下当前消息的详细投递数据,方便后续作重发或者补偿等操做。
@Slf4j @Component public class ReturnCallbackService implements RabbitTemplate.ReturnCallback { @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey); } }
实现接口ReturnCallback
,重写 returnedMessage()
方法,方法有五个参数message
(消息体)、replyCode
(响应code)、replyText
(响应内容)、exchange
(交换机)、routingKey
(队列)。
下边是具体的消息发送,在rabbitTemplate
中设置 Confirm
和 Return
回调,咱们经过setDeliveryMode()
对消息作持久化处理,为了后续测试建立一个 CorrelationData
对象,添加一个id
为10000000000
。
@Autowired private RabbitTemplate rabbitTemplate; @Autowired private ConfirmCallbackService confirmCallbackService; @Autowired private ReturnCallbackService returnCallbackService; public void sendMessage(String exchange, String routingKey, Object msg) { /** * 确保消息发送失败后能够从新返回到队列中 * 注意:yml须要配置 publisher-returns: true */ rabbitTemplate.setMandatory(true); /** * 消费者确认收到消息后,手动ack回执回调处理 */ rabbitTemplate.setConfirmCallback(confirmCallbackService); /** * 消息投递到队列失败回调处理 */ rabbitTemplate.setReturnCallback(returnCallbackService); /** * 发送消息 */ rabbitTemplate.convertAndSend(exchange, routingKey, msg, message -> { message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return message; }, new CorrelationData(UUID.randomUUID().toString())); }
消息接收确认要比消息发送确认简单一点,由于只有一个消息回执(ack
)的过程。使用@RabbitHandler
注解标注的方法要增长 channel
(信道)、message
两个参数。
@Slf4j @Component @RabbitListener(queues = "confirm_test_queue") public class ReceiverMessage1 { @RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("小富收到消息:{}", msg); //TODO 具体业务 channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { if (message.getMessageProperties().getRedelivered()) { log.error("消息已重复处理失败,拒绝再次接收..."); channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒绝消息 } else { log.error("消息即将再次返回队列处理..."); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } } } }
消费消息有三种回执方法,咱们来分析一下每种方法的含义。
basicAck
:表示成功确认,使用此回执方法后,消息会被rabbitmq broker
删除。
void basicAck(long deliveryTag, boolean multiple)
deliveryTag
:表示消息投递序号,每次消费消息或者消息从新投递后,deliveryTag
都会增长。手动消息确认模式下,咱们能够对指定deliveryTag
的消息进行ack
、nack
、reject
等操做。
multiple
:是否批量确认,值为 true
则会一次性 ack
全部小于当前消息 deliveryTag
的消息。
举个栗子: 假设我先发送三条消息deliveryTag
分别是五、六、7,可它们都没有被确认,当我发第四条消息此时deliveryTag
为8,multiple
设置为 true,会将五、六、七、8的消息所有进行确认。
basicNack
:表示失败确认,通常在消费消息业务异常时用到此方法,能够将消息从新投递入队列。
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
deliveryTag
:表示消息投递序号。
multiple
:是否批量确认。
requeue
:值为 true
消息将从新入队列。
basicReject
:拒绝消息,与basicNack
区别在于不能进行批量操做,其余用法很类似。
void basicReject(long deliveryTag, boolean requeue)
deliveryTag
:表示消息投递序号。
requeue
:值为 true
消息将从新入队列。
发送消息测试一下消息确认机制是否生效,从执行结果上看发送者发消息后成功回调,消费端成功的消费了消息。
用抓包工具Wireshark
观察一下rabbitmq
amqp协议交互的变化,也多了 ack
的过程。
这是一个很是没技术含量的坑,但倒是很是容易犯错的地方。
开启消息确认机制,消费消息别忘了channel.basicAck
,不然消息会一直存在,致使重复消费。
在我最开始接触消息确认机制的时候,消费端代码就像下边这样写的,思路很简单:处理完业务逻辑后确认消息, int a = 1 / 0
发生异常后将消息从新投入队列。
@RabbitHandler public void processHandler(String msg, Channel channel, Message message) throws IOException { try { log.info("消费者 2 号收到:{}", msg); int a = 1 / 0; channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); } }
可是有个问题是,业务代码一旦出现 bug
99.9%的状况是不会自动修复,一条消息会被无限投递进队列,消费端无限执行,致使了死循环。
本地的CPU
被瞬间打满了,你们能够想象一下当时在生产环境致使服务死机,我是有多慌。
并且rabbitmq management
只有一条未被确认的消息。
通过测试分析发现,当消息从新投递到消息队列时,这条消息不会回到队列尾部,还是在队列头部。
消费者会马上消费这条消息,业务处理再抛出异常,消息再从新入队,如此反复进行。致使消息队列处理出现阻塞,致使正常消息也没法运行。
而咱们当时的解决方案是,先将消息进行应答,此时消息队列会删除该条消息,同时咱们再次发送该消息到消息队列,异常消息就放在了消息队列尾部,这样既保证消息不会丢失,又保证了正常业务的进行。
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 从新发送消息到队尾 channel.basicPublish(message.getMessageProperties().getReceivedExchange(), message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN, JSON.toJSONBytes(msg));
但这种方法并无解决根本问题,错误消息仍是会时不时报错,后面优化设置了消息重试次数,达到了重试上限之后,手动确认,队列删除此消息,并将消息持久化入MySQL
并推送报警,进行人工处理和定时任务作补偿。
如何保证 MQ 的消费是幂等性,这个须要根据具体业务而定,能够借助MySQL
、或者redis
将消息持久化,经过再消息中的惟一性属性校验。
demo
的GitHub
地址 https://github.com/chengxy-nd...
原创不易,燃烧秀发输出内容,若是有一丢丢收获,点个赞鼓励一下吧!
习惯在VX看技术文章,想要获取更多Java资源的同窗,能够关注个人公众号:程序员内点事,暗号:[666]