针对问题(1),咱们能够经过生产者的确认消息机制来解决,主要分为两种:第一是事务机制、第二是发送方确认机制缓存
一、事务机制服务器
与事务机制相关的有三种方法,分别是channel.txSelect设置当前信道为事务模式、channel.txCommit提交事务和channel.txRollback事务回滚。若是事务提交成功,则消息必定是到达了RabbitMQ中,若是事务提交以前因为发送异常或者其余缘由,捕获后能够进行channel.txRollback回滚。数据结构
// 将信道设置为事务模式,开启事务 channel.txSelect(); // 发送持久化消息 channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, MessageProperties.PERSISTENT_TEXT_PLAIN, "transaction messages".getBytes()); // 事务提交 channel.txCommit();
发生异常以后事务回滚异步
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。分布式
try { channel.txSelect(); channel.basicPublish(exchange, routingKey, MessageProperties.PERSISTENT_TEXT_PLAIN, "transaction messages".getBytes()); channel.txCommit(); } catch (Exception e){ e.printStackTrace(); channel.txRollback(); }
二、确认机制ide
生产者将信道设置为confirm确认模式,确认以后全部在信道上的消息将会被指派一个惟一的从1开始的ID,一旦消息被正确匹配到全部队列后,RabbitMQ就会发送一个确认Basic.Ack给生产者(包含消息的惟一ID),生产者便知晓消息是否正确到达目的地了。微服务
消息若是是持久化的,那么确认消息会在消息写入磁盘以后发出。RabbitMQ中的deliveryTag包含了确认消息序号,还能够设置multiple参数,表示到这个序号以前的全部消息都已经获得处理。确认机制相对事务机制来讲,相比较代码来讲比较复杂,但会常用,主要有单条确认、批量确认、异步批量确认三种方式。源码分析
2.1 单条确认性能
此种方式比较简单,通常都是一条条的发送,代码以下:学习
try { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); //set channel publisher confirm mode channel.confirmSelect(); // publish message channel.basicPublish("exchange", "routingkey", null, "publisher confirm test".getBytes()); if (!channel.waitForConfirms()) { // publisher confirm failed handle System.out.println("send message failed!"); } } catch (Exception e) { e.printStackTrace(); }
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。
2.2 批量确认
问:批量确认comfirm须要解决出现返回的Basic.Nack或者超时状况的话,客户须要将这一批次消息所有重发,那么采用什么样的存储结构才能合适地将这些消息动态筛选出来。
最好是须要增长一个缓存,将发送成功而且确认Ack以后的消息去除,剩下Nack或者超时的消息,改进以后的代码以下:
// take ArrayList or BlockingQueue as a cache List<Object> cache = new ArrayList<>(); // set channel publisher confirm mode channel.confirmSelect(); for (int i=0; i < 20; i++) { // publish message String message = "publisher message["+ i +"]"; cache.add(message); channel.basicPublish("exchange", "routingkey", null, message.getBytes()); if (channel.waitForConfirms()) { // remove message publisher confirm cache.remove(i); } // TODO handle Nack message:republish } } catch (Exception e) { e.printStackTrace(); // TODO handle Nack message:republish }
2.3 异步批量确认
异步确认方式经过在客户端addConfirmListener增长ConfirmListener回调接口,包括handleAck与handleNack处理方法:
每次发送消息confirmSet集合元素加1,当消息被确认ack进入handleAck方法时,“unconfirm”集合中删除响应的一条(multiple设置为false时)或者多条记录(multiple设置为true时),其中存储缓存最好采用SortedSet数据结构
代码以下:
try { Connection connection = connectionFactory.newConnection(); Channel channel = connection.createChannel(); // take as a cache SortedSet cache = new TreeSet(); // set channel publisher confirm mode channel.confirmSelect(); for (int i = 0; i < 20; i++) { // publish message long nextSeqNo = channel.getNextPublishSeqNo(); String message = "publisher message[" + i + "]"; cache.add(message); channel.basicPublish("exchange", "routingkey", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); cache.add(nextSeqNo); } // add confirmCalback: handleAck, handleNack channel.addConfirmListener(new ConfirmListener() { @Override public void handleAck(long deliveryTag, boolean multiple) { if (multiple) { // batch remove ack message cache.headSet(deliveryTag - 1).clear(); } else { // remove ack message cache.remove(deliveryTag); } } @Override public void handleNack(long deliveryTag, boolean multiple) { // TODO handle Nack message:republish } }); } catch (Exception e) { e.printStackTrace(); // TODO handle Nack message:republish }
三、总结比较
1)是确认机制好呢?仍是事务机制?二者能够共存吗?
确认机制相对于事务机制,最大的好处就是能够异步处理提升吞吐量,不须要额外等待消耗资源。可是二者时候不能同时共存的。
2)那么确认机制的三种方式之间呢?实际产生环境是推荐哪种呢?(其实毫无疑问固然是推荐异步批量确认方式)
批量确认的最大问题就是在于返回的Nack消息须要从新发送,以上普通单条确认、批量确认、批量异步确认三种方法,在实际生产环境中强烈推荐使用批量异步确认方式。
针对的问题(2),咱们能够经过增长队列与消息的持久化来实现。
一、交换器的持久化
交换器的持久化是经过声明队列durable参数为true实现的,若是交换器不设置持久化,那么在RabbitMQ服务器重启以后,相关的交换器元数据会丢失,消息不会丢失,只是不能将消息发送到这个交换器中。所以,都是建议将其置为持久化。
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true, false, null);
二、队列的持久化
队列持久化同理与交换器持久化,只是RabbitMQ服务器重启以后,相关的元数据会丢失,数据也会跟着丢失,消息也天然丢失。
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
三、消息的持久化
队列的持久化不能保证内存存储的消息不会丢失,要确保消息不会丢失,须要将其经过设置BasicProperties中的deliveryMode属性为2可实现消息的持久化(PERSISTENT_TEXT_PLAIN实际上已经封装了这个属性),也就是说只有实现了队列与消息的持久化,才能保证消息不会丢失。
// 其中的2就是投递模式 public static Class final BasicProperties_PERSISTENT_TEXT_PLAIN = new BasicProperties("text/plain", null, null, 2, null, null, null, null, null, null, null, null, null);
四、消息丢失的几种状况
但实际上不是设置了交换器、队列、消息持久化就能必定保证消息不会被丢失,如下几种状况是可能丢失的,好比:
1)设置autoAck为true,消费者收到消息后,还没处理就宕机了,这样也算数据丢失,解决办法是设置为false,以后手动确认。
2)在设置了持久化后消息存入RabbitMQ以后,还须要一段时间才能存入磁盘之中(虽然很短,但不能忽视),RabbitMQ并不会为每条消息都今次那个同步存盘,可能只会保存到操做系统缓存之中而不是物理磁盘中,若是RabbitMQ这个时间段内宕机、异常、重启等状况,消息也会丢失,解决办法是引入RabbitMQ的镜像队列机制(相似于集群,Master挂了切换到Slave)
没有彻底十全十美的方式能保证数据能100%不丢失,而且最大效率节约性能消耗等,两篇博文虽然已经提出经常使用的四种方式,当实际环境中整个RabbitMQ环境在搭建没有结合实际的生产业务环境的话,也会发生消息丢失的等状况,解决这样的问题无非就完善消息备份,健全RabbitMQ集群..........
若是想学习Java工程化、高性能及分布式、深刻浅出。微服务、Spring,MyBatis,Netty源码分析的朋友能够加个人Java高级交流:854630135,群里有阿里大牛直播讲解技术,以及Java大型互联网技术的视频免费分享给你们。