在以前异常处理部分就已经写了,对于consumer的异常退出致使消息丢失,能够时候consumer的消息确认机制。重复的就不说了,这里说一些不同的。性能
当一个消费者收到一个快递,可是这个包裹是破损的,这时候通常会有如下选择测试
拒收快递,让快递员把快递寄回。 (若是有多个consumer可能这条消息会到其它的consumer中,若是只有一个,那么下次获取仍是能够拿到)spa
签收快递,而后偷偷的扔了(钱多任性)3d
拒收快递,联系商家再给我补发一个code
下面是具体的方法,BasicReject同时承担了扔掉消息与退回。区别在第二个参数。BasicNack则是批量进行上面两个操做,DeliveryTag小于或等于当前消息的都会进行该操做,固然是否批量是由第二个参数来决定的blog
//扔掉消息 channel.BasicReject(result.DeliveryTag, false); //退回消息 channel.BasicReject(result.DeliveryTag, true); //批量退回或删除,中间的参数 是否批量 true是/false否 (也就是只一条) channel.BasicNack(result.DeliveryTag, true, true);
BasicRecover方法则是进行补发操做,其中的参数若是为true是把消息退回到queue可是有可能被其它的consumer接收到,设置为false是只补发给当前的consumerrabbitmq
//补发消息 true退回到queue中/false只补发给当前的consumer channel.BasicRecover(true);
消息不仅是在consumer处理的时候出问题,在发布的时候也可能会出问题。不是说把消息向方法里一丢就必定会成功的。因此发布的确认机制也是为高可靠性保驾护航。事务
rabbitmq为咱们提供了两种方式get
确认方式 (confirm)string
事务控制方式 (tx)
可是要知道这些都是耗费性能的,其中事务的性能消耗最大,confirm其次
第一种confirm方式,发布后等待rabbitmq返回消息发布状态
//建立返回一个新的频道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { for (var i = 0; i < 6; i++) { channel.BasicPublish(string.Empty, "testqueue", null, Encoding.UTF8.GetBytes($"这是{i}个消息")); } //等待发布成功并返回发布状态 bool isok = channel.WaitForConfirms(); Console.ReadKey(); }
第二种事务控制方式
//建立返回一个新的频道 using (var channel = RabbitMqHelper.GetConnection().CreateModel()) { try { //锁往 channel.TxSelect(); for (var i = 0; i < 6; i++) { channel.BasicPublish(string.Empty, "testqueue", null, Encoding.UTF8.GetBytes($"这是{i}个消息")); } //提交 channel.TxCommit(); Console.ReadKey(); } catch (Exception e) { //回退 channel.TxRollback(); } }
下面是针对速度测试图,发布一万条消息。 第一次我使用了事务控制方式。耗时915毫秒
第二次使用了confirm方式,耗时374毫秒
第三次没有使用消息确认机制,耗时317毫秒