RabbitMQ集群设计用于完成两个目标:容许消费者和生产者在RabbitMQ节点崩溃的状况下继续运行,以及经过添加更多的节点来扩展消息通讯的吞吐量。html
RabbitMQ会始终记录如下四种类型的内部元数据:java
1. 队列元数据-队列的名称和它们的属性(是否持久化,是否自动删除)node
2. 交换器元数据-交换器类型、名称和属性(可持久化等)缓存
3. 绑定元数据-一张简单的表格展现了如何将消息路由到队列安全
4. vhost元数据-为vhost内的队列、交换器和绑定提供命名空间和安全属性bash
在单一节点内,RabbitMQ会将全部这些信息存储在内存中,同时将那些标记为可持久化的队列和交换器(以及它们的绑定)存储到硬盘上。当你引入集群时,RabbitMQ须要追踪新的元数据类型:集群节点位置,以及节点与已记录的其余类型元数据的关系。集群提供了选择:将元数据存储到磁盘上,或者存储在内存中。服务器
Erlang Cookie是保证不一样节点能够相互通讯的密钥,要保证集群中的不一样节点相互通讯必须共享相同的Erlang Cookie。具体的目录存放在/var/lib/rabbitmq/.erlang.cookie。cookie
说明: 这就要从rabbitmqctl命令的工做原理提及,RabbitMQ底层是经过Erlang架构来实现的,因此rabbitmqctl会启动Erlang节点,并基于Erlang节点来使用Erlang系统链接RabbitMQ节点,在链接过程当中须要正确的Erlang Cookie和节点名称,Erlang节点经过交换Erlang Cookie以得到认证。网络
功能和原理
RabbitMQ的Cluster集群模式通常分为两种,普通模式和镜像模式。架构
普通模式:默认的集群模式,以两个节点(rabbit0一、rabbit02)为例来进行说明。对于Queue来讲,消息实体只存在于其中一个节点rabbit01(或者rabbit02),rabbit01和rabbit02两个节点仅有相同的元数据,即队列的结构。当消息进入rabbit01节点的Queue后,consumer从rabbit02节点消费时,RabbitMQ会临时在rabbit0一、rabbit02间进行消息传输,把A中的消息实体取出并通过B发送给consumer。因此consumer应尽可能链接每个节点,从中取消息。即对于同一个逻辑队列,要在多个节点创建物理Queue。不然不管consumer连rabbit01或rabbit02,出口总在rabbit01,会产生瓶颈。当rabbit01节点故障后,rabbit02节点没法取到rabbit01节点中还未消费的消息实体。若是作了消息持久化,那么得等rabbit01节点恢复,而后才可被消费;若是没有持久化的话,就会产生消息丢失的现象。
镜像模式:将须要消费的队列变为镜像队列,存在于多个节点,这样就能够实现RabbitMQ的HA高可用性。做用就是消息实体会主动在镜像节点之间实现同步,而不是像普通模式那样,在consumer消费数据时临时读取。缺点就是,集群内部的同步通信会占用大量的网络带宽。
每一个RabbitMQ节点,要么是内存节点(ram node),要么是磁盘节点(disk node)。内存节点将全部的队列、交换器、绑定、用户、权限和vhost的元数据定义都仅存在内存中。而磁盘节点则将元数据存储在磁盘中。
内存节点的效率更高,内存节点惟一存储到磁盘上的是磁盘节点的地址。
RabbitMQ要求集群中至少有一个磁盘节点。当节点加入或者离开集群时,它们必需要将该变动通知到至少一个磁盘节点。若是只有一个磁盘节点,并且不凑巧的是它又崩溃了,那么集群能够继续路由消息,可是不能作如下操做了:
1. 建立队列
2. 建立交换器
3. 建立绑定
4. 添加用户
5. 更改权限
单机环境搭建多节点群集
一、禁用管理后台插件rabbitmq-plugins disable rabbitmq_management
二、建立三个Shell文件
rabbitmq1.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5672
export RABBITMQ_NODENAME=rabbit
rabbitmq-server
rabbitmq2.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5673
export RABBITMQ_NODENAME=rabbit2
rabbitmq-server
rabbitmq3.sh
#!/bin/bash
export RABBITMQ_NODE_PORT=5674
export RABBITMQ_NODENAME=rabbit3
rabbitmq-server
三、中止在Erlang节点上运行的节点2和节点3 RabbitMQ Server 并清空(重置)它们的元数据
rabbitmqctl -n rabbit1@localhost stop_app
rabbitmqctl -n rabbit2@localhost stop_app
rabbitmqctl -n rabbit1@localhost reset
rabbitmqctl -n rabbit2@localhost reset
四、将节点2做为磁盘节点加入集群并启动应用
rabbitmqctl -n rabbit1@localhost join_cluster rabbit@localhost
rabbitmqctl -n rabbit1@localhost start_app
五、将节点3做为内存节点加入集群并启动应用
rabbitmqctl -n rabbit2@localhost join_cluster --ram rabbit@localhost
rabbitmqctl -n rabbit2@localhost start_app
六、运行命令rabbitmqctl cluster_status查看集群状态
Cluster status of node rabbit@localhost ...
[{nodes,[{disc,[rabbit1@localhost,rabbit@localhost]},
{ram,[rabbit2@localhost]}]},
{running_nodes,[rabbit2@localhost,rabbit1@localhost,rabbit@localhost]},
{cluster_name,<<"rabbit@localhost">>},
{partitions,[]},
{alarms,[{rabbit2@localhost,[]},
{rabbit1@localhost,[]},
{rabbit@localhost,[]}]}]
集群安装成功,这时候java客户端能够链接任何一个RabbitMQ Server的端口来访问集群了。
七、镜像队列
在声明队列时,能够经过参数"x-ha-policy"设置为"all"来把消息发送到集群的全部节点上。
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false, false, arg);
客户端发送代码
package com.test.cluster;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.lang.String;
import java.lang.System;
import java.util.HashMap;
import java.util.Map;
import java.util.Scanner;
public class Producer {
public static void main(String[] args) throws Exception {
//使用默认端口链接MQ
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默认端口5672
Connection conn = factory.newConnection(); //声明一个链接
Channel channel = conn.createChannel(); //声明消息通道
String exchangeName = "TestEXG";//交换机名称
String routingKey = "RouteKey1";//RoutingKey关键字
channel.exchangeDeclare(exchangeName, "direct", true);//定义声明交换机
String queueName = "ClusterQueue";//队列名称
Map arg = new HashMap();
arg.put("x-ha-policy", "all");
channel.queueDeclare(queueName, false, false, false, arg);
channel.queueBind(queueName, exchangeName, routingKey);//定义声明对象
byte[] messageBodyBytes = "Hello, world!".getBytes();//消息内容
channel.basicPublish(exchangeName, routingKey, null, messageBodyBytes);//发布消息
//关闭通道和链接
channel.close();
conn.close();
}
}
消费者代码
package com.test.cluster;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
//经过channel.basicAck向服务器发送回执,删除服务上的消息
public class Consumer {
public static void main(String[] args) throws IOException, InterruptedException {
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("admin");
factory.setPassword("admin");
factory.setHost("192.168.169.142"); //使用默认端口5672
factory.setPort(5672);
Connection conn = factory.newConnection(); //声明一个链接
Channel channel = conn.createChannel(); //声明消息通道
String exchangeName = "TestEXG";//交换机名称
String queueName = "ClusterQueue";//队列名称
channel.exchangeDeclare(exchangeName, "direct", true);//定义声明交换机
channel.queueBind(queueName, exchangeName, "RouteKey1");
channel.basicQos(1); //server push消息时的队列长度
//用来缓存服务器推送过来的消息
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(queueName, false, consumer);
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
System.out.println("Received " + new String(delivery.getBody()));
//回复ack包,若是不回复,消息不会在服务器删除
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}