rabbitmq可靠性

本文翻译汇总自rabbitmq的官方文档。 java

翻译使用谷歌翻译后简单修改,部份内容读起来仍然比较晦涩,不过意思传达到了。 数据库

可靠性指南

 

本页介绍了如何使用AMQP和RabbitMQ的各类功能来实现可靠的传送 - 确保消息始终被传递,甚至在系统的任何部分遇到故障。 express

 

什么能够失败?

 

网络问题多是最多见的失败类。网络不只可能出现故障,防火墙能够中断空闲链接,而且不会当即检测到网络故障。 服务器

 

除了链接故障以外,broker和客户端应用程序可能会随时遇到硬件故障(或软件崩溃)。此外,即便客户端应用程序持续运行,逻辑错误也可能致使通道或链接错误,迫使客户端创建新的通道或链接,并从问题中恢复。 网络

 

链接失败

 

在链接失败的状况下,客户端将须要与broker创建新的链接。之前链接中打开的任何通道都将自动关闭,这些通道也须要从新打开。 并发

 

通常来讲,当链接失败时,客户端将被链接引起异常(或相似的语言结构)通知。官方Java和.NET客户端还提供了回调方法,让您听到其余上下文中的链接失败 - Java在Connection和Channel类上都提供了ShutdownListener回调,.NET客户端提供了IConnection.ConnectionShutdown和IModel.ModelShutdown事件目的。 异步

 

Acknowledgements和confirm

 

当链接失败时,消息可能在客户端和服务器之间传输 - 它们可能处于被解析或生成的中间,在OS缓冲区或电线上。传输中的消息将丢失 - 它们将须要重传。Acknowledgements让服务器和客户端知道什么时候这样作。 性能

 

Acknowledgements能够在两个方向使用 - 容许消费者向服务器指示它已经接收/处理了消息,并容许服务器向生产者指示相同的东西。 RabbitMQ将后一种状况称为"confirm"。 测试

 

固然,TCP确保已经接收到数据包,而且将从新发送,直到它们 - 但这只是网络层。Acknowledgements和confirm代表已收到消息并采起行动。confirm信号表示接收到消息,而且转让全部权,接收方承担所有责任。 优化

 

Acknowledgements所以具备语义 - 消费的应用程序不该该confirm消息,直到它完成了与它们须要的任何操做 - 将它们记录在数据库中,转发它们,将它们打印到纸张或其余任何东西上。一旦这样作,broker能够自由地忘记该消息。

 

一样,broker一旦承担责任,就会confirm消息(见这里是什么意思)。

 

confirm的使用保证至少一次Delivery。没有confirm,在发布和消费操做期间可能发生消息丢失,而且只有最多的一次Delivery才能获得保证。

 

用心跳检测死TCP链接

 

在某些类型的网络故障中,数据包丢失可能意味着中断的TCP链接须要较长时间(例如,在Linux上使用默认配置约11分钟)才能被操做系统检测到。 AMQP 0-9-1提供心跳功能,以确保应用程序层及时发现链接中断(以及彻底无响应的对等体)。心跳也能够防止可能终止"空闲"TCP链接的某些网络设备。有关详细信息,请参阅心跳。

 

在broker

 

为了不在broker中丢失消息,咱们须要应对broker从新启动,broker硬件故障,甚至是甚至broker崩溃。

 

为了确保从新启动时消息和broker定义生效,咱们须要确保它们在磁盘上。 AMQP标准具备交换,队列和持久消息的耐久性概念,要求持久对象或持久消息将在从新启动后生存。有关持久性和持久性的具体标志的更多详细信息,请参见"AMQP概念指南"。

群集和高可用性

 

若是咱们须要确保咱们的broker幸存硬件故障,咱们可使用RabbitMQ的集群。在RabbitMQ集群中,全部定义(交换,绑定,用户等)都跨整个集群镜像。队列的行为方式不一样,默认状况下只驻留在单个节点上,但能够跨多个或全部节点进行镜像。队列保持可见,而且能够从全部节点访问,不管它们位于何处。

 

镜像队列在全部已配置的集群节点之间复制其内容,能够无缝地容忍节点故障,而且不会丢失消息(尽管请参阅非同步从站上的此注释)。然而,消费应用程序须要注意,当队列失败时,消费者将被取消,他们将须要从新考虑 - 有关详细信息,请参阅文档。

 

在Producer

 

当使用confirms时,从通道恢复的生产者或链接故障应重发任何还没有从broker收到confirm的消息。这里存在消息重复的可能性,由于broker可能发送了一个从未到达生产者的confirm(因为网络故障等)。所以,消费者应用程序将须要以幂等(重复执行的效果一致)方式执行重复数据删除或处理传入的消息。

 

确保消息路由

 

在某些状况下,生产者可能很重要的是确保他们的消息被路由到队列(尽管并不老是 - 在公共子系统生产者只会发布的状况下,若是没有消费者感兴趣,那么消息是正确的丢弃)。

 

为了确保消息被路由到一个已知的队列,生产者只能声明一个目标队列并直接发布给它。若是消息可能以更复杂的方式进行路由,可是生产者仍然须要知道他们是否到达了至少一个队列,则能够在basic.publish上设置mandatory标志,确保basic.return(包含回复码和一些文本解释)将被发送回客户端,若是没有队列被适当地绑定。

 

在消费者

 

在网络故障(或节点崩溃)的状况下,可能消息重复,消费者必须准备好处理它们。若是可能,最简单的方法是确保您的消费者以幂等方式处理消息,而不是明确处理重复数据消除。

 

不能处理的消息

 

若是消费者肯定它不能处理消息,那么它可使用basic.reject(或basic.nack)拒绝它,要求服务器从新启动它(在这种状况下,服务器可能被配置为死信)代替。

 

 

消费者Acknowledgements和Producerconfirm

 

介绍

 

使用消息传递broker(如RabbitMQ)的系统按照定义分布。因为发送的协议方法(消息)不能保证到达对等体或被其成功处理,因此发布者和消费者都须要一种用于传送和处理confirm的机制。 RabbitMQ支持的几种消息协议提供了这样的功能。

 

(消费者)DeliveryAcknowledgements

 

当RabbitMQ向消费者发送消息时,须要知道什么时候成功发送消息。什么样的逻辑优化取决于系统。所以,它主要是应用程序的决定。

 

在咱们继续讨论其余主题以前,重要的是要解释Delivery是如何被识别的(并且confirm代表他们各自的Delivery)。当消费者(订阅)注册时,消息将由RabbitMQ使用basic.deliver方法传递(推送)。该方法携带Deliverytags,其惟一地标识信道上的传递。

 

Deliverytags是单调增加的正整数,并由客户端库呈现。认可Delivery的客户端库方法将Deliverytags做为参数。

 

频道预取设置(QoS)

 

因为消息以异步方式发送(推送)到客户端,所以一般在任何给定时刻一般会有多个消息"在飞行中"。此外,客户端的手动confirm本质上也是异步的。因此有一个未被confirm的Deliverytags的滑动窗口。开发人员一般会倾向于限制此窗口的大小,以免消费者端端的无限缓冲区问题。这是经过使用basic.qos方法设置"预取计数"值来完成的。该值定义了通道上容许的未confirmDelivery的最大数量。一旦数量达到配置的计数,RabbitMQ将中止在通道上传递更多消息,除非至少有一个未confirm的消息被confirm。

 

例如,鉴于在通道Ch上未confirm的Deliverytags5,6,7和8设置为4,RabbitMQ不会再推送任何更多的Delivery,除非至少有一个未完成的Delivery被confirm。当经过delivery_tag设置为8的confirm帧到达该通道时,RabbitMQ将会注意并传递一条消息。

 

值得重申的是,Delivery流程和手动客户端confirm彻底是异步的。所以,若是在飞行中已经有Delivery时改变了预取值,则出现天然竞争条件,而且可能暂时超过在通道上预取计数未confirm的消息。

 

能够为通道或消费者配置QoS设置。有关详细信息,请参阅消费者预取。

 

即便在手动confirm模式下,QoS设置也不会影响使用basic.get("pull API")获取的消息。

 

 

Producer confirm

 

使用标准AMQP 0-9-1,保证消息不丢失的惟一方法是使用事务 - 使信道事务发布,发布消息,提交。在这种状况下,交易是没必要要的重量级,并将吞吐量下降250倍。为了弥补这一点,引入了confirm机制。它模仿了协议中已经存在的消费者confirm机制。

 

要启用confirm,客户端发送confirm.select方法。根据是否设置不等待,broker能够经过confirm.select-ok进行回复。一旦在通道上使用了confirm.select方法,就被认为处于confirm模式。事务通道不能进入confirm模式,一旦通道处于confirm模式,则不能进行事务处理。

 

一旦一个通道处于confirm模式,broker和客户端都会计数消息(从第一个confirm.select开始计数)。而后,broker经过在同一个频道上发送basic.ack来confirm消息。发送tags字段包含已confirm消息的序列号。broker还能够在basic.ack中设置多个字段,以指示全部到达并包含具备序列号的消息的消息已被处理。

 

下面是Java中以confirm模式向通道发布大量消息并等待confirm的示例。

// Copyright (c) 2007-Present Pivotal Software, Inc. All rights reserved.

//

// This software, the RabbitMQ Java client library, is triple-licensed under the

// Mozilla Public License 1.1 ("MPL"), the GNU General Public License version 2

// ("GPL") and the Apache License version 2 ("ASL"). For the MPL, please see

// LICENSE-MPL-RabbitMQ. For the GPL, please see LICENSE-GPL2. For the ASL,

// please see LICENSE-APACHE2.

//

// This software is distributed on an "AS IS" basis, WITHOUT WARRANTY OF ANY KIND,

// either express or implied. See the LICENSE file for specific language governing

// rights and limitations of this software.

//

// If you have any questions regarding licensing, please contact us at

// info@rabbitmq.com.

 

 

package com.rabbitmq.examples;

 

import java.io.IOException;

 

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.MessageProperties;

import com.rabbitmq.client.QueueingConsumer;

 

public class ConfirmDontLoseMessages {

static int msgCount = 10000;

final static String QUEUE_NAME = "confirm-test";

static ConnectionFactory connectionFactory;

 

public static void main(String[] args)

throws IOException, InterruptedException

{

if (args.length > 0) {

msgCount = Integer.parseInt(args[0]);

}

 

connectionFactory = new ConnectionFactory();

 

// Consume msgCount messages.

(new Thread(new Consumer())).start();

// Publish msgCount messages and wait for confirms.

(new Thread(new Publisher())).start();

}

 

@SuppressWarnings("ThrowablePrintedToSystemOut")

static class Publisher implements Runnable {

public void run() {

try {

long startTime = System.currentTimeMillis();

 

// Setup

Connection conn = connectionFactory.newConnection();

Channel ch = conn.createChannel();

ch.queueDeclare(QUEUE_NAME, true, false, false, null);

ch.confirmSelect();

 

// Publish

for (long i = 0; i < msgCount; ++i) {

ch.basicPublish("", QUEUE_NAME,

MessageProperties.PERSISTENT_BASIC,

"nop".getBytes());

}

 

ch.waitForConfirmsOrDie();

 

// Cleanup

ch.queueDelete(QUEUE_NAME);

ch.close();

conn.close();

 

long endTime = System.currentTimeMillis();

System.out.printf("Test took %.3fs\n",

(float)(endTime - startTime)/1000);

} catch (Throwable e) {

System.out.println("foobar :(");

System.out.print(e);

}

}

}

 

static class Consumer implements Runnable {

public void run() {

try {

// Setup

Connection conn = connectionFactory.newConnection();

Channel ch = conn.createChannel();

ch.queueDeclare(QUEUE_NAME, true, false, false, null);

 

// Consume

QueueingConsumer qc = new QueueingConsumer(ch);

ch.basicConsume(QUEUE_NAME, true, qc);

for (int i = 0; i < msgCount; ++i) {

qc.nextDelivery();

}

 

// Cleanup

ch.close();

conn.close();

} catch (Throwable e) {

System.out.println("Whoosh!");

System.out.print(e);

}

}

}

}

否认confirm

 

在特殊状况下,当broker没法成功处理消息时,代替basic.ack,broker将发送一个basic.nack。在这种状况下,basic.nack的字段具备与basic.ack中相应的含义相同的含义,而且请求字段应该被忽略。broker表示没法处理消息,拒绝对其发送一则或多封消息;在这一点上,客户端可能会选择从新发布消息。

 

通道置于confirm模式后,全部后续发布的消息将被confirm或不存在一次。不能保证消息被confirm多久。没有任何消息将被confirm和否认。

 

若是在负责队列的Erlang进程中发生内部错误,则只会传递basic.nack。

 

当消息被从新排队时,若是可能,它将被置于其队列中的原始位置。若是没有(因为多个消费者共享队列时因为其余消费者的并发Delivery和confirm),该消息将被从新排列到更接近队列头的位置。

 

何时confirm message?

 

对于不可路由的消息,一旦交换验证消息将不会路由到任何队列(返回空列表的队列),broker将发出confirm。若是消息也被发布为强制性,则basic.return将在basic.ack以前发送给客户端。否认的confirm也是如此(basic.nack)。

 

对于可路由消息,当全部队列接受消息时,发送basic.ack。对于路由到持久队列的持久消息,这意味着持续到磁盘。对于镜像队列,这意味着全部镜像都已接受该消息。

 

持久化消息的Ack延迟

 

在将消息持续存储到磁盘后,将发送一个持久消息的basic.ack路由到持久化队列。 RabbitMQ消息存储在间隔(几百毫秒)以后分批地将消息存储到磁盘,以最小化fsync(2)调用的数量,或者当队列空闲时。这意味着在一个恒定的负载下,basic.ack的延迟能够达到几百毫秒。为了提升吞吐量,强烈建议应用程序异步处理confirm(做为流)或发布批次的消息,并等待未完成的confirm。客户端库之间的具体API有所不一样。

 

Producerconfirm的订购注意事项

 

在大多数状况下,RabbitMQ将按照发布的相同顺序向Producerconfirm消息(这适用于在单个频道上发布的消息)。然而,发布者的confirm是异步发出的,能够confirm一个消息或一组消息。发出confirm的确切时刻取决于消息的传递模式(持久与瞬态)以及消息被路由到的队列的属性(见上文)。也就是说,不一样的消息能够被认为是准备好在不一样的时间进行confirm。这意味着与其各自的消息相比,confirm能够以不一样的顺序到达。应用程序不该该依赖于confirm的顺序。

 

Producer confirm和保证Delivery

 

若是在全部消息写入磁盘以前崩溃,broker将丢失持久的消息。在某些状况下,这将致使broker以惊人的方式表现。

 

例如,考虑这种状况:

 

客户端向持久队列发布持久消息

客户端从队列中消耗消息(指出消息是持久的,队列持久的),可是尚未肯定,

broker死亡并从新启动,

客户端从新链接并开始消费消息。

在这一点上,客户端能够合理地假设该消息将被再次发送。不是这样:从新启动致使broker丢失该消息。为了保证持久性,客户应该使用confirm。若是Producer的频道处于confirm模式,Producer将不会收到丢失的消息的confirm(由于该消息还没有写入磁盘)。

限制

 

最大Deliverytags

 

Deliverytags是一个64位长的值,所以其最大值为9223372036854775807.因为每一个渠道的Deliverytags是范围限定的,因此Producer或消费者在实践中不太可能超过此值。

 

 

我的理解:

上面的描述很是复杂,我总结来讲,有一下几种状况须要在开发中注意:

  • Producter发送消息以后,没有收到Broker的confirm:

消息可能终止在了传送的层面,如操做系统缓冲层,或者网络传输层,或者是在Broker接受以后,因为内部故障不能处理,如exchang故障,也不会发送confirm给Producter。因此,在咱们的系统中,咱们在producter端实际上是有数据库表存储须要发送的消息的,咱们一次批量发送100条消息,一旦收到confirm,就会删除这部分消息,因此没有接收到confirm的话,就不删除相应的数据。

还要保证消息的幂等。如此就能够保证在producter层面不会丢失消息。

  • broker接收到消息以后,在exchang或者queue中丢失:

设置消息和exchang和queue都为持久的。

  • 找不到消息对应的queue

咱们的程序不会出现这种状况。

  • queue没有对应的consumer

这种状况下,消息会在queue中挤压,也不会丢失。

  • 消息可能会重复发送,因此须要保证消息处理的幂等性。

 

 

broker将在下面的状况中对消息进行confirm:

  • broker发现当前消息没法被路由到指定的queues中(若是设置了mandatory属性,则broker会发送basic.return) 
  • 非持久属性的消息到达了其所应该到达的全部queue中(和镜像queue中)
  • 持久消息到达了其所应该到达的全部queue中(和镜像中),并被持久化到了磁盘(fsync) 
  • 持久消息从其所在的全部queue中被consume了(若是必要则会被ack)

 

批量发送消息,并批量接收确认的例子:

// 发送持久化消息,消息内容为helloWorld for (long i = 0; i < msgCount; ++i)

{

ch.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_BASIC, "helloWorld".getBytes());

}

// 等待全部消息都被ack或者nack,若是某个消息被nack,则抛出IOException

ch.waitForConfirmsOrDie();

网上有人作的测试,使用这种批量确认的模式,和使用异步的方式,性能差的不是太多。可是若是使用单条确认,性能将差异数倍。

相关文章
相关标签/搜索