RabbitMQ如何保证发送端消息的可靠投递-发生镜像队列发生故障转移时

上一篇最后提到了mandatory这个参数,对于设置mandatory参数我的感受仍是很重要的,尤为在RabbitMQ镜像队列发生故障转移时。html

模拟个测试环境以下:app

首先在集群队列中增长两个镜像队列的策略:测试

对于ha-promote-on-shutdown这个参数,能够参考文档,其做用就是当集群中master出现故障时强制进行故障转移从而选出新的master节点,这里的master出现故障表示的是人为的故障好比经过命令行rabbitmqctl.bat start_app之类的关闭RabbitMQ实例或者说是关闭电脑之类的。由于这种强制切换master节点的状况一般发生在断电之类的非可控因素上,因此经过设置这个参数为always模拟非可控因素。spa

固然设置这个参数会存在必定风险,文档里也说了,会发生消息不一样步也就是会丢消息。命令行

而后建立四个队列和两个Echange,采用绑定Exchange的topic模式3d

而后先贴一下测试代码在进行说明调试

C#代码code

       List<string> hosts = new List<string>();
            hosts.Add("192.168.1.1");
            hosts.Add("192.168.1.2"); 
            int curHostIndex = 0;
            string exchange = "always.exchange";
            string touteKey = "yu.1";
            byte[] msg = Encoding.UTF8.GetBytes("hello");
            ConnectAgain:
            ConnectionFactory factory = new ConnectionFactory();
            factory.UserName = "admin";
            factory.Password = "admin";
            factory.VirtualHost = "/";
            factory.HostName = hosts[curHostIndex];
            IConnection conn = factory.CreateConnection(); 
            IModel channel = conn.CreateModel();
            IBasicProperties props = channel.CreateBasicProperties();
            props.ContentType = "text/plain";
            props.DeliveryMode = 2;
            for (int i = 0; i < 5000000; i++)
            {
                try
                {
                    channel.ConfirmSelect();
                    channel.BasicAcks += (sender, eventArgs) => { };
                    channel.BasicReturn += (sender, eventArgs) => Console.WriteLine("消息投递失败 " + eventArgs.ReplyText); 
                    channel.BasicPublish(exchange, touteKey, true, props, msg);
                    bool success = channel.WaitForConfirms(new TimeSpan(0, 0, 0, 0, 1));
                    if (!success)
                        Console.WriteLine("表示消息投递失败 ");
                }
                catch (Exception ex)
                {   //发生连接异常时换个IP进行链接
                    channel.Close();
                    conn.Close();
                    if (curHostIndex == 0)
                        curHostIndex = 1;
                    else
                        curHostIndex = 0;
                    goto ConnectAgain;
                }
            }

Java代码:orm

 public static void publish() throws Exception {
        List<String> hosts = new ArrayList<String>();
        hosts.add("192.168.1.1");
        hosts.add("192.168.1.2");
        int curHostIndex = 1;
        String exchange = "common.exchange";
        String routeKey = "yu.1";
        byte[] msg = "hello".getBytes("UTF-8");
        ConnectAgain:
        while (true) {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost(hosts.get(curHostIndex));
            factory.setPort(5672);
            factory.setUsername("admin");
            factory.setPassword("admin");
            factory.setVirtualHost("/");
            // 建立一个新的链接
            Connection connection = factory.newConnection();
            // 建立一个频道
            Channel channel = connection.createChannel();
            channel.addConfirmListener(new ConfirmListener() {
                public void handleAck(long l, boolean b) throws IOException {
                    System.out.println(l);
                }

                public void handleNack(long l, boolean b) throws IOException {
                    System.out.println(l);
                }
            });
            channel.addReturnListener(new ReturnListener() {
                public void handleReturn(int i, String s, String s1, String s2, AMQP.BasicProperties basicProperties, byte[] bytes) throws IOException {
                    System.out.println("响应状态码-ReplyCode:" + i);
                    System.out.println("响应内容-ReplyText:" + s);
                    System.out.println("Exchange:" + s1);
                    System.out.println("RouteKey" + s2);
                    System.out.println("投递失败的消息:" + new String(bytes, "UTF-8"));
                }
            });
            for (int i = 0; i < 5000000; i++) {
                try {
                    channel.confirmSelect();
                    channel.basicPublish(exchange, routeKey, true, MessageProperties.PERSISTENT_TEXT_PLAIN, msg);
                    boolean sucess = channel.waitForConfirms(10);
                    System.out.println(sucess);
                } catch (Exception ex) {
                    try {
                        connection.abort();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    if (curHostIndex == 0)
                        curHostIndex = 1;
                    else
                        curHostIndex = 0;
                    continue ConnectAgain;
                }
            }
        }

    }

 

先测试下always.exchange也就是非人为因素致使的故障转移的状况,先开启客户端让客户端跑着,而后经过命令行中止master节点(也就是Node为WinServer12-1),中止时消息队列的消息状态为htm

图片消息的总数虽然不许确(页面存在延迟) ,但截取的是master中止时刻的消息状态,也够用了,这时候发现slaver节点会切换为master节点(也就是Node为DESKTOP-078UA43),并继续接受消息,客户点也没有发生异常通知(由于订阅了BasicReturn事件而且开启了madartory,若是消息投递失败,咱们能够获得通知,待会也会测试到)。

而后咱们让集群多跑会而后在消息有明显变化的时候在开启老的挂掉的当前为slaver的节点,当前队列消息的状态以下,master为DESKTOP-078UA43

在开启slaver后咱们在当即中止当前的master节点(也就是Node为WinServer12-1),这时候发现集群的master又切回到了DESKTOP-078UA43同时队列中的消息也跟着清除了。。也就是说在DESKTOP-078UA43以前挂掉到重启启动期间WinServer12-1接收到的消息所有丢掉了。。。由此咱们可知,RabbitMQ镜像集群发生非可控因素形成的master故障为了保证可用性,会丢消息。

而对于客户端而已,消息都是可靠投递的,因此监听事件并不会触发。

固然也能够经过设置ha-sync-mode参数进行调整,默认状况下,新加入的节点不会同步已存在节点内的消息,设置为automatic后会进行同步。不过若是没同步完master挂掉的话消息仍是会丢掉的

而后测试下common.exchange会发生的状况,测试这个的时候就是体现mandatory做用的时刻了!

仍是先在集群正常的状况下选取个时间点关掉主节点,当前master为DESKTOP-078UA43

 

而后WinServer12-1变为新的master,此刻发现正常接收消息,并且对客户端而言,消息也是正常投递的。而后打开被关闭的DESKTOP-078UA43节点,它会以slaver身份回归集群,开启前观察下当前队列状态

而后开始操做!发现队列状态以下,NaN,难道说队列中止接受数据了么!!!(若是中止接受数据,客户端同步调用发送时会发送失败么?)并且没法将master进行切换了。

这时候若是在启动WinServer12-1会发现,消息仍是WinServer12-1关闭时刻的消息,WinServer12-1关闭期间DESKTOP-078UA43尽管在接受消息,但实际消息并无被RabbitMQ可靠存储(比较master都没有了。。);

观察下调试的代码,发现消息仍是在正常向RabbitMQ投递。

 

客户端为了保证向RabbitMQ投递消息的可靠,及开启了Conform模式,但此刻同步返回的RabbitMQ处理结果是消息处理完成。那岂不是NaN期间RabbitMQ把消息都吞了?而客户端还傻傻的觉得发送成功了。。

这时候就体现开启mandatory同时订阅 channel.BasicReturn += (sender, eventArgs) => Console.WriteLine("消息投递失败 " + eventArgs.ReplyText);事件的做用了。。由于这时候RabbitMQ会反馈给你消息实际上并无投递成功的信息。

 

这里包含了持久化失败的缘由,同时包含发送消息的详细信息,方便客户端对消息进行在处理。

其实说了这么多,最后想说的是对于消息的一致性,最好仍是不要所有依赖于RabbitMQ,实现最终一致性并保证幂等性才是相对可靠的方案。

相关文章
相关标签/搜索