rabbitMq消息投递不丢失,保证幂等性

rabbitMq消息投递不丢失,保证幂等性

MQ消息投递,MQ服务器宕机致使丢失,rabbitMq经过durable参数持久化,也会几率上产生丢失。java

图片

RabbitMQ 如何保证消息不丢失?

1.丢数据场景

  • 生产端丢数据场景,例如生产者将数据推送RabbitMq时,因网络缘由致使数据丢失redis

  • rabbitmq丢数据,例如没有开启持久化,rabbitmq重启致使丢数据。或者开启持久化,在持久化到磁盘过程当中挂了。算法

  • 消费端丢数据场景,例如消费端消费过程当中挂了,rabbitmq认为消费了并删除,致使丢数据。sql

2.解决方案

2.1.消息持久化

将queue、exchange、message都持久化,但不能保证100%不丢失数据,消息持久化解决由于服务器异常奔溃致使的消息丢失数据库

queue持久化编程

//durable=true,实现queue的持久化

Connection connection = connectionFactory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue.persistent.name", true, false, false, null);
复制代码
//Channel类中queueDeclare的完整定义以下:
/**
 * Declare a queue
 * @see com.rabbitmq.client.AMQP.Queue.Declare
 * @see com.rabbitmq.client.AMQP.Queue.DeclareOk
 * @param queue(队列名称)the name of the queue 
 * @param durable(持久化)true if we are declaring a durable queue (the queue will survive a server restart) 
 * @param exclusive(排他队列) true if we are declaring an exclusive queue (restricted to this connection)
 * @param autoDelete(自动删除) true if we are declaring an autodelete queue (server will delete it when no longer in use)
 * @param arguments other properties (construction arguments) for the queue
 * @return a declaration-confirm method to indicate the queue was successfully declared
 * @throws java.io.IOException if an error is encountered
 */
Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                             Map<String, Object> arguments) throws IOException;
                             
exclusive:排他队列,若是一个队列被声明为排他队列,该队列仅对首次申明它的链接可见,并在链接断开时自动删除。
这里须要注意三点:
1. 排他队列是基于链接可见的,同一链接的不一样信道是能够同时访问同一链接建立的排他队列;
2.“首次”,若是一个链接已经声明了一个排他队列,其余链接是不容许创建同名的排他队列的,这个与普通队列不一样;
3.即便该队列是持久化的,一旦链接关闭或者客户端退出,该排他队列都会被自动删除的,这种队列适用于一个客户端发送读取消息的应用场景。

autoDelete:自动删除,若是该队列没有任何订阅的消费者的话,该队列会被自动删除。这种队列适用于临时队列。
复制代码

exchange持久化缓存

若是不设置exchange的持久化对消息的可靠性来讲没有什么影响,可是一样若是exchange不设置持久化,那么当broker服务重启以后,exchange将不复存在,那么既而发送方rabbitmq producer就没法正常发送消息。服务器

//durable=true,持久化
channel.exchangeDeclare(exchangeName, “direct/topic/header/fanout”, true);
复制代码
//exchangeDeclare的完整定义以下:
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type, boolean durable, boolean autoDelete,
                                   Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange, String type) throws IOException;
Exchange.DeclareOk exchangeDeclare(String exchange,
                                          String type,
                                          boolean durable,
                                          boolean autoDelete,
                                          boolean internal,
                                          Map<String, Object> arguments) throws IOException;
void exchangeDeclareNoWait(String exchange,
                           String type,
                           boolean durable,
                           boolean autoDelete,
                           boolean internal,
                           Map<String, Object> arguments) throws IOException;
Exchange.DeclareOk exchangeDeclarePassive(String name) throws IOException;
复制代码

message持久化markdown

queue队列持久化为true,但message没有持久化,重启后message仍是会丢失。 须要queue和message都设置持久化,broker服务重启后,队列存在,消息也存在。网络

//MessageProperties.PERSISTENT_TEXT_PLAIN 为消息持久化
channel.basicPublish("exchange.persistent", "persistent", MessageProperties.PERSISTENT_TEXT_PLAIN, "persistent_test_message".getBytes());
复制代码
//basicPublish完整定义以下:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body)
        throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
        throws IOException;
        
exchange表示exchange的名称
routingKey表示routingKey的名称
body表明发送的消息体

//BasicProperties定义以下,deliveryMode=1表明不持久化,deliveryMode=2表明持久化
public BasicProperties(
            String contentType,//消息类型如:text/plain
            String contentEncoding,//编码
            Map<String,Object> headers,
            Integer deliveryMode,//1:nonpersistent 2:persistent
            Integer priority,//优先级
            String correlationId,
            String replyTo,//反馈队列
            String expiration,//expiration到期时间
            String messageId,
            Date timestamp,
            String type,
            String userId,
            String appId,
            String clusterId)

//MessageProperties.PERSISTENT_TEXT_PLAIN定义以下:其中deliveryMode=2表示持久化
public static final BasicProperties PERSISTENT_TEXT_PLAIN =new BasicProperties("text/plain",null,null,2,0, null, null, null,null, null, null, null,null, null);
复制代码

消息推送到rabbitmq后,先保存到cache中,而后异步刷入到磁盘中。

消息何时刷到磁盘?

写入文件前会有一个Buffer,大小为1M,数据在写入文件时,首先会写入到这个Buffer,若是Buffer已满,则会将Buffer写入到文件(未必刷到磁盘)。 有个固定的刷盘时间:25ms,也就是无论Buffer满不满,每一个25ms,Buffer里的数据及未刷新到磁盘的文件内容一定会刷到磁盘。 每次消息写入后,若是没有后续写入请求,则会直接将已写入的消息刷到磁盘:使用Erlang的receive x after 0实现,只要进程的信箱里没有消息,则产生一个timeout消息,而timeout会触发刷盘操做。

2.2.事务+confirm

在producer引入事务机制或者Confirm机制来确保消息已经正确的发送至broker端。 RabbitMQ的可靠性涉及producer端的确认机制、broker端的镜像队列的配置以及consumer端的确认机制,要想确保消息的可靠性越高,那么性能也会随之而降,鱼和熊掌不可兼得,关键在于选择和取舍。

消息持久化解决由于服务器异常奔溃致使的消息丢失,但不能解决发布者将消息发送以后,消息有没有正确到达broker代理服务器。若是在消息到达broker以前已经丢失的话,持久化操做也解决不了这个问题,由于消息根本就没到达代理服务器,你怎么进行持久化,那么这个问题该怎么解决呢?

这时RabbitMQ为咱们提供了两种方式:

  • 经过AMQP事务机制实现,这也是AMQP协议层面提供的解决方案;
  • 经过将channel设置成confirm模式来实现;
2.2.1.事务机制

rabbitMQ事务机制三个方法:

  • txSelect()用于将当前channel设置成transaction模式
  • txCommit用于提交事务
  • txRollback用于回滚事务

在经过txSelect开启事务以后,咱们即可以发布消息给broker代理服务器了,若是txCommit提交成功了,则消息必定到达了broker了,若是在txCommit执行以前broker异常崩溃或者因为其余缘由抛出异常,这个时候咱们即可以捕获异常经过txRollback回滚事务了。

try {
    channel.txSelect();
    channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());
    int result = 1 / 0;
    channel.txCommit();
} catch (Exception e) {
    e.printStackTrace();
    channel.txRollback();
}
复制代码

使用事务机制的话会下降RabbitMQ的性能,可是RabbitMQ提供了一个更好的方案,即将channel信道设置成confirm模式。

2.2.2.confirm模式

producer端confirm模式的实现原理

生产者将信道设置成confirm模式,在该信道上面发布的消息都会被指派一个惟一的ID(从1开始),一旦消息被投递到全部匹配的队列以后,broker就会发送一个确认给生产者(包含消息的惟一ID),这就使得生产者知道消息已经正确到达目的队列了,若是消息和队列是可持久化的,那么确认消息会将消息写入磁盘以后发出,broker回传给生产者的确认消息中deliver-tag域包含了确认消息的序列号,此外broker也能够设置basic.ack的multiple域,表示到这个序列号以前的全部消息都已经获得了处理。

confirm模式最大的好处在于他是异步的,一旦发布一条消息,生产者应用程序就能够在等信道返回确认的同时继续发送下一条消息,当消息最终获得确认以后,生产者应用即可以经过回调方法来处理该确认消息,若是RabbitMQ由于自身内部错误致使消息丢失,就会发送一条nack消息,生产者应用程序一样能够在回调方法中处理该nack消息。

在channel 被设置成 confirm 模式以后,全部被 publish 的后续消息都将被 confirm(即 ack) 或者被nack一次。可是没有对消息被 confirm 的快慢作任何保证,而且同一条消息不会既被 confirm又被nack 。

已经在transaction事务模式的channel是不能再设置成confirm模式的,即这两种模式是不能共存的。

编程模式

客户端实现生产者confirm有三种编程方式:

  1. 普通confirm模式:每发送一条消息后,调用waitForConfirms()方法,等待服务器端confirm。其实是一种串行confirm了。
  2. 批量confirm模式:每发送一批消息后,调用waitForConfirms()方法,等待服务器端confirm。
  3. 异步confirm模式:提供一个回调方法,服务端confirm了一条或者多条消息后Client端会回调这个方法。

第1种

\\普通confirm模式最简单,publish一条消息后,等待服务器端confirm,若是服务端返回false或者超时时间内未返回,客户端进行消息重传。

channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
if(!channel.waitForConfirms()){
	System.out.println("send message failed.");
}
复制代码

第二种

channel.confirmSelect();
for(int i=0;i<batchCount;i++){
	channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
}
if(!channel.waitForConfirms()){
	System.out.println("send message failed.");
}
复制代码

第三种

异步confirm模式的编程实现最复杂,Channel对象提供的ConfirmListener()回调方法只包含deliveryTag(当前Chanel发出的消息序号),咱们须要本身为每个Channel维护一个unconfirm的消息序号集合,每publish一条数据,集合中元素加1,每回调一次handleAck方法,unconfirm集合删掉相应的一条(multiple=false)或多条(multiple=true)记录。从程序运行效率上看,这个unconfirm集合最好采用有序集合SortedSet存储结构。实际上,SDK中的waitForConfirms()方法也是经过SortedSet维护消息序号的。

SortedSet<Long> confirmSet = Collections.synchronizedSortedSet(new TreeSet<Long>());
channel.confirmSelect();
channel.addConfirmListener(new ConfirmListener() {
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
    	System.out.println("Nack, SeqNo: " + deliveryTag + ", multiple: " + multiple);
        if (multiple) {
            confirmSet.headSet(deliveryTag + 1).clear();
        } else {
            confirmSet.remove(deliveryTag);
        }
    }
});

while (true) {
   long nextSeqNo = channel.getNextPublishSeqNo();
   channel.basicPublish(ConfirmConfig.exchangeName, ConfirmConfig.routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, ConfirmConfig.msg_10B.getBytes());
   confirmSet.add(nextSeqNo);
}
复制代码

性能对比

性能由低到高:事务模式(tx) < 普通confirm模式 < 批量confirm模式 < 异步confirm模式

2.2.3.消息确认(Consumer端)

为了保证消息从队列可靠地到达消费者,RabbitMQ提供消息确认机制(message acknowledgment)。消费者在声明队列时,能够指定noAck参数,当noAck=false时,RabbitMQ会等待消费者显式发回ack信号后才从内存(和磁盘,若是是持久化消息的话)中移去消息。不然,RabbitMQ会在队列中消息被消费后当即删除它。

采用消息确认机制后,只要令noAck=false,消费者就有足够的时间处理消息(任务),不用担忧处理消息过程当中消费者进程挂掉后消息丢失的问题,由于RabbitMQ会一直持有消息直到消费者显式调用basicAck为止。

当noAck=false时,对于RabbitMQ服务器端而言,队列中的消息分红了两部分:一部分是等待投递给消费者的消息;一部分是已经投递给消费者,可是尚未收到消费者ack信号的消息。若是服务器端一直没有收到消费者的ack信号,而且消费此消息的消费者已经断开链接,则服务器端会安排该消息从新进入队列,等待投递给下一个消费者(也可能仍是原来的那个消费者)。

RabbitMQ不会为未ack的消息设置超时时间,它判断此消息是否须要从新投递给消费者的惟一依据是消费该消息的消费者链接是否已经断开。这么设计的缘由是RabbitMQ容许消费者消费一条消息的时间能够好久好久。

RabbitMQ管理平台界面上能够看到当前队列中Ready状态和Unacknowledged状态的消息数,分别对应上文中的等待投递给消费者的消息数和已经投递给消费者可是未收到ack信号的消息数。也能够经过命令行来查看上述信息: img

代码示例(关闭自动消息确认,进行手动ack):

QueueingConsumer consumer = new QueueingConsumer(channel);
   channel.basicConsume(ConfirmConfig.queueName, false, consumer);
   
   while(true){
       QueueingConsumer.Delivery delivery = consumer.nextDelivery();
       String msg = new String(delivery.getBody());
// do something with msg. 
       channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
   }
复制代码

broker将在下面的状况中对消息进行confirm:

  • broker发现当前消息没法被路由到指定的queues中(若是设置了mandatory属性,则broker会发送basic.return)
  • 非持久属性的消息到达了其所应该到达的全部queue中(和镜像queue中)
  • 持久消息到达了其所应该到达的全部queue中(和镜像中),并被持久化到了磁盘(fsync)
  • 持久消息从其所在的全部queue中被consume了(若是必要则会被ack)

basicRecover:是路由不成功的消息可使用recovery从新发送到队列中。

basicReject:是接收端告诉服务器这个消息我拒绝接收,不处理,能够设置是否放回到队列中仍是丢掉,并且只能一次拒绝一个消息,官网中有明确说明不能批量拒绝消息,为解决批量拒绝消息才有了basicNack。

basicNack:能够一次拒绝N条消息,客户端能够设置basicNack方法的multiple参数为true,服务器会拒绝指定了delivery_tag的全部未确认的消息(tag是一个64位的long值,最大值是9223372036854775807)。

2.3.设置集群镜像模式

RabbitMQ的mirrored-queue即镜像队列,这个至关于配置了副本,当master在此特殊时间内crash掉,能够自动切换到slave,这样有效的保障了HA, 除非整个集群都挂掉,这样也不能彻底的100%保障RabbitMQ不丢消息,但比没有mirrored-queue的要好不少,不少现实生产环境下都是配置了mirrored-queue的。

2.4.消息补偿机制

消息提早持久化+定时任务

图片

上图流程:

(1)订单服务生产者在投递消息以前,先把消息持久化到Redis或DB中,建议Redis,高性能。消息的状态为发送中。

(2)confirm机制监听消息是否发送成功?如ack成功消息,删除Redis中此消息。

(3)若是nack不成功的消息,这个能够根据自身的业务选择是否重发此消息。也能够删除此消息,由本身的业务决定。

(4)这边加了个定时任务,来拉取隔必定时间了,消息状态仍是为发送中的,这个状态就代表,订单服务是没有收到ack成功消息。

(5)定时任务会做补偿性的投递消息。这个时候若是MQ回调ack成功接收了,再把Redis中此消息删除。

幂等性保证

1.惟一性索引

根据业务规则,设置表字段惟一性。

2.乐观锁方案

借鉴数据库的乐观锁机制,根据version版本,也就是在操做库存前先获取当前商品的version版本号,而后操做的时候带上此version号。咱们梳理下,咱们第一次操做库存时,获得version为1,调用库存服务version变成了2;但返回给订单服务出现了问题,订单服务又一次发起调用库存服务,当订单服务传递的version仍是1,再执行上面的sql语句时,就不会执行;由于version已经变为2了,where条件就不成立。这样就保证了无论调用几回,只会真正的处理一次。

update table set count=count-1,version=version+1 where id=2 and version=1
复制代码

3.分布式锁

如果是分布是系统,构建全局唯一索引比较困难,例如唯一性的字段没法确定,这时候可以引入分布式锁,通过第三方的系统(redis或zookeeper),在业务系统插入数据或者更新数据,获取分布式锁,然后做操作,之后释放锁,这样其实是把多线程并发的锁的思路,引入多多个系统,也就是分布式系统中得解决思路。要点:某个长流程处理过程要求不能并发执行,可以在流程执行之前根据某个标志(用户ID+后缀等)获取分布式锁,其他流程执行时获取锁就会失败,也就是同一时间该流程只能有一个能执行成功,执行完成后,释放分布式锁(分布式锁要第三方系统提供);

2.惟一ID+指纹码机制

惟一ID:如数据库的主键id

指纹码:业务规则标识惟一的。如时间戳+银行返回的惟一码。须要注意的是,这个指纹码不必定就是咱们系统生产的,多是咱们本身业务规则或者是外部返回的一些规则通过拼接后的东西。其目的:就是为了保障这次操做达到绝对惟一的。

惟一ID+指纹码机制,利用数据库主键去重。如:

Select count(id) from table where id = 惟一ID+指纹码
复制代码

好处:实现简单,就一个拼接,然后查询判断是否重复。

坏处:高并发下如果是单个数据库就会有写入性能瓶颈

解决方案:根据 ID 进行分库分表,对 id 进行算法路由,落到一个具体的数据库,然后当这个 id 第二次来又会落到这个数据库,这时候就像我单库时的查重一样了。利用算法路由把单库的幂等变成多库的幂等,分摊数据流量压力,提高性能。

3.2.Redis 原子性

相信大家都知道 redis 的原子性操作,我这里就不需要过多介绍了。性能

使用 redis 的原子性去实现需要考虑两个点

一是 是否 要进行数据落库,如果落库的话,关键解决的问题是数据库和缓存如何做到原子性? 数据库与缓存进行同步肯定要进行写操作,到底先写 redis 还是先写数据库,这是个问题,涉及到缓存更新与淘汰的问题

二是如果不落库,那么都存储到缓存中,如何设置定时同步的策略? 不入库的话,可以使用双重缓存等策略,保障一个消息副本,具体同步可以使用类似 databus 这种同步工具。

摘抄

1.honeypps.com/mq/rabbitmq…

2.mp.weixin.qq.com/s/f5DKk_alX…