做者:threedayman 恒生LIGHT云社区html
接着上一讲 消息中间件之RabbitMQ初识,这笔咱们来说讲RabbitMQ中消息丢失的问题。已经怎样在核心业务中避免消息丢失。java
血泪故事:商品购物流程中的发货环节引入了RabbitMQ,某天因为网络抖动致使了生产者的消息没有发送到RabbitMQ中,因为没有作消息的可靠性传输保证,消息丢失,致使一批客户迟迟没收到货物而引起投诉,给公司形成了不小的损失。网络
为了不上述悲剧重演,咱们来了解下在RabbitMQ中咱们须要怎样保证消息不丢失。异步
消息丢失会发生在何时
消息的传输过程大体以下图ide
消息丢失可能发生在优化
- Producer端 发送到RabbitMQ中因为网络异常或者服务异常致使消息发送失败。
- RabbitMQ服务端 异常或者重启致使消息丢失。
- Consumer端 接收到消息后,消息处理失败,消息丢失。
固然上一讲中有提到在RabbitMQ,生产者发送消息是和Exchange交互,Exchange根据路由规则投递到具体的Queue中,若是路由规则设置有问题,也会致使消息丢失,但此条不在本文讨论重点。url
Producer 消息可靠性保证
为了不因为网络抖动或者RabbitMQ服务端异常致使消息发送失败的问题。能够在Producer发送消息的使用引入了一个确认机制(ack),服务端接收到消息以后,会返回给Producer一个成功或者失败的确认消息。.net
RabbitMQ提供了两种解决方式:3d
- 事务机制
- 发送方确认机制
事务方式,主要方法有如下几个code
- channel.txSelect() 将当前的channel设置成事务模式。
- channel.txCommit()用于提交事务。
- channel.txRollback()用于事务回滚
下面代码是简单示例
try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); channel.txCommit(); } catch (Exception e) { e.printStackTrace(); channel.txRollback(); //发送失败后续处理,重发或者持久化异常消息稍后重试 }
信号的流转过程以下图
图片来源 RabbitMQ实战指南
若是事务可以提交成功,则消息必定到达了RabbitMQ中。
图片来源 RabbitMQ实战指南
事务机制可以解决消息生产者和RabbitMQ之间消息 确认的问题,只有消息成功被RabbitMQ接收,事务才能提交成功。但事务机制是同步阻塞进行的,回大大下降RabbitMQ的吞吐量,RabbitMQ提供了一种改进方案,即发送方确认机制。
发送方确认机制:
- channel.confirmSelect(); 将通道设置确认机制
- channel.addConfirmListener() 为通道添加ConfirmListener这个回调接口。
- com.rabbitmq.client.ConfirmListener#handleAck 回调处理正常被RabbitMQ接收的消息。
- com.rabbitmq.client.ConfirmListener#handleNack回调处理没有被RabbitMQ正常接收的消息。
SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>()); channel.confirmSelect(); channel.addConfirmListener(new ConfirmListener() { public void handleAck(long deliveryTag, boolean multiple) throws IOException { if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } } public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple); if (multiple) { confirmSet.headSet(deliveryTag + 1).clear(); } else { confirmSet.remove(deliveryTag); } //这里须要添加消息发送失败处理的代码,从新发送或者持久化后补偿。 } }); //模拟一直发送消息的场景 while (true) { long nextSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes()); confirmSet.add(nextSeqNo); }
上面例子演示了异步confirm的形式,在保证生产者消息被RabbitMQ正常接收,又没有同步阻塞致使明显下降RabbitMQ吞吐量的问题。
RabbitMQ端
为避免RabbitMQ服务异常或者重启致使的消息丢失,须要对作持久化操做,将相关信息保存到磁盘上。要保证消息不丢失须要持久化主队列、持久化。exchange不持久化,在RabbitMQ服务重启后,相关的exchange元数据会丢失,不过消息不丢失,但消息不能发送到这个exchange中了。
- 队列持久化须要在声明队列的时候将durable参数设置为true。(由于消息是存在与队列中,若是队列不持久化,那RabbitMQ重启后,消息将丢失)
- 消息持久化经过将投递模式设置成2(BasicProperties中的deliveryMode)。
channel.queueDeclare(QUEUE_NAME,true,//durable false,false,null); channel.basicPublish("",QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN,//具体属性见下面 message.getBytes(StandardCharsets.UTF_8));
public static final BasicProperties PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, //deliveryMode 0, null, null, null, null, null, null, null, null, null);
Consumer端
为保证Consumer端不因消费处理异常或消费者应用重启致使消息丢失。咱们须要以下操做
- 关闭默认的自动确认。设置为手动确认模式。
手动确认模式:RabbitMQ会等待消费者回复确认信号后才从删除消息。
自动确认模式(默认):RabbitMQ会自动把发出去的消息置为确认,而后删除,无论消费者有没有真正消费到这些消息。
当设置为手动确认模式,对于RabbitMQ服务端而言队列中的消息分为了两种
- Ready:等待投递给消费者的消息。
- Unacked:已经投递给消费者,但尚未收到消费者确认新号的消息。
对于Unacked消息,会出现下面几种状况:
- RabbitMQ收到持有消息的消费者的ack信号,RabbitMQ服务端将会删除该消息。
- RabbitMQ服务端收到持有消息的消费者nack/reject信号,requeue参数为true,RabbitMQ会从新将这条消息存入队列。
- RabbitMQ服务端收到持有消息的消费者nack/reject信号,requeue参数为false,若是队列配置了死信队列,则消息进入死信队列,若是没有配置死信队列,则消息被RabbitMQ从队列中删除。
- RabbitMQ服务端没有收到消息持有消费者的确认信号,且消费此消息的消费者没有断开链接,则服务端会一直等待,没有超时时间。
- RabbitMQ服务端没有收到消息持有消费者的确认信号,且消费此消息的消费者已经断开链接,RabbitMQ会安排该消息从新进入队列。
消息拒绝可使用Channel类中的basicReject或者basicNack方法,下面咱们来看下他们之间的差别。
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException; void basicReject(long deliveryTag, boolean requeue) throws IOException;
- deliveryTag:64位的长整型值,做为消息的编号。
- requeue:是否重入队列配置项。
- multiple:是否批量处理未被当前消费者确认的消息。
basicReject一次只能拒绝一条消息。
basicNack当multiple为false时一次拒绝一条编号为deliveryTag消息,效果和basicReject同样。当multiple为true时表示拒绝deliveryTag编号以前全部未被当前消费者确认的消息。
咱们来看一个代码示例:
boolean autoAck = false; channel.basicConsume(queueName, autoAck, "a-consumer-tag", new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { long deliveryTag = envelope.getDeliveryTag(); try{ //消息处理业务逻辑处理 channel.basicAck(deliveryTag, false); }catch(Exception e){ //处理失败处理逻辑 channel.basicReject(deliveryTag, false); } } });
经过手动确认模式,RabbitMQ只有在收到持有消息的Consumer的应答信号时,才会删除掉消息,保证消息不因Consumer应用异常而致使消息丢失的问题发生。
看了消费端保证消息不丢失的方案,有小伙伴会有疑问,假如RabbitMQ已经把消息投递给了Consumer,Consumer正常的处理了消息,可是因为网络抖动等缘由,RabbitMQ没有收到Consumer的ack消息,且认为Consumer已经断开链接,那么RabbitMQ会从新将消息放入队列,并投递给消费者。这样会致使某些消息重复投递给Consumer的问题产生。
在此种方案下RabbitMQ确实有可能产生重复消息的问题,咱们将在接下来的文章中去处理这个问题。
该方案只保证消息至少一次投递(At least Once)
死信队列
DLX,全名Dead-Letter-Exchange,死信交换器。当一个消息变为死信(dead message)后,可以被从新DLX上,绑定DLX的队列就是死信队列。
消息变成私信有如下几种可能
- 消息被拒绝(basicNack/basicReject),而且设置requeue参数为false;
- 消息过时。
- 队列超过最大长度。
下面经过一个简化的代码示例来演示下死信队列的使用。详细说明见注释
//声明交换器 channe1.exchangeDeclare("exchange.dlx","direct ",true); channe1.exchangeDeclare( "exchange.normal "," fanout ",true); Map<String , Object> args = new HashMap<String, Object>( ); //设置消息超时时间 args.put("x-message-ttl " , 10000); //经过x-dead-letter-exchange参数来执行DLX args.put( "x-dead-letter-exchange ","exchange.dlx"); //为DLX指定路由键 args.put( "x-dead-letter-routing-key"," routingkey"); channe1.queueDec1are( "queue.norma1 ",true,fa1se,fa1se,args); channe1.queueBind( "queue.normal ","exchange .normal", ""); channe1.queueDec1are( "queue.d1x ", true , false , false , null) ; channe1.queueBind( "queue.dlx","exchange.dlx ", routingkey"); channe1.basicPublish( "exchange.normal" , "rk" , MessageProperties.PERSISTENT_TEXT_PLAIN,"dlx".getBytes()) ;
消息流程见下图
对于RabbitMQ来讲,经过分析死信队列中的消息,能够用于改善和优化系统。
总结:消息丢失可能发生在生产端、服务端、消费端。对于重要业务咱们能够经过上面介绍的方式来确保消息不丢失。你们也能够留言讨论下,在使用RabbitMQ过程当中遇到过哪些坑。
参考文档