消息中间件RabbitMQ可靠性投递与生产实践

本章重点:

可靠性投递程序员

  1. 确保消息发送到RabbitMQ服务器
  2. 确保消息被正确的路由
  3. 确保消息在队列正确地存储
  4. 确保消息从队列正确地投递到消费者
  5. 消费者回调
  6. 补偿机制
  7. 消息幂等性
  8. 消息的顺序性

可靠性投递

首先须要明确,效率和可靠性是没法兼得的,若是要保证每个环节都成功,势必会对消息的收发效率形成影响,如过是一些业务实时性要求不是特别高的场合,能够牺牲可靠性来换取效率。面试

  1. ①表明消息从生产者发送到Exchange
  2. ②表明消息从Exchange路由到Queue
  3. ③ 表明消息在Queue中存储;
  4. ④ 表明消费者订阅Queue并消费消息。

1.确保消息发送到RabbitMQ服务器

可能由于网络或者Broker的问题致使①失败,而生产者是没法得知消息是否正确发送到Broker的。bash

有两种解决方案:服务器

第一种是Transaction事务模式微信

第二种是Confirm确认模式网络

1.在经过channel.txSelect方法开启事务以后,咱们即可以发布消息给RabbitMQ了,若是事务提交成功,则消息必定 到达了RabbitMQ中,若是在事务提交执行以前因为RabbitMQ异常崩溃或者其余缘由抛出异常,这个时候咱们即可以将其捕获,进而经过执行channel.txRollback方法来实现事务回滚。使用事务机制的话会“吸干”RabbitMQ的性 能,通常不建议使用。 2.生产者经过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式。一旦消息被投递到全部匹配的队列以后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的惟一ID),这就使得生产者知晓消息已经正确到达了目的地了。架构

2.确保消息被正确的路由

可能由于路由关键字错误,或者队列不存在,或者队列名称错误致使②失败。ui

  1. 使用mandatory参数和ReturnListener,能够实现消息没法路由的时候返回给生产者。
  2. 另外一种方式就是使用备份交换机(alternate-exchange),没法路由的消息会发送到这个交换机上。
Map<String,Object> arguments = new HashMap<String,Object>();
// 指定交换机的备份交换机
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE"); 
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
复制代码复制代码

3.确保消息在队列正确地存储

可能由于系统宕机、重启、关闭等等状况致使存储在队列的消息丢失,即③出现问题。spa

解决方案:日志

1.队列持久化

// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments 
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
复制代码复制代码

2.交换机持久化

// String exchange, boolean durable 
channel.exchangeDeclare("MY_EXCHANGE","true");
复制代码复制代码

3.消息持久化

AMQP.BasicProperties properties = new AMQP.BasicProperties
.Builder() 
// 2表明持久化,其余表明瞬态 
.deliveryMode(2) 
.build(); 
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
复制代码复制代码

4.确保消息从队列正确地投递到消费者

若是消费者收到消息后将来得及处理即发生异常,或者处理过程当中发生异常,会致使④失败。 为了保证消息从队列可靠性到达消费者,RabbitMQ提供了消息确认机制(message acknowledgement),消费者在订阅队列时,能够指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显示地回复确认消息才从队列中删除该消息。 若是消息消费失败,也能够调用Basic.Reject或者BasicNack来拒绝当前消息而不是确认,若是requere参数为true,能够把这条消息从新存入队列,以便发送给下一个消费者。

5.消费者回调

消费者处理消息以后,能够再发送一条消息给生产者,或者调用生产者地API,告知消息处理完毕。

6.补偿机制

对于必定时间没有响应地消息,能够设置一个定时重发地机制,可是要控制次数,好比最多重复三次,不然会形成消息堆积。

7.消息幂等性

服务端是没有这种控制的,只能在消费端控制。

如何避免消息的重复消费?

消息重复消费可能会有两个缘由:

  1. 生产者的问题。环节①重复发送消息,好比在开启Confirm模式但未收到确认
  2. 环节④出了问题,因为消费者未发送ACK或者其它缘由,消息重复投递

对于重复发送的消息,能够对每一条消息生成一个惟一的业务id,经过日志或者建表来作重复控制。

8.消息的顺序性

消息的顺序性是指消费者消费消息的顺序跟生产者投递消息的顺序是一致的。

在RabbitMQ中,一个队列有多个消费者时,因为不一样的消费者消费消息的速度是不同的,顺序没法保证

最后

欢迎你们关注和点赞,之后会不断更新更多精选干货文章分享!

读者福利

在这给你们推荐一个微信公众号,那里天天都会有技术干货、技术动向、职业生涯、行业热点、职场趣事等一切有关于程序员的内容分享。更有海量Java架构、移动互联网架构相关源码视频,面试资料,电子书籍免费发放。我看了以为资源还不错,若是大家有须要的话,扫描下方二维码关注wx公众号免费获取↓↓↓


部分资料以下:



相关文章
相关标签/搜索