RabbitMQ使用教程(一)RabbitMQ环境安装配置及Hello World示例html
在上一篇博客中,咱们讲解了如何经过RabbitMQ的生产者确认机制,保证消息尽量的成功的发送到RabbitMQ服务器,这只是从源头下降了消息丢失的概率,并无真正解决以前提到的问题:如何保证RabbitMQ异常状况(人为重启、异常宕机等)下,队列和消息不丢失?github
要解决该问题,就要用到RabbitMQ中持久化的概念,所谓持久化,就是RabbitMQ会将内存中的数据(Exchange 交换器,Queue 队列,Message 消息)固化到磁盘,以防异常状况发生时,数据丢失。spring
其中,RabblitMQ的持久化分为三个部分:springboot
在上篇博客中,咱们声明Exchange的代码是这样的:服务器
private final static String EXCHANGE_NAME = "normal-confirm-exchange"; // 建立一个Exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct");
这种状况下声明的Exchange是非持久化的,在RabbitMQ出现异常状况(重启,宕机)时,该Exchange会丢失,会影响后续的消息写入该Exchange,那么如何设置Exchange为持久化的呢?答案是设置durable参数。app
durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。性能
持久化能够将交换器存盘,在服务器重启的时候不会丢失相关信息。ui
设置Exchange持久化:
channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
此时调用的重载方法为:
public DeclareOk exchangeDeclare(String exchange, String type, boolean durable) throws IOException { return this.exchangeDeclare(exchange, (String)type, durable, false, (Map)null); }
为了能更好的理解,咱们新建个生产类以下:
package com.zwwhnly.springbootaction.rabbitmq.durable; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class DurableProducer { private final static String EXCHANGE_NAME = "durable-exchange"; private final static String QUEUE_NAME = "durable-queue"; public static void main(String[] args) throws IOException, TimeoutException { // 建立链接 ConnectionFactory factory = new ConnectionFactory(); // 设置 RabbitMQ 的主机名 factory.setHost("localhost"); // 建立一个链接 Connection connection = factory.newConnection(); // 建立一个通道 Channel channel = connection.createChannel(); // 建立一个Exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct"); channel.queueDeclare(QUEUE_NAME, false, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 发送消息 String message = "durable exchange test"; channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); // 关闭频道和链接 channel.close(); connection.close(); } }
示例代码中,咱们新建了1个非持久化的Exchange,1个非持久化的Queue,并将它们作了绑定,此时运行代码,Exchange和Queue新建成功,消息‘durable exchange test’也被正确地投递到了队列中:
此时重启下RabbitMQ服务,会发现Exchange丢失了:
修改下代码,将durable参数设置为ture:
// 建立一个Exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct", true);
此时运行完代码,而后重启下RabbitMQ服务,会发现Exchange再也不丢失:
细心的网友可能会发现,虽然如今重启RabbitMQ服务后,Exchange不丢失了,可是队列和消息丢失了,那么如何解决队列不丢失呢?答案也是设置durable参数。
durable:设置是否持久化。为true则设置队列为持久化。
持久化的队列会存盘,在服务器重启的时候能够保证不丢失相关信息。
简单修改下上面声明Queue的代码,将durable参数设置为true:
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
此时调用的重载方法以下:
public com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments) throws IOException { validateQueueNameLength(queue); return (com.rabbitmq.client.impl.AMQImpl.Queue.DeclareOk)this.exnWrappingRpc((new com.rabbitmq.client.AMQP.Queue.Declare.Builder()).queue(queue).durable(durable).exclusive(exclusive).autoDelete(autoDelete).arguments(arguments).build()).getMethod(); }
运行代码,而后重启RabbitMQ服务,会发现队列如今不丢失了:
虽然如今RabbitMQ重启后,Exchange和Queue都不丢失了,可是存储在Queue里的消息却仍然会丢失,那么如何保证消息不丢失呢?答案是设置消息的投递模式为2,即表明持久化。
修改发送消息的代码为:
// 发送消息 String message = "durable exchange test"; AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build(); channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());
调用的重载方法为:
public void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException { this.basicPublish(exchange, routingKey, false, props, body); }
运行代码,而后重启RabbitMQ服务,发现此时Exchange,Queue,消息都不丢失了:
至此,咱们完美的解决了RabbitMQ重启后,消息丢失的问题。
最终的代码以下,你也能够经过文末的源码连接下载本文用到的全部源码:
package com.zwwhnly.springbootaction.rabbitmq.durable; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.io.IOException; import java.util.concurrent.TimeoutException; public class DurableProducer { private final static String EXCHANGE_NAME = "durable-exchange"; private final static String QUEUE_NAME = "durable-queue"; public static void main(String[] args) throws IOException, TimeoutException { // 建立链接 ConnectionFactory factory = new ConnectionFactory(); // 设置 RabbitMQ 的主机名 factory.setHost("localhost"); // 建立一个链接 Connection connection = factory.newConnection(); // 建立一个通道 Channel channel = connection.createChannel(); // 建立一个Exchange channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); channel.queueDeclare(QUEUE_NAME, true, false, false, null); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); // 发送消息 String message = "durable exchange test"; AMQP.BasicProperties props = new AMQP.BasicProperties().builder().deliveryMode(2).build(); channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes()); // 关闭频道和链接 channel.close(); connection.close(); } }
1)理论上能够将全部的消息都设置为持久化,可是这样会严重影响RabbitMQ的性能。由于写入磁盘的速度比写入内存的速度慢得不止一点点。对于可靠性不是那么高的消息能够不采用持久化处理以提升总体的吞吐量。在选择是否要将消息持久化时,须要在可靠性和吞吐量之间作一个权衡。
2)将交换器、队列、消息都设置了持久化以后仍然不能百分之百保证数据不丢失,由于当持久化的消息正确存入RabbitMQ以后,还须要一段时间(虽然很短,可是不可忽视)才能存入磁盘之中。若是在这段时间内RabbitMQ服务节点发生了宕机、重启等异常状况,消息还没来得及落盘,那么这些消息将会丢失。
3)单单只设置队列持久化,重启以后消息会丢失;单单只设置消息的持久化,重启以后队列消失,继而消息也丢失。单单设置消息持久化而不设置队列的持久化显得毫无心义。
源码地址:https://github.com/zwwhnly/springboot-action.git,欢迎下载。
《RabbitMQ实战指南》