在正常的服务器运行过程当中,时常会面临服务器宕机重启的状况,那么咱们的消息此时会如何呢?很不幸的事情就是,咱们的消息可能会消失,这确定不是咱们但愿见到的结果。因此咱们但愿AMQP服务器崩溃了也能够将消息恢复,这称之为消息持久化。RabbitMQ天然存在这种策略能够帮助咱们完成这件事情。html
当RabbitMQ服务器重启后,原先的队列和交换器会随同里面的消息一同消失。缘由在于每一个队列和交换器都有durable属性,该属性默认是false,它决定了RabbitMQ是否须要在崩溃或者重启以后从新建立队列或者交换器。将它设置为true就表明了持久性,在服务器重启以后就会从新持久的建立队列和交换器。安全
固然作到这点还不够,咱们须要的是持久化的消息,因此在消息发布前,经过将消息的“投递模式”(delivery mode)属性设置为2将消息标记为持久化。到目前为止,消息还只是被表示为持久化,还须要被发布到持久化的交换器中并到达持久化的队列中才行。若是不是这样,包含持久化消息的队列或者交换器挥着Rabbit崩溃重启后不复存在,致使消息成为一个孤儿。所以,总结起来须要作到如下三点:服务器
(1)将消息的投递模式选项设置为2(持久);异步
(2)将消息发送到持久化的交换器;性能
(3)消息到达持久化的队列。spa
注意,若是原先有非持久的交换器或者队列,须要删除后才可从新建立,不然就建立其余名称的交换器或者队列,代码以下:code
//声明持久交换器 channel.ExchangeDeclare( "HelloExchange", //交换器名称 ExchangeType.Direct,//交换器类型 true, //是否持久话 false, //是否自动删除 null //关于交换器的详细设置,键值对形式 ); //声明持久队列 channel.QueueDeclare( "HelloQueue",//队列名称 true, //是否持久化 false, //是否只对首次声明的队列可见 false, //是否自动删除 null ////关于队列和队列内消息的详细设置,键值对形式 ); //发布持久消息 string msg_str = "这是生产者第一次发布的消息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//发布的数据类型 msg_pro.DeliveryMode = 2;//标记持久化
目前为止,咱们已经将消息、队列和交换器设置为持久化。可是事实上还存在着'最后一英里'的距离,就是在把消息写入磁盘前,消息因为服务器宕机而消失该如何?这时候就须要使用到事务,说到事务就会想到SQL中的事务,可是不能搞混了。AMQP中,在把信道设置为事务模式后,经过信道发送消息后还有多个其余的AMQP命令,这些命令是执行仍是忽略,取决于消息的发送是否成功,消息发送成功信道会在事务中完成其余AMQP命令,就能够提交事务了,发送失败则其余AMQP命令将不会执行,咱们也会知道发送失败,而采起相应的措施。事务保证了解决这最后的问题。orm
代码以下:htm
using (IConnection conn = conn_factory.CreateConnection()) { //2.建立信道 using (IModel channel = conn.CreateModel()) { try { channel.TxSelect();//声明事务 //3.发布消息 string msg_str = "这是生产者发布的消息"; IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//发布的数据类型 msg_pro.DeliveryMode = 2; channel.BasicPublish( "HelloExchange", //消息发送目标交换器名称 "hola", //路由键 msg_pro, //消息的发布属性 Encoding.UTF8.GetBytes(msg_str) //消息 ); channel.TxCommit();//提交事务 } catch(Exception ex) { channel.TxRollback();//回滚事务 } } }
虽然经过事务和持久化的消息、队列和交换器能够确保消息不会丢失,可是对消息的吞吐量有着很是严重的影响,并且使用消息通讯就是为了不同步,但是事务却会致使生产者程序产生同步。因此,有一个更好的方法保证消息投递:发送方确认模式。和事务相似,咱们须要将信道channel设置为confirm模式,并且只能经过从新建立信道来关闭该设置。一旦信道进入confirm模式,全部的信道上发布的消息都会被指派一个惟一的ID。当消息被投递到队列后,信道就会发送一个发送方确认模式给生产者程序,使得生产者知道消息安全到达队列了。blog
发送发确认模式最大的好处是它们是异步的,没有回滚的概念,更加轻量级,对性能的影响也几乎忽略不计。
代码以下:
channel.ConfirmSelect();//开启发送确认模式 //3.发布消息 IBasicProperties msg_pro = channel.CreateBasicProperties(); msg_pro.ContentType = "text/plain";//发布的数据类型 msg_pro.DeliveryMode = 2; for(int i = 0; i < 5; i++) { string msg_str = string.Format("这是生产者发布的消息{0}", i); channel.BasicPublish( "HelloExchange", //消息发送目标交换器名称 "hola", //路由键 msg_pro, //消息的发布属性 Encoding.UTF8.GetBytes(msg_str) //消息 ); if (channel.WaitForConfirms()) Console.WriteLine(i); else Console.WriteLine("发送失败"); }
能够看到channel.WaitForConfirms()方法是同步的,这样的话效率会低一点,咱们能够发送完全部的消息,而后用channel.WaitForConfirmsOrDie()一次性提交,若是中途有一个消息提交失败或者超时,就会报错Exception,须要所有从新提交。
消息持久化的策略大体就是以上几种,咱们能够根据本身的实际需求来选择相应的策略。若是有问题欢迎指出!
原文出处:https://www.cnblogs.com/xwc1996/p/10041116.html