,若是要保证消息的可靠性,须要对消息进行持久化处理,然而消息持久化除了须要代码的设置以外,还有一个重要步骤是相当重要的,那就是保证你的消息顺利进入Broker(代理服务器),如图所示:java
正常状况下,若是消息通过交换器进入队列就能够完成消息的持久化,但若是消息在没有到达broker以前出现意外,那就形成消息丢失,有没有办法能够解决这个问题?服务器
RabbitMQ有两种方式来解决这个问题:异步
事务的实现主要是对信道(Channel)的设置,主要的方法有三个:ide
channel.txSelect()声明启动事务模式;性能
channel.txComment()提交事务;测试
channel.txRollback()回滚事务;spa
从上面的能够看出事务都是以tx开头的,tx应该是transaction extend(事务扩展模块)的缩写,若是有准确的解释欢迎在博客下留言。代理
咱们来看具体的代码实现:code
// 建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 声明队列 channel.queueDeclare(_queueName, true, false, false, null); String message = String.format("时间 => %s", new Date().getTime()); try { channel.txSelect(); // 声明事务 // 发送消息 channel.basicPublish("", _queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes("UTF-8")); channel.txCommit(); // 提交事务 } catch (Exception e) { channel.txRollback(); } finally { channel.close(); conn.close(); }
注意:用户需把config.xx配置成本身Rabbit的信息。orm
从上面的代码咱们能够看出,在发送消息以前的代码和以前介绍的都是同样的,只是在发送消息以前,须要声明channel为事务模式,提交或者回滚事务便可。
了解了事务的实现以后,那么事务到底是怎么执行的,让咱们来使用wireshark抓个包看看,如图所示:
输入ip.addr==rabbitip && amqp查看客户端和rabbit之间的通信,能够看到交互流程:
以上就完成了事务的交互流程,若是其中任意一个环节出现问题,就会抛出IoException移除,这样用户就能够拦截异常进行事务回滚,或决定要不要重复消息。
那么,既然已经有事务了,没什么还要使用发送方确认模式呢,缘由是由于事务的性能是很是差的。事务性能测试:
事务模式,结果以下:
非事务模式,结果以下:
从上面能够看出,非事务模式的性能是事务模式的性能高149倍,个人电脑测试是这样的结果,不一样的电脑配置略有差别,但结论是同样的,事务模式的性能要差不少,那有没有既能保证消息的可靠性又能兼顾性能的解决方案呢?那就是接下来要讲的Confirm发送方确认模式。
咱们知道,消费者可使用消息自动或手动发送来确认消费消息,那若是咱们在消费者模式中使用事务(固然若是使用了手动确认消息,彻底用不到事务的),会发生什么呢?
消费者模式使用事务
假设消费者模式中使用了事务,而且在消息确认以后进行了事务回滚,那么RabbitMQ会产生什么样的变化?
结果分为两种状况:
Confirm发送方确认模式使用和事务相似,也是经过设置Channel进行发送方确认的。
Confirm的三种实现方式:
方式一:channel.waitForConfirms()普通发送方确认模式;
方式二:channel.waitForConfirmsOrDie()批量确认模式;
方式三:channel.addConfirmListener()异步监听发送方确认模式;
// 建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 声明队列 channel.queueDeclare(config.QueueName, false, false, false, null); // 开启发送方确认模式 channel.confirmSelect(); String message = String.format("时间 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); if (channel.waitForConfirms()) { System.out.println("消息发送成功" ); }
看代码能够知道,咱们只须要在推送消息以前,channel.confirmSelect()声明开启发送方确认模式,再使用channel.waitForConfirms()等待消息被服务器确认便可。
// 建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 声明队列 channel.queueDeclare(config.QueueName, false, false, false, null); // 开启发送方确认模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("时间 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } channel.waitForConfirmsOrDie(); //直到全部信息都发布,只要有一个未确认就会IOException System.out.println("所有执行完成");
以上代码能够看出来channel.waitForConfirmsOrDie(),使用同步方式等全部的消息发送以后才会执行后面代码,只要有一个消息未被确认就会抛出IOException异常。
// 建立链接 ConnectionFactory factory = new ConnectionFactory(); factory.setUsername(config.UserName); factory.setPassword(config.Password); factory.setVirtualHost(config.VHost); factory.setHost(config.Host); factory.setPort(config.Port); Connection conn = factory.newConnection(); // 建立信道 Channel channel = conn.createChannel(); // 声明队列 channel.queueDeclare(config.QueueName, false, false, false, null); // 开启发送方确认模式 channel.confirmSelect(); for (int i = 0; i < 10; i++) { String message = String.format("时间 => %s", new Date().getTime()); channel.basicPublish("", config.QueueName, null, message.getBytes("UTF-8")); } //异步监听确认和未确认的消息 channel.addConfirmListener(new ConfirmListener() { @Override public void handleNack(long deliveryTag, boolean multiple) throws IOException { System.out.println("未确认消息,标识:" + deliveryTag); } @Override public void handleAck(long deliveryTag, boolean multiple) throws IOException { System.out.println(String.format("已确认消息,标识:%d,多个消息:%b", deliveryTag, multiple)); } });
异步模式的优势,就是执行效率高,不须要等待消息执行完,只须要监听消息便可,以上异步返回的信息以下:
能够看出,代码是异步执行的,消息确认有多是批量确认的,是否批量确认在于返回的multiple的参数,此参数为bool值,若是true表示批量执行了deliveryTag这个值之前的全部消息,若是为false的话表示单条确认。
Confirm性能测试
测试前提:与事务同样,咱们发送1w条消息。
方式一:Confirm普通模式
方式二:Confirm批量模式
方式三:Confirm异步监听方式
综合整体测试状况来看:Confirm批量肯定和Confirm异步模式性能相差不大,Confirm模式要比事务快10倍左右