RabbitMQ简介与使用

RabbitMQ简介与使用

 2013年3月23日  小白  学习笔记php

1. AMQP简介

在了解RabbitMQ以前,首先要了解AMQP协议。AMQP,即Advanced Message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。html

当前各类应用大量使用异步消息模型,并随之产生众多消息中间件产品及协议,标准的不一导致应用与中间件之间的耦合限制产品的选择,并增长维护成本。AMQP是一个提供统一消息服务的应用层标准协议,基于此协议的客户端与消息中间件可传递消息,并不受不一样客户端/中间件产品,不一样开发语言等条件的限制。java

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。node

AMQP的实现有:mysql

  • OpenAMQweb

  • AMQP的开源实现,用C语言编写,运行于Linux、AIX、Solaris、Windows、OpenVMSsql

  • Apache Qpidshell

  • Apache的开源项目,支持C++、Ruby、Java、JMS、Python和.NET缓存

  • Redhat Enterprise MRG安全

  • 实现了AMQP的最新版本0-10,提供了丰富的特征集,好比彻底管理、联合、Active-Active集群,有Web控制台,还有许多企业级特征,客户端支持C++、Ruby、Java、JMS、Python和.NET

  • RabbitMQ

  • 一个独立的开源实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。RabbitMQ发布在Ubuntu、FreeBSD平台

  • AMQP Infrastructure

  • Linux下,包括Broker、管理工具、Agent和客户端

  • ØMQ

  • 一个高性能的消息平台,在分布式消息网络可做为兼容AMQP的Broker节点,绑定了多种语言,包括Python、C、C++、Lisp、Ruby等

  • Zyre

  • 是一个Broker,实现了RestMS协议和AMQP协议,提供了RESTful HTTP访问网络AMQP的能力

以上是AMQP中的核心概念:

  • Broker

  • 消息服务器的实体

  • 虚拟主机(Virtual Host)

  • 一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。客户端应用程序在登陆到服务器以后,能够选择一个虚拟主机。每一个链接(包括全部channel)都必须关联至一个虚拟主机

  • 交换器(Exchange)

  • 服务器中的实体,用来接收生产者发送的消息并将这些消息路由给服务器中的队列

  • 消息队列(Message Queue)

  • 服务器中的实体,用来保存消息直到发送给消费者

  • 生产者(Producer)

  • 一个向交换器发布消息的客户端应用程序

  • 消费者(Consumer)

  • 一个从消息队列中请求消息的客户端应用程序

  • 绑定器(Binding)

  • 将交换器和队列链接起来,而且封装消息的路由信息

全部这些组件的属性各不相同,可是只有交换器和队列被命名。客户端能够经过交换器的名字来发送消息,也能够经过队列的名字收取信息。由于AMQ 协议没有一个通用的标准方法来得到全部组件的名称,因此客户端对队列和交换器的访问被限制在仅能使用熟知的或者只有本身知道的名字。

绑定器没有名字,它们的生命期依赖于所紧密链接的交换器和队列。若是这二者任意一个被删除掉,那么绑定器便失效了。这就说明,若要知道交换器和队列的名字,还须要设置消息路由。

消息是一个不透明的数据包,这些包有以下性质:

  •  元数据,例如内容的编码或者代表来源的字段

  •  标志位,标记消息投递时候的一些保障机制

  •  一个特殊的字段叫作routing key

发送消息是一个很是简单的过程。客户端声明一个它想要发送消息的目的交换器,而后将消息传递给交换器。

接受消息的最简单办法是设置一个订阅。客户端须要声明一个队列,而且使用一个绑定器将以前的交换器和队列绑定起来,这样的话,订阅就设置完毕。

交换器的类型:

  • fanout交换器

  • 不会解释任何东西:它只是将消息投递到全部绑定到它的队列中

  • direct交换器

  • 将消息根据其routing-key属性投递到包含对应key属性的绑定器上

  • topic交换器

  • 模式匹配分析消息的routing-key属性。它将routing-key和binding-key的字符串切分红单词。这些单词之间用点隔开。它一样也会识别两个通配符:#匹配0个或者多个单词,*匹配一个单词。例如,binding key *.stock.#匹配routing-key usd.stcok和eur.stock.db,可是不匹配stock.nasdaq

  • header交换器

  • 根据应用程序消息的特定属性进行匹配

  • failover和system交换器

  • 当前RabbitMQ版本中均未实现

没有绑定器,哪怕是最简单的消息,交换器也不能将其投递到队列中,只能抛弃它。经过订阅一个队列,消费者可以从队列中获取消息,而后在使用事后将其从队列中删除。

不一样于队列的是,交换器有相应的类型,代表它们的投递方式(一般是在和绑定器协做的时候)。由于交换器是命名实体,因此声明一个已经存在的交换器, 可是试图赋予不一样类型是会致使错误。客户端须要删除这个已经存在的交换器,而后从新声明而且赋予新的类型。

交换器也有一些性质:

  •  持久性:若是启用,交换器将会在Broker重启前都有效

  •  自动删除:若是启用,那么交换器将会在其绑定的队列都被删除掉以后自动删除掉自身

  •  惰性:若是没有声明交换器,那么在执行到使用的时候会致使异常,并不会主动声明

AMQP Broker都会对其支持的每种交换器类型(为每个虚拟主机)声明一个实例。这些交换器的命名规则是amq.前缀加上类型名。例如 amq.fanout。空的交换器名称等于amq.direct。对这个默认的direct交换器(也仅仅是对这个交换器),Broker将会声明一个绑定了系统中全部队列的绑定器。

这个特色告诉咱们,在系统中,任意队列均可以和默认的direct交换器绑定在一块儿,只要其routing-key等于队列名字。

默认绑定器的行为揭示了多绑定器的存在,将一个或者多个队列和一个或者多个交换器绑定起来。这使得能够将发送到不一样交换器的具备不一样routing key(或者其余属性)的消息发送到同一个队列中。

队列也有如下属性,这些属性和交换器所具备的属性相似。

  •  持久性:若是启用,队列将会在Broker重启前都有效

  •  自动删除:若是启用,那么队列将会在全部的消费者中止使用以后自动删除掉自身

  •  惰性:若是没有声明队列,那么在执行到使用的时候会致使异常,并不会主动声明

  •  排他性:若是启用,队列只能被声明它的消费者使用

这些性质能够用来建立例如排他和自删除的transient或者私有队列。这种队列将会在全部连接到它的客户端断开链接以后被自动删除掉 – 它们只是短暂地链接到Broker,可是能够用于实现例如RPC或者在AMQ上的对等通讯。

AMQP上的RPC是这样的:RPC客户端声明一个回复队列,惟一命名(例如用UUID19), 而且是自删除和排他的。而后它发送请求给一些交换器,在消息的reply-to字段中包含了以前声明的回复队列的名字。RPC服务器将会回答这些请求,使用消息的reply-to做为routing key(以前提到过默认绑定器会绑定全部的队列到默认交换器)发送到默认交换器。注意仅仅是惯例而已。根据和RPC服务器的约定,它能够解释消息的任何属性(甚至数据体)来决定回复给谁。

队列也能够是持久的,可共享,非自动删除以及非排他的。使用同一个队列的多个用户接收到的并非发送到这个队列的消息的一份拷贝,而是这些用户共享这队列中的一份数据,而后在使用完以后删除掉。

2. RabbitMQ简介

RabbitMQ是一个遵循AMQP协议的消息中间件,它从生产者接收消息并递送给消费者,在这个过程当中,根据规则进行路由,缓存与持久化。

几个概念说明(彻底遵循AMQP中的概念):

  •  Broker:简单来讲就是消息队列服务器实体

  •  Exchange:消息交换机,它指定消息按什么规则,路由到哪一个队列

  •  Queue:消息队列载体,每一个消息都会被投入到一个或多个队列

  •  Binding:绑定,它的做用就是把exchange和queue按照路由规则绑定起来

  •  Routing Key:路由关键字,exchange根据这个关键字进行消息投递

  •  vhost:虚拟主机,一个broker里能够开设多个vhost,用做不一样用户的权限分离

  •  producer:消息生产者,就是投递消息的程序

  •  consumer:消息消费者,就是接受消息的程序

  •  channel:消息通道,在客户端的每一个链接里,可创建多个channel,每一个channel表明一个会话任务

消息队列的使用过程大概以下:

  1. 客户端链接到消息队列服务器,打开一个channel

  2. 客户端声明一个exchange,并设置相关属性

  3. 客户端声明一个queue,并设置相关属性

  4. 客户端使用routing key,在exchange和queue之间创建好绑定关系

  5. 客户端投递消息到exchange

exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。

exchange也有几个类型,彻底根据key进行投递的叫作Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。对key进行模式匹配后进行投递的叫作Topic交换机,符号”#”匹配一个或多个词,符号”*”匹配正好一个词。例如”abc.#”匹配”abc.def.ghi”,”abc.*”只匹配”abc.def”。还有一种不须要key的,叫作Fanout交换机,它采起广播模式,一个消息进来时,投递到与该交换机绑定的全部队列。

RabbitMQ支持消息的持久化,消息队列持久化包括3个部分:

  •  exchange持久化,在声明时指定durable为true

  •  queue持久化,在声明时指定durable为true

  •  消息持久化,在投递时指定delivery_mode 为2(1是非持久化)

若是exchange和queue都是持久化的,那么它们之间的binding也是持久化的。若是exchange和queue二者之间有一个持久化,一个非持久化,就不容许创建绑定。

RabbitMQ的特性:

  •  可靠性:包括消息持久化,消费者和生产者的消息确认

  •  灵活路由:遵循AMQP协议,支持多种Exchange类型实现不一样路由策略

  •  分布式:集群的支持,包括本地网络与远程网络

  •  高可用性:支持主从备份与镜像队列

  •  多语言支持:支持多语言的客户端

  •  WEB界面管理:能够管理用户权限,exhange,queue,binding,与实时监控

  •  访问控制:基于vhosts实现访问控制

  •  调试追踪:支持tracing,方便调试

3. RabbitMQ使用向导

3.1. RabbitMQ的安装

3.1.1. 准备工做与安装

由于RabbitMQ由ERLANG实现,安装RabbitMQ以前要先安装ERLANG

安装包:otp_src_R15B03-1.tar.gz ERLANG安装包

                 rabbitmq-server-generic-unix-3.0.0.tar.gz RabbitMQ服务端

                 rabbitmq-java-client-bin-3.0.0.tar.gz  RabbitMQ客户端,包含性能测试脚本

如下是上述版本为例的安装步骤,后续章节描述的内容都对应此版本ERLANG的安装步骤:

tar -zxf otp_src_R15B03-1.tar.gzz
cd otp_src_R15B03
./configure
make
make install

RabbitMQ客户端与服务端的安装直接解压安装包便可,客户端的目录中,rabbitmq-client.jar为JAVA版的客户端,编写客户端程序时须要引用,脚本文件为性能测试脚本

$RABBIT_MQ_HOME/sbin目录中的文件说明及命令使用参考http://www.rabbitmq.com/manpages.html

RabbitMQ的启停:

rabbitmq-server启动服务,如要之后台方式启动,增长-detached参数

rabbitmqctl stop中止服务

rabbitmq-plugins enable rabbitmq_management打开WEB管理界面插件,默认访问地址:

http://服务器IP:15672

3.1.2. 单台RabbitMQ的配置

经过配置环境变量或者配置文件,修改诸如端口,绑定IP,broker的名称等,参考配置管理章节

例如:

修改$RABBIT_MQ_HOME/sbin/rabbitmq-env文件,增长配置:

HOSTNAME=broker_138 若是是集群,每台机器的名称要不一样

RABBITMQ_NODE_IP_ADDRESS=192.168.100.138 绑定机器IP

3.1.3. RabbitMQ集群的配置

RabbitMQ集群的运行须要集群中的全部节点共享erlang.cookie,以其中一台RabbitMQ中用户目录下~/.erlang.cookie文件为准,复制文件内容,将全部节点的erlang.cookie文件都修改成此值。

先启动全部节点的RabbitMQ,而后依次在每台RabbitMQ中执行命令:

./rabbitmqctl stop_app
./rabbitmqctl join_cluster rabbit@broker_138
./rabbitmqctl start_app

rabbit@broker_138为其中一台RabbitMQ的实例名称,全部RabbitMQ节点都添加同一节点便可。

3.2. 使用客户端程序发送与接收消息

3.2.1. Hello World

一个简单的示例,P是生产者,C是消费者。P发送消息到队列,C从队列取消息。代码以下:

生产者:

首先创建链接,在链接上创建channel,一般一个链接会创建多个channel,能够提升消息的发送速度。这里只创建了一个链接,创建多个channel时,客户端可以使用多线程,每一个线程里使用一个channel

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
//而后声明队列,若是队列没有预先建立,会建立队列。消息以字节码的形式发送,因此在客户端可使用任何编码格式。
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
//最后别忘了关闭channel和connection
channel.close();
connection.close();

消费者:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, true, consumer);
 
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
}

3.2.2. 工做队列

一个队列能够有多个消费者,队列发送消息采用Round-robin方式,即顺序发送给每一个消费者,在单位时间内,每一个消费者收到的消息数量是相同的。

以上是假设每一个消费者处理消息的速度是同样的,若是每一个消费者处理消息的速度不一样,那么Round-robin方式的效率就不高了,这时能够设置prefetch参数。prefetch的值表示在消费者为返回上一条消息的确认信息时,队列最多发送给此消费者的消息数目。若是消息数目达到prefetch值,队列就中止发送消息给这个消费者,并随之发送给不忙的消费者。prefetch经过如下代码设置:

channel.basicQos(prefetchCount);

3.2.3. 订阅/发布

在上一个示例中,队列保证每条消息发送给其中一个消费者,即每一个消息只被处理一次。在实际应用中,常常会有这样的需求,每条消息要同时发送给多个消费者或者更复杂的状况。也就是说消息须要根据必定的规则发送给不一样的消费者。

为实现消息路由,须要引入Exchange,图中用X表示。生产者再也不直接发送消息给队列,而是先发送到Exchange。而后Exchange与队列绑定。这样消息会根据不一样规则发送给不一样队列,最终到达不一样的消费者。

实现代码以下:

生产者:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");
channel.close();
connection.close();

消费者:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, true, consumer);
 
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    String message = new String(delivery.getBody());
    System.out.println(" [x] Received '" + message + "'");
}

不少时候,队列是非持久而且是自动删除的,这时队列名称也就不重要了,能够经过如下代码,由服务器自动生成队列名称。自动生成的队列以amq开头

String queueName = channel.queueDeclare().getQueue();

3.2.4. 消息路由

Exchange的类型不一样,消息的路由规则也不一样,Exchange的类型介绍请参考RabbitMQ简介。如下是以direct类型的Exchange为例的生产者代码实现, 最重要的两步就是声明Exhange类型与发送时指定routeKey

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "direct");
String severity = getSeverity(argv); //返回info,error,warning做为routeKey
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
System.out.println(" [x] Sent '" + severity + "':'" + message + "'");
channel.close();
connection.close();

3.2.5. 主题订阅

如下是以topic类型的Exchange为例的生产者代码实现:

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "topic");
String routingKey = getRouting(argv);
String message = getMessage(argv);
channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'");
connection.close();

一条以quick.orange.rabbit为routeKey的消息,Q1和Q2都会收到,lazy.orange.elephant也是。quick.orange.fox只能发送到Q1,lazy.brown.fox只能到Q2. lazy.pink.rabbit虽然符合两个匹配规则,但只发送到Q2,由于先匹配的lasy.#规则。quick.brown.fox则Q1和Q2都收不到,会被直接丢弃。

3.2.6. RPC

以上示例都是异步的,即生产者不须要等待消费者的反馈。在实际状况中,有些时候在消息处理比较快,且须要及时反馈时,则须要同步的方式,生产者发送消息,在收到消费者的反馈前一直处于阻塞状态。由于等待的返回来自远程主机,这种方式也被称为RPC(Remote procedure call)。RPC的实现有不少,好比JAVA平台下的RMI,JMX。

如下是在RabbitMQ中的实现:

RPCClient:

private Connection connection;
private Channel channel;
private String requestQueueName = "rpc_queue";
private String replyQueueName;
private QueueingConsumer consumer;
 
public RPCClient() throws Exception {
   ConnectionFactory factory = new ConnectionFactory();
   factory.setHost("localhost");
   connection = factory.newConnection();
   channel = connection.createChannel();
   replyQueueName = channel.queueDeclare().getQueue();
   consumer = new QueueingConsumer(channel);
   channel.basicConsume(replyQueueName, true, consumer);
}
 
public String call(String message) throws Exception {
   String response = null;
   String corrId = java.util.UUID.randomUUID().toString();
   BasicProperties props = new BasicProperties.Builder().correlationId(corrId).replyTo(replyQueueName).build();
   channel.basicPublish("", requestQueueName, props, message.getBytes());
 
   while (true) {
       QueueingConsumer.Delivery delivery = consumer.nextDelivery();
       if (delivery.getProperties().getCorrelationId().equals(corrId)) {
           response = new String(delivery.getBody());
           break;
       }
   }
   return response;
}
 
public void close() throws Exception {
   connection.close();
}

RPCServer:

private static final String RPC_QUEUE_NAME = "rpc_queue";
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(RPC_QUEUE_NAME, false, false, false, null);
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(RPC_QUEUE_NAME, false, consumer);
System.out.println(" [x] Awaiting RPC requests");
 
while (true) {
    QueueingConsumer.Delivery delivery = consumer.nextDelivery();
    BasicProperties props = delivery.getProperties();
    BasicProperties replyProps = new BasicProperties.Builder().correlationId(props.getCorrelationId()).build();
    String message = new String(delivery.getBody());
    int n = Integer.parseInt(message);
    System.out.println(" [.] fib(" + message + ")");
    String response = "" + fib(n);
    channel.basicPublish( "", props.getReplyTo(), replyProps, response.getBytes());
    channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}

工做流程以下:

  1. 客户端启动,创建发送队列与反馈队列

  2. 当RPC客户端发送消息时,设置replyTo和correlationId参数。replyTo参数为反馈队列的名称,correlationId做为一次请求的惟一标识,要每次请求都不一样,用于关联服务端的反馈消息

  3. 请求发送到rpc_queue

  4. 服务端等待请求,当收到请求后,处理请求,并将反馈经过replyTo指定的反馈队列发送回去

  5. 客户端收到反馈,并校验correlationId的值是否与发送的一致,若是一致,则一次请求完成

4. 消息的可靠传递

4.1. 链接失败的处理

RabbitMQ不支持链接的failover,因此须要客户端本身实现失败重连。

4.2. 服务器的可靠性

为保证消息的可靠传递,服务器使用持久化保证消息不丢失。包括exchange与queue必须定义为持久的,同时发送消息时,也要设置消息为持久消息。

在代码中能够经过如下语句设置发送持久消息:

channel.basicPublish(exchangeName, routeKey,MessageProperties.PERSISTENT_TEXT_PLAIN,msg)

或者:

BasicProperties basicProperties = new AMQP.BasicProperties.Builder().deliveryMode(2).build();
 // deliveryMode为1是非持久
channel.basicPublish(exchangeName, routeKey, basicProperties, msg)

4.3. 生产者的可靠性

生产者的消息确认叫作confirm,confirm确保消息已经发送到MQ中。当connection或channel异常时,会从新发送消息,若是消息是持久的,并不能必定保证消息持久化到磁盘中,由于消息可能存在与磁盘的缓存中。为进一步提升可靠性,可使用事务。Confirm与事务不能同时使用。

当生产者收不到confirm时,消息可能会重复,因此若是消息不容许重复,则消费者须要本身实现消息去重。

使用如下代码打开confirm,默认是关闭的

channel.confirmSelect();

4.4. 消费者的可靠性

消费者的消息确认叫作Acknowledgements,Acknowledgements确保消费者已经处理了消息,若是收不到消费者的Acknowledgements,MQ会从新发送消息。

默认Acknowledgements是自动确认,如需客户端控制,在消费者的代码中设置:

channel.basicConsume(queueName,false,consumer);//声明队列时,设置autoack为false
。。。
//消息处理代码
。。。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); //发送确认

一样,MQ也可能收不到消费者的Acknowledgements,就会重复发送消息,若要避免,消费者须要本身实现消息去重。

4.5. 分布式的RabbitMQ

RabbitMQ提供了3中分布式的解决方案,cluster,federation,shovel。cluster用于可靠的本地局域网,后两种用于不可靠的网络。

5. 分布式

5.1. Cluster

Cluster将多台机器链接为一个逻辑broker,各机器之间使用Erlang消息通讯,因此cluster中各机器必须有同样的Erlang cookie,而且机器之间的网络要是可靠的,而且都运行相同版本的Erlang。

Virtual hosts,exchanges,用户及权限都在全部节点同步,queues能够位于本机,也能够做为镜像队列,在各个机器之间同步。

一般使用cluster来提升可靠性与增长吞吐量。

5.2. Federation

Federation容许一个exchange从另一台机器或者cluster的exchange中接收消息,由于是两个exchange联合起来,因此必须有相同的用户权限。

联合起来的exchange是单向的点对点的链接。

一般应该在经过internet链接broker的时候使用Federation

5.3. The Shovel

Shovel与Federation的概念相似,只是工做在更低的层次。

Federation是从一个exchange到另外一个exchange,而Shovel是从一边的queue中取走消息并发送到另外一个exchange。

一般在经过internet链接broker的时,而且须要得到比Federation更多控制权的时候使用Shovel。

如下是三种分布式模式的简要对比:

Federation
/ Shovel
Clustering
Brokers
are logically separate and may have different owners.
A
cluster forms a single logical broker.
Brokers
can run different versions of RabbitMQ and Erlang.
Nodes
must run the same version of RabbitMQ, and frequently Erlang.
Brokers
can run different versions of RabbitMQ and Erlang.
Brokers
must be connected via reliable LAN links. Communication is via Erlang
internode messaging, requiring a shared Erlang cookie.
Brokers
can be connected in whatever topology you arrange. Links can be one- or two-way.
All
nodes connect to all other nodes in both directions.
Brokers
can be connected in whatever topology you arrange. Links can be one- or two-way.
Chooses
Consistency and Availability from the CAP theorem.
Some
exchanges in a broker may be federated while some may be local.
Clustering
is all-or-nothing.
A
client connecting to any broker can only see queues in that broker.
A
client connecting to any node can see queues on all nodes.

6. 流量控制

6.1. 基于链接的流量控制

当生产者发送消息的速率大于消息被路由到queue的速率时,会触发流量控制,发送速率受到限制,但不会彻底阻塞。

6.2. 基于内存的流量控制

当内存使用达到vm_memory_high_watermark的值时,会触发流量控制,生产者被阻塞。vm_memory_high_watermark的默认值是系统内存的40%,这个值能够在配置文件中修改。

[{rabbit, [{vm_memory_high_watermark, 0.4}]}].

或者在运行时经过命令rabbitmqctlset_vm_memory_high_watermark fraction修改,修改当即生效,但下次重启后恢复。因此要永久修改,必须同时修改配置文件。

6.3. 基于磁盘的流量控制

当磁盘剩余空间小于disk_free_limit的值时,触发流量控制,生产者被阻塞。disk_free_limit的默认值是1GB,可在配置文件中修改。

[{rabbit, [{disk_free_limit, 25000000000}]}].

7. 内存使用

经过命令rabbitmqctl status能够查看内存使用状态,或者在WEB管理界面中点击节点后查看。

其中Queues表示队列中消息占用的内存

Mnesia表示MQ中定义的exchange,queue,bindings,用户及权限占用的内存

详细说明请参考http://www.rabbitmq.com/memory-use.html

8. 配置管理

RabbitMQ的默认配置在大部分状况下是最佳配置,若是服务运行良好,不须要修改。RabbitMQ支持3种方式修改配置:环境变量、配置文件、运行时参数与策略。

环境变量能够配置到shell环境变量中,也能够在RabbitMQ的环境变量中配置。例如:配置服务绑定IP,能够在shell环境变量里配置RABBITMQ_NODE_IP_ADDRESS的值,也能够在RabbitMQ的环境变量中配置NODE_IP_ADDRESS的值,即RabbitMQ的环境变量中变量名称要去掉RABBITMQ_。RabbitMQ的环境变量文件在$RABBITMQ_HOME/sbin/rabbitmq-env。配置的优先级为shell环境变量优先于RabbitMQ的环境变量,RabbitMQ的环境变量优先于RabbitMQ默认的环境变量。

经过配置文件配置,要先在环境变量中指定配置文件路径,例如:

CONFIG_FILE=/etc/rabbitmq/rabbitmq.config

而后添加配置,例如:

[
 
    {mnesia, [{dump_log_write_threshold, 1000}]},
 
    {rabbit, [{tcp_listeners, [5673]}]}
 
].

经过rabbitmqctl命令能够在运行时修改配置,例如修改vm_memory_high_watermark。还有些配置,好比镜像队列,是经过管理界面或命令配置策略实现的。

详细的配置项请参考http://www.rabbitmq.com/configure.html

9. 高可用性

9.1. 主从备份

RabbitMQ支持主从备份,当主服务器不可用时,存在磁盘中的消息能够由从服务器恢复。

也能够在集群的基础上配置主从备份。主从备份依赖Pacemaker来管理资源,主从备份的方式已不推荐使用,而镜像队列则更容易使用,且可靠性更高。

9.2. 镜像队列

虽然使用cluster能够提升可靠性,exchange,binding在各个机器是共享的,可是queue中的消息实际上仍是存在单独的机器,若是一台机器不可用,那么在这台机器恢复前,这台机器中存储的消息也是不可用的。

为解决这样的问题,引入了镜像队列,镜像队列是在集群中为队列创建的一个或多个物理镜像,这些镜像分别存储在主节点以外的其余节点,全部节点中的队列共同组成一个逻辑队列。将一个队列作镜像后,即便此机器不可用,RabbitMQ会自动从镜像中选择一个继续使用,不会致使队列中的消息不可用。

若是为一个队列创建多个镜像,前者称为主节点,后者称为从节点。若是主节点有问题,那么RabbitMQ会从从节点中选择最先同步的一个做为新的主节点,以保证尽可能不丢失消息,然而原主节点中同步以前的消息仍是会丢失。

镜像队列运行在cluster中,不建议经过WAN使用,也就是不建议在Federation和Shovel中使用。

镜像队列是经过策略配置的,添加一个策略,匹配相应的队列,而后指定一个key为ha-mode的参数,例如:

rabbitmqctl set_policy ha-all "^ha." '{"ha-mode":"all"}'

这个策略设置全部的节点都为ha.开头的队列作镜像。这个设置也能够在管理界面中添加,详细信息请参考http://www.rabbitmq.com/ha.html

10. 性能

10.1. 性能测试

RabbitMQ的JAVA客户端中附带了性能测试脚本,如下数据都由此脚本测试获得。

硬件环境:CPU::Intel(R) Core(TM) i5-2400 CPU @ 3.10GHz

内存:4G

磁盘:500G    10000转/分

软件环境:otp_src_R15B03-1.tar.gz

      rabbitmq-server-generic-unix-3.0.0.tar.gz (单台)

      rabbitmq-java-client-bin-3.0.0.tar.gz

       Red Hat 4.1.2-48 (Linux version 2.6.18)

如下是发送0.5KB大小消息的测试结果:

producer  consumer  confirm(max unconfirmed publishes 100)  ack  persistent  throughput (msg/s)

1 1 N     N  N      17650

1 1 Y     N      N 15640

1 1 N     Y      N 17100

1 1 N     N      Y 17368

1 1 Y     N      Y 15635

1 1 N     Y      Y 9154

1 1 Y     Y      N 15266

1 1 Y     Y      Y 6111

max unconfirmed publishes的值对于吞吐量的影响较大.

在发送持久消息与打开消费者的acknowledgements时,吞吐量变化明显。

关于性能,请参考如下文章:

http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/

http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

10.2. 队列的性能

RabbitMQ中的队列性能是一个值得关注的地方。在设计方案时就应该考虑到。队列只有在保持队列中不积压消息时,性能才是最佳的,队列中积压的消息越多,性能降低越多。

例如生产者发送消息的速度是600msg/s,消费者接收的速度是1200msg/s,正常状况下,是没有性能问题的。这时若是中止消费者一段时间,让消息在队列中积压,而后在打开消费者。按理消费者的速度大于生产者速度,能够转发新消息,并把老消息也取走,最终队列又回到为空的状态。但实际状况则不是,队列中的消息会继续积压,并且会继续变多,而这时消费者的速度就不如以前的了。

RabbitMQ中的队列,在实现上又分为多个小的队列,每一个队列里存储着不一样状态的消息。当消息不积压时,消息由交换器到达队列,就会被直接发送给消费者。而当消息堆积时,因为占用较多内存,RabbitMQ会把消息放入更深层次的队列,例如将内存中的消息换出到磁盘上(无论消息是否持久化),而这些操做会消耗更多的CPU等系统资源,从而致使影响队列中消息的发送。

为了避免使消息积压,能够采起两种方法:

  1. 中止向队列发送消息

中止发送消息,让系统资源都集中到向消费者发送消息,队列中的消息逐渐减小,队列最终会恢复至为空状态。

  1. 转移负载

有些时候不能中止生产者,这时能够改变绑定,让新消息发送到新的队列,新队列必须位于新的机器上。固然也须要新的消费者来链接。这样可让老队列中的消息慢慢取走,也不影响新消息的发送。

11. 生产实例


默认的集群模式下,虽然消息能够发送到一台机器,而后从另外一台机器取出,可是由于每台机器的queue实际上消息是本地存储,因此消息发到A的queue,从B中取,首先须要从A再次发送到B中,这样会致使取消息的效率不高。

若是使用镜像模式,A中的消息会同步到B中,消费者从B中取消息,消息是从本地取了,可是队列作镜像依然对性能影响很大,尤为是镜像的数目增长,性能会成倍降低。镜像队列优于普通模式的地方在于可靠性,普通模式中,A若是有故障,那么A中的消息就没法取出。镜像模式中,A有故障,消息依然能够从B中取出。

如下是咱们生产环境的集群配置方案,由于对于吞吐量要求很高,单台RabbitMQ没法知足性能要求,因此选择使用cluster,而镜像模式对于性能影响很大,只能采起其余方案:假设3台RabbitMQ组成一个集群。而后创建多个queue,exchange使用direct类型,并绑定全部queue,routeKey为0到2(和MQ的数量一致)中随机发送。生产者发送消息到exchange,并路由到各个queue,消费者也有多个,同时从各个queue获取消息。生产者与消费者使用多channel提升速度,同时消费者使用异步接收方式。

使用多个队列,能够显著提升集群的吞吐量,每一个队列要位于不一样的物理机器上。考虑性能优先,也取消了消息持久化。可是在可靠性方面,若是某个队列不可用,那么发送给这个队列的消息就会被丢弃。为避免这种状况,采用备用绑定与备用队列的方式,即创建多个绑定,默认状况exchange经过routeKey 0,1,2绑定队列a,b,c(橙色线路) ,备用绑定是exchange经过routeKey 0,1,2 绑定队列d(紫色线路)。好比当队列a不可用时,默认的绑定routeKey为0的消息就没法发送到a队列,这时备用策略自动生效,routeKey为0的消息会被发送到队列d上(走紫色线路),routeKey为1和2的消息照常发到b和c(仍是橙色线路)。这样就能够确保消息不丢失。若要进一步提升可靠性,下降备用队列的压力,能够创建多个备用队列,而后将绑定分散开来。

12. 相似产品对比

12.1. 功能特性对比

12.2. 性能对比

1百万条1K的消息

附录:参考资源

[1] http://www.rabbitmq.com/documentation.html

[2] http://www.infoq.com/cn/articles/AMQP-RabbitMQ#ftn.26

[3] http://langyu.iteye.com/blog/759663/

[4] http://mysql.taobao.org/index.php/Rabbitmq

[5] http://blog.163.com/clevertanglei900@126/blog/static/111352259201011121041853/

[6] http://www.rabbitmq.com/blog/2012/04/17/rabbitmq-performance-measurements-part-1/

[7] http://www.rabbitmq.com/blog/2012/04/25/rabbitmq-performance-measurements-part-2/

[8] http://www.rabbitmq.com/blog/2011/10/27/performance-of-queues-when-less-is-more/

[9] http://www.rabbitmq.com/blog/2011/09/24/sizing-your-rabbits/

[10] http://www.oschina.net/news/17973/message-queue-shootout

转自:http://changmengnan.com/284.html

相关文章
相关标签/搜索