首先咱们看下消息周期投递过程:java
咱们把该图分三部分,左中右,每部分都会致使消息丢失状况,下面就详细聊聊每一个阶段消息是如何丢的:web
1) 外界环境问题致使:发生网络丢包、网络故障等形成RabbitMQ Server端收不到消息,由于生产环境的网络是很复杂的,网络抖动,丢包现象很常见,下面会讲到针对这个问题是如何解决的。编程
2) 代码层面,配置层面,考虑不全致使消息丢失bash
事例1:
通常状况下,生产者使用Confirm模式投递消息,若是方案不够严谨,好比RabbitMQ Server 接收消息失败后会发送nack消息通知生产者,生产者监听消息失败或者没作任何事情,消息存在丢失风险;服务器
事例2:
生产者发送消息到exchange后,发送的路由和queue没有绑定,消息会存在丢失状况,下面会讲到具体的例子,保证意外状况的发生,即便发生,也在可控范围内。网络
1)消息未彻底持久化,当机器重启后,消息会所有丢失,甚至Queue也不见了架构
假如:你仅仅持久化了Message,而Exchange,Queue没有持久化,这个持久化是无效的。 记得以前公司有一哥们忘记持久化Queue致使机器重启后,Queue不见了,天然Message也丢失了。并发
2)单节点模式问题,若是某个节点挂了,消息就不能用了,业务可能瘫痪,只能等待异步
若是作了消息持久化方案,消息会持久化硬盘,机器重启后消息不会丢失;可是还有一个极端状况,这台服务器磁盘忽然坏了(公司遇到过磁盘问题仍是不少的),消息持久化不了,非高可用状态,这个模式生产环境慎重考虑。高并发
3)普通集群模式:某个节点挂了,该节点上的消息不能用,有影响的业务瘫痪,只能等待节点恢复重启可用(创建在消息持久化)
虽然这个模式进步了一点点,多个节点,可是消息仍是不能保证可靠,为何呢?
由于RabbitMQ 集群模式有点特殊,队列的内容仅仅存在某一个节点上面,不会存在全部节点上面,全部节点仅仅存放消息结构和元数据(能够理解为索引,这也是为了提升性能,若是每次把全部内容同步到全部节点是有开销代价的)。 下面本身画了一张图介绍普通集群丢失消息状况:
那咱们想下,图中的Q1问题,note1挂了,这个节点的Queues所有暂时不可用,节点恢复后可用。
咱们说下图片中备注2中的问题,Producer发送消息到note2,note2在同步note1前note1挂了,此时你的心情是怎么样的。。。后面会讲具体的策略
4)镜像模式:能够解决上面的问题,可是仍是有意外状况发生
好比:持久化的消息,保存到硬盘过程当中,当前队列节点挂了,存储节点硬盘又坏了,消息丢了,怎么办?下面会详细介绍
下面也是从三个方面介绍:
1.生产者生产消息到RabbitMQ Server 可靠性保证
2.RabbitMQ Server中存储的消息如何保证
3.RabbitMQ Server到消费者消息如何不丢
这个过程,消息可能会丢,好比发生网络丢包、网络故障等形成消息丢失,通常状况下若是不采起措施,生产者没法感知消息是否已经正确无误的发送到exchange中,若是生产者能感知到的话,它能够进行进一步的处理动做,好比从新投递相关消息以确保消息的可靠性。
1.1 一般有一种方案能够解决:就是 AMQP协议提供的一个事务机制
RabbitMQ客户端中Channel 接口提供了几个事务机制相关的方法:
channel.txSelect
channel.txCommit
channel.txRollback
源码截图以下:com.rabbitmq.client 包中public interface Channel extendsShutdownNotifier {}接口
可是,不多有人这么干,由于这是同步操做,一条消息发送以后会使发送端阻塞,以等待RabbitMQ Server的回应,以后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大下降。
1.2 幸运的是RabbitMQ提供了一个改进方案,即发送方确认机制(publisher confirm)
首先生产者经过调用channel.confirmSelect方法将信道设置为confirm模式,一旦信道进入confirm模式,全部在该信道上面发布的消息都会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的惟一deliveryTag和multiple参数),这就使得生产者知晓消息已经正确到达了目的地了。
其实Confirm模式有三种方式实现:
串行confirm
for(int i = 0;i<50;i++){
channel.basicPublish(
exchange, routingKey,
mandatory, immediate,
messageProperties,
message.getContent()
);
if (channel.waitForConfirms()) {
System.out.println("发送成功");
} else {
//发送失败这里可进行消息从新投递的逻辑
System.out.println("发送失败");
}
}
复制代码
批量confirm模式
for(int i = 0;i<50;i++){
channel.basicPublish(
exchange, routingKey,
mandatory, immediate,
messageProperties,
message.getContent()
);
}
if (channel.waitForConfirms()) {
System.out.println("发送成功");
} else {
System.out.println("发送失败");
}
复制代码
上面代码是简单版本的,生产环境绝对不是循环发送的,而是根据业务状况, 各个客户端程序须要按期(每x秒)或定量(每x条)或者二者结合来publish消息,而后等待服务器端confirm。相比普通confirm模式,批量能够极大提高confirm效率。
可是有没有发现什么问题?
问题1: 批量发送的逻辑复杂化了。
问题2: 一旦出现confirm返回false或者超时的状况时,客户端须要将这一批次的消息所有重发,这会带来明显的重复消息数量,而且当消息常常丢失时,批量confirm性能应该是不升反降的。
异步confirm模式
Channel channel = channelManager.getPublisherChannel(namespaceName);
ProxiedConfirmListener confirmListener = new ProxiedConfirmListener();//监听类
confirmListener.setChannelManager(channelManager);
confirmListener.setChannel(channel);
confirmListener.setNamespace(namespaceName);
confirmListener.addSuccessCallbacks(successCallbacks);
channel.addConfirmListener(confirmListener);
channel.confirmSelect();//开启confirm模式
AMQP.BasicProperties messageProperties = null;
if (message.getProperty() instanceof AMQP.BasicProperties) {
messageProperties = (AMQP.BasicProperties) message.getProperty();
}
confirmListener.toConfirm(channel.getNextPublishSeqNo(), rawMsg);
for(int i = 0;i<50;i++){
channel.basicPublish(
exchange, routingKey,
mandatory, immediate,
messageProperties,
message.getContent()
);
}
复制代码
异步模式须要本身多写一部分复杂的代码实现,异步监听类,监听server端的通知消息,异步的好处性能会大幅度提高,发送完毕以后,能够继续发送其余消息。 MQServer通知生产端ConfirmListener监听类:用户能够继承接口实现本身的实现类,处理消息确认机制,此处继承类代码省略,就是上面 ProxiedConfirmListener 类: 下面贴下要实现的接口:
package com.rabbitmq.client;
import java.io.IOException;
/**
* Implement this interface in order to be notified of Confirm events.
* Acks represent messages handled successfully; Nacks represent
* messages lost by the broker. Note, the lost messages could still
* have been delivered to consumers, but the broker cannot guarantee
* this.
*/
public interface ConfirmListener {
/**
** handleAck RabbitMQ消息接收成功的方法,成功后业务能够作的事情
** 发送端投递消息前,须要把消息先存起来,好比用KV存储,接收到ack后删除
**/
void handleAck(long deliveryTag, boolean multiple)
throws IOException;
//handleNack RabbitMQ消息接收失败的通知方法,用户能够在这里从新投递消息
void handleNack(long deliveryTag, boolean multiple)
throws IOException;
}
复制代码
上面的接口颇有意思,若是是你的话,怎么实现? 消息投递前如何存储消息,ack 和 nack 如何处理消息?
下面看下异步confirm的消息投递流程:
解释下这张图片:
channel1 连续发类1,2,3条消息到RabbitMQ-Server,RabbitMQ-Server通知返回一条通知,里面包含回传给生产者的确认消息中的deliveryTag包含了确认消息的序号,此外还有一个参数multiple=true,表示到这个序号以前的全部消息都已经获得了处理。这样客户端和服务端通知的次数就减小类,提高类性能。
channel3 发送的消息失败了,生产端须要对投递消息从新投递,须要额外处理代码。 那么生产端须要作什么事情呢?由于是异步的,生产端须要存储消息而后根据server通知的消息,确认如何处理,因而咱们面临的问题是:
第一:发送消息以前把消息存起来
第二:监听ack 和 nack 并作响应处理
那么怎么存储呢?
咱们分析下,可使用SortedMap 存储,保证有序,可是有个问题高并发状况下, 每秒可能几千甚至上万的消息投递出去,消息的ack要等几百毫秒的话,放内存可能有内存溢出的风险。因此建议采用KV存储,KV存储承载高并发能力高,性能好,可是要保证KV 高可用,单个有个缺点就是又引入了第三方中间件,复杂度升高。
解决了上面的问题,下面还会遇到一个问题,消息丢失的另外一个状况?
事务机制和publisher confirm机制确保的是消息可以正确的发送至RabbitMQ,这里的“发送至RabbitMQ”的含义是指消息被正确的发往至RabbitMQ的交换器,若是此交换器没有匹配的队列的话,那么消息也将会丢失,怎么办?
这里有两个解决方案,
咱们看下RabbitMQ客户端代码方法
Channel 类中 发布消息方法
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
throws IOException;
复制代码
解释下:basicPublish 方法中的,mandatory和immediate
/**
* 当mandatory标志位设置为true时,若是exchange根据自身类型和消息routeKey没法找到一个符合条件的queue, 那么会调用basic.return方法将消息返回给生产者<br>
* 当mandatory设置为false时,出现上述情形broker会直接将消息扔掉。
*/
@Setter(AccessLevel.PACKAGE)
private boolean mandatory = false;
/**
* 当immediate标志位设置为true时,若是exchange在将消息路由到queue(s)时发现对于的queue上没有消费者, 那么这条消息不会放入队列中。
当immediate标志位设置为false时,exchange路由的队列没有消费者时,该消息会经过basic.return方法返还给生产者。
* RabbitMQ 3.0版本开始去掉了对于immediate参数的支持,对此RabbitMQ官方解释是:这个关键字违背了生产者和消费者之间解耦的特性,由于生产者不关心消息是否被消费者消费掉
*/
@Setter(AccessLevel.PACKAGE)
private boolean immediate;
复制代码
因此为了保证消息的可靠性,须要设置发送消息代码逻辑。若是不单独形式设置mandatory=false
使用mandatory 设置true的时候有个关键点要调整,生产者如何获取到没有被正确路由到合适队列的消息呢?经过调用channel.addReturnListener来添加ReturnListener监听器实现,只要发送的消息,没有路由到具体的队列,ReturnListener就会收到监听消息。
channel.addReturnListener(new ReturnListener() {
public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
.BasicProperties basicProperties, byte[] body) throws IOException {
String message = new String(body);
//进入该方法表示,没路由到具体的队列
//监听到消息,能够从新投递或者其它方案来提升消息的可靠性。
System.out.println("Basic.Return返回的结果是:" + message);
}
});
复制代码
此时有人问了,不想复杂化生产者的编程逻辑,又不想消息丢失,那么怎么办? 还好RabbitMQ提供了一个叫作alternate-exchange东西,翻译下就是备份交换器,这个干什么用呢?很简单,它能够将未被路由的消息存储在另外一个exchange队列中,再在须要的时候去处理这些消息。
那如何实现呢?
简单一点能够经过webui管理后台设置,当你新建一个exchange业务的时候,能够给它设置Arguments,这个参数就是 alternate-exchange,其实alternate-exchange就是一个普通的exchange,类型最好是fanout 方便管理
当你发送消息到你本身的exchange时候,对应key没有路由到queue,就会自动转移到alternate-exchange对应的queue,起码消息不会丢失。
下面一张图看下投递过程:
那么有人有个疑问,上面介绍了,两种方式处理,发送的消息没法路由到队列的方案, 若是备份交换器和mandatory参数一块儿使用,会有什么效果?
答案是:mandatory参数无效
因为篇幅太长,我会再分一篇文章出来说下面的内容
待续...
待续...
待续...
若有收获,请帮忙转发,后续会有更好文章贡献,您的鼓励是做者最大的动力!
欢迎关注个人公众号:架构师的修炼,得到独家整理的学习资源和平常干货推送。