可靠性投递程序员
- 确保消息发送到RabbitMQ服务器
- 确保消息被正确的路由
- 确保消息在队列正确地存储
- 确保消息从队列正确地投递到消费者
- 消费者回调
- 补偿机制
- 消息幂等性
- 消息的顺序性
首先须要明确,效率和可靠性是没法兼得的,若是要保证每个环节都成功,势必会对消息的收发效率形成影响,如过是一些业务实时性要求不是特别高的场合,能够牺牲可靠性来换取效率。面试
可能由于网络或者Broker的问题致使①失败,而生产者是没法得知消息是否正确发送到Broker的。bash
有两种解决方案:服务器
第一种是Transaction事务模式微信
第二种是Confirm确认模式网络
1.在经过channel.txSelect方法开启事务以后,咱们即可以发布消息给RabbitMQ了,若是事务提交成功,则消息必定 到达了RabbitMQ中,若是在事务提交执行以前因为RabbitMQ异常崩溃或者其余缘由抛出异常,这个时候咱们即可以将其捕获,进而经过执行channel.txRollback方法来实现事务回滚。使用事务机制的话会“吸干”RabbitMQ的性 能,通常不建议使用。 2.生产者经过调用channel.confirmSelect方法(即Confirm.Select命令)将信道设置为confirm模式。一旦消息被投递到全部匹配的队列以后,RabbitMQ就会发送一个确认(Basic.Ack)给生产者(包含消息的惟一ID),这就使得生产者知晓消息已经正确到达了目的地了。架构
可能由于路由关键字错误,或者队列不存在,或者队列名称错误致使②失败。ui
Map<String,Object> arguments = new HashMap<String,Object>();
// 指定交换机的备份交换机
arguments.put("alternate-exchange","ALTERNATE_EXCHANGE");
channel.exchangeDeclare("TEST_EXCHANGE","topic", false, false, false, arguments);
复制代码复制代码
可能由于系统宕机、重启、关闭等等状况致使存储在队列的消息丢失,即③出现问题。spa
解决方案:日志
1.队列持久化
// String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
复制代码复制代码
2.交换机持久化
// String exchange, boolean durable
channel.exchangeDeclare("MY_EXCHANGE","true");
复制代码复制代码
3.消息持久化
AMQP.BasicProperties properties = new AMQP.BasicProperties
.Builder()
// 2表明持久化,其余表明瞬态
.deliveryMode(2)
.build();
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
复制代码复制代码
若是消费者收到消息后将来得及处理即发生异常,或者处理过程当中发生异常,会致使④失败。 为了保证消息从队列可靠性到达消费者,RabbitMQ提供了消息确认机制(message acknowledgement),消费者在订阅队列时,能够指定autoAck参数,当autoAck等于false时,RabbitMQ会等待消费者显示地回复确认消息才从队列中删除该消息。 若是消息消费失败,也能够调用Basic.Reject或者BasicNack来拒绝当前消息而不是确认,若是requere参数为true,能够把这条消息从新存入队列,以便发送给下一个消费者。
消费者处理消息以后,能够再发送一条消息给生产者,或者调用生产者地API,告知消息处理完毕。
对于必定时间没有响应地消息,能够设置一个定时重发地机制,可是要控制次数,好比最多重复三次,不然会形成消息堆积。
服务端是没有这种控制的,只能在消费端控制。
如何避免消息的重复消费?
消息重复消费可能会有两个缘由:
对于重复发送的消息,能够对每一条消息生成一个惟一的业务id,经过日志或者建表来作重复控制。
消息的顺序性是指消费者消费消息的顺序跟生产者投递消息的顺序是一致的。
在RabbitMQ中,一个队列有多个消费者时,因为不一样的消费者消费消息的速度是不同的,顺序没法保证
欢迎你们关注和点赞,之后会不断更新更多精选干货文章分享!
读者福利
部分资料以下: