在前面讲到了RabbitMQ高可用集群的搭建,可是咱们知道只是集群的高可用并不能保证应用在使用消息队列时彻底没有问题,例如若是应用链接的RabbitMQ集群忽然宕机了,虽然这个集群时可使用的,可是应用订阅的链接就断开了,若是有个机房外网出口带宽被挖掘机弄断了,那集群依然是不可用的。因此咱们后面会介绍应用APP如何与链接集群来保证二者配合默契,以及如何实现跨机房的集群复制。服务器
前面讲到应用服务器经过一个负载均衡服务将链接的流量分发到指定服务器,若是链接的节点宕机怎么办呢。应用服务器链接集群主要作两件事,订阅和发布,因此若是是发布消息每次都会从新初始化链接因此链接节点的切换对整个系统的可用性影响不大。若是是订阅消息就没有真么简单了。首先咱们要作到若是链接出现问题应该是抛出异常而不是终止脚本,而且这时应该从新链接链接。
好了不废话了,代码以下:负载均衡
ColonyProductide
using System; using System.Text; using RabbitMQ.Client; using RabbitMQ.Client.Events; namespace ColonyProduct { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String queueName = "wytQueue"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.130"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: null); IBasicProperties properties = channel.CreateBasicProperties(); properties.Persistent = true; for (int i = 0; i < 10; i++) { Byte[] body = Encoding.UTF8.GetBytes("Hello World -- "+i); channel.BasicPublish(exchange: exchangeName, routingKey: String.Empty, basicProperties: properties, body: body); } } } Console.WriteLine("发送完成"); Console.ReadKey(); } } }
ColonyConsumerspa
using RabbitMQ.Client; using RabbitMQ.Client.Events; using System; using System.Text; namespace ColonyConsumer { class Program { static void Main(string[] args) { String exchangeName = "wytExchange"; String queueName = "wytQueue"; ConnectionFactory factory = new ConnectionFactory(); factory.HostName = "192.168.63.133"; factory.Port = 5672; factory.VirtualHost = "/wyt"; factory.UserName = "wyt"; factory.Password = "wyt"; using (IConnection connection=factory.CreateConnection()) { using (IModel channel=connection.CreateModel()) { channel.ExchangeDeclare(exchange: exchangeName, type: ExchangeType.Direct, durable: true, autoDelete: false, arguments: null); channel.QueueDeclare(queue: queueName, durable: true, exclusive: false, autoDelete: false, arguments: null); channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: String.Empty, arguments: null); EventingBasicConsumer consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var body = ea.Body; var message = Encoding.UTF8.GetString(body); var routingKey = ea.RoutingKey; Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message); channel.BasicAck(ea.DeliveryTag, multiple: false); }; channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); Console.WriteLine("等待接收消息"); Console.ReadKey(); } } } } }
因此经过以上的代码就能够保证服务器某节点宕机后订阅的链接自动重连切换。插件
基于warren的共享存储模式code
这种方式其实并非跨地区的远程复制,而且须要共享存储,若是感兴趣的同窗能够百度下。blog
基于Shovel的远程复制rabbitmq
若是直接基于WAN来组建异地的集群的话,集群间大量的数据通信会产生高昂的费用,另外Erlang也不容许这么高延迟的通信。
Shovel是RabbitMQ自带插件(2.7.0后),自带插件的好处就是能够在RabbitMQ服务启动时自动启动Shovel和自定义复制关系。
Shovel运行的原理其实很是简单。经过定义RabbitMQ上一个队列和另一个RabbitMQ上的交换机之间的复制关系来实现远程复制。也就是说它会在主服务上创建一个队列来监听交换机,因此这是到交换机因此的消息会投递到该队列,而且在从服务中订阅这个队列,使队列中的消息复制到从服务的交换机中。RabbitMQ是一个比较全面的消息队列解决方案,咱们公司并无用到该功能,只是在这提下,感兴趣的同窗能够搜下。 队列