在项目中想要 RabbitMQ 变得更加健壮,就要使得其变成高可用,今天咱们一块儿来聊聊关于 RabbitMQ 集群原理和部署流程node
1、介绍
在前几篇文章中,咱们详细的介绍了 RabbitMQ 的内部结构和使用,以及 SpringBoot 和 RabbitMQ 整合,都是基于单台 RabbitMQ 进行使用的。c++
咱们知道在微服务流行的当下,一旦单台服务器挂了,基本上就没法提供高可用的服务了,所以为了保证服务高可用,在生产环境上咱们一般的作法是搭建一个 RabbitMQ 集群,即便某台 RabbitMQ 故障了,其余正常的 RabbitMQ 服务器依然可使用,应用程序的持续运行不会受到影响。web
2、集群架构原理
在前几篇文章中,咱们有介绍到 RabbitMQ 内部有各类基础构件,包括队列、交换器、绑定、虚拟主机等,他们组成了 AMQP 协议消息通讯的基础,而这些构件以元数据的形式存在,它始终记录在 RabbitMQ 内部,它们分别是:正则表达式
-
队列元数据:队列名称和它们的属性 -
交换器元数据:交换器名称、类型和属性 -
绑定元数据:一张简单的表格展现了如何将消息路由到队列 -
vhost 元数据:为 vhost 内的队列、交换器和绑定提供命名空间和安全属性
这些元数据,其实本质是一张查询表,里面包括了交换器名称和一个队列的绑定列表,当你将消息发布到交换器中,其实是将你所在的信道将消息上的路由键与交换器的绑定列表进行匹配,而后将消息路由出去。算法

有了这个机制,那么在全部节点上传递交换器消息将简单不少,而 RabbitMQ 所作的事情就是把交换器元数据拷贝到全部节点上,所以每一个节点上的每条信道均可以访问完整的交换器。spring

若是消息生产者所链接的是节点 2 或者节点 3,此时队列1的完整数据不在该两个节点上,那么在发送消息过程当中这两个节点主要起了一个路由转发做用,根据这两个节点上的元数据转发至节点1上,最终发送的消息仍是会存储至节点1的队列1上。sql
一样,若是消息消费者所链接的节点2或者节点3,那这两个节点也会做为路由节点起到转发做用,将会从节点1的队列1中拉取消息进行消费。json
与常见的集群主从架构模式不一样的地方在于:RabbitMQ 集群模式下,仅仅只是同步元数据,每一个队列内容仍是在本身的服务器节点上。vim
这么设计主要仍是基于集群自己的性能和存储空间上来考虑:后端
-
存储空间:真正存放数据的地方是在队列里面,若是每一个集群节点都拥有全部队列的彻底数据拷贝,那么每一个节点的存储空间会很是大,集群的消息积压能力会很是弱。例如你如今存储了 3G 队列内容,那么在另一个只有 1G 存储空间的节点上,就会形成内存空间不足的状况,也就是没法经过集群节点的扩容提升消息积压能力。 -
性能:消息的发布者须要将消息复制到每个集群节点,每一条消息都会触发磁盘活动,这会致使整个集群内性能负载急剧拉升。
既然每一个队列内容仍是在本身的服务器节点上,一样也会带来新的问题,那就是若是队列所在服务器挂了,那存在服务器上的队列数据是否是所有都丢失了?
在单个节点上,RabbitMQ 存储数据有两种方案:
-
内存模式:这种模式会将数据存储在内存当中,若是服务器忽然宕机重启以后,那么附加在该节点上的队列和其关联的绑定都会丢失,而且消费者能够从新链接集群并从新建立队列; -
磁盘模式:这种模式会将数据存储磁盘当中,若是服务器忽然宕机重启,数据会自动恢复,该队列又能够进行传输数据了,而且在恢复故障磁盘节点以前,不能在其它节点上让消费者从新连到集群并从新建立队列,若是消费者继续在其它节点上声明该队列,会获得一个 404 NOT_FOUND 错误,这样确保了当故障节点恢复后加入集群,该节点上的队列消息不会丢失,也避免了队列会在一个节点以上出现冗余的问题。
在集群中的每一个节点,要么是内存节点,要么是磁盘节点,若是是内存节点,会将全部的元数据信息仅存储到内存中,而磁盘节点则不只会将全部元数据存储到内存上, 还会将其持久化到磁盘。
在单节点 RabbitMQ 上,仅容许该节点是磁盘节点,这样确保了节点发生故障或重启节点以后,全部关于系统的配置与元数据信息都会从磁盘上恢复。
而在 RabbitMQ 集群上,至少有一个磁盘节点,也就是在集群环境中须要添加 2 台及以上的磁盘节点,这样其中一台发生故障了,集群仍然能够保持运行。其它节点均设置为内存节点,这样会让队列和交换器声明之类的操做会更加快速,元数据同步也会更加高效。
3、集群部署
为了和生产环境保持一致,咱们选用CentOS7
操做系统进行环境部署,分别建立 3 台虚拟机。
# 3台服务器的IP
197.168.24.206
197.168.24.233
197.168.24.234
放开防火墙限制,保证 3 台服务器网络均可以互通!
3.一、从新设置主机名
因为 RabbitMQ 集群链接是经过主机名来链接服务的,必须保证各个主机名之间能够 ping 通,从新设置 3 台服务器主机名,因此须要作如下操做:
# 修改节点1的主机名
hostname node1
# 修改节点2的主机名
hostname node2
# 修改节点3的主机名
hostname node3
编辑/etc/hosts
文件,添加到在三台机器的/etc/hosts
中如下内容:
sudo vim /etc/hosts
添加内容以下:
197.168.24.206 node1
197.168.24.233 node2
197.168.24.234 node3
3.二、rabbitMQ安装
RabbitMQ 基于 erlang 进行通讯,相比其它的软件,安装有些麻烦,不过本例采用rpm
方式安装,任何新手均可以完成安装,过程以下!
3.2.一、安装前命令准备
输入以下命令,完成安装前的环境准备。
yum install lsof build-essential openssl openssl-devel unixODBC unixODBC-devel make gcc gcc-c++ kernel-devel m4 ncurses-devel tk tc xz wget vim
3.2.二、下载 RabbitMQ、erlang、socat 的安装包
本次下载的是RabbitMQ-3.6.5
版本,采用rpm
一键安装,适合新手直接上手。
先建立一个rabbitmq
目录,本例的目录路径为/usr/app/rabbitmq
,而后在目录下执行以下命令,下载安装包!
-
下载erlang
wget www.rabbitmq.com/releases/erlang/erlang-18.3-1.el7.centos.x86_64.rpm
-
下载socat
wget http://repo.iotti.biz/CentOS/7/x86_64/socat-1.7.3.2-5.el7.lux.x86_64.rpm
-
下载rabbitMQ
wget www.rabbitmq.com/releases/rabbitmq-server/v3.6.5/rabbitmq-server-3.6.5-1.noarch.rpm
最终目录文件以下:

3.2.三、安装软件包
下载完以后,按顺序依次安装软件包,这个很重要哦~
-
安装erlang
rpm -ivh erlang-18.3-1.el7.centos.x86_64.rpm
-
安装socat
rpm -ivh socat-1.7.3.2-5.el7.lux.x86_64.rpm
-
安装rabbitmq
rpm -ivh rabbitmq-server-3.6.5-1.noarch.rpm
安装完成以后,修改rabbitmq
的配置,默认配置文件在/usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin
目录下。
vim /usr/lib/rabbitmq/lib/rabbitmq_server-3.6.5/ebin/rabbit.app
修改loopback_users
节点的值!

分别从新命令rabbit
节点名称
vim /etc/rabbitmq/rabbitmq-env.conf
在文件里添加一行,以下配置!
NODENAME=rabbit@node1
其它两个节点命令也相似,而后,再保存!经过以下命令,启动服务便可!
# 启动服务
rabbitmq-server start &
# 中止服务
rabbitmqctl stop
经过以下命令,查询服务是否启动成功!
lsof -i:5672
若是出现5672
已经被监听,说明已经启动成功!

3.2.四、启动可视化的管控台
输入以下命令,启动控制台!
rabbitmq-plugins enable rabbitmq_management
用浏览器打开http://ip:15672
,这里的ip
就是 CentOS 系统的 ip,结果以下:

帐号、密码,默认为guest
,若是出现没法访问,检测防火墙是否开启,若是开启将其关闭便可!
登陆以后的监控平台,界面以下:

3.三、复制 Erlang cookie
RabbitMQ 集群环境下,元数据同步基于 cookie 共享方案实现。
在这里将 node1 的 cookie 文件复制到 node2,因为这个文件权限是 400 为方便传输,先修改权限,非必须操做,因此须要先修改 node1 中的该文件权限为 777
chmod 777 /var/lib/rabbitmq/.erlang.cookie
用 scp 拷贝到节点 2,节点 3 的操做也相似。
scp /var/lib/rabbitmq/.erlang.cookie node2:/var/lib/rabbitmq/
最后,将权限改回来
chmod 400 /var/lib/rabbitmq/.erlang.cookie
3.四、组成集群
在节点 2 执行以下命令:
# 中止rabbitmq服务
rabbitmqctl stop_app
# 清空节点状态
rabbitmqctl reset
# node2和node1构成集群,node2必须能经过node1的主机名ping通
rabbitmqctl join_cluster rabbit@node1
# 开启rabbitmq服务
rabbitmqctl start_app
节点 3 的操做也相似!
在任意一台机上面查看集群状态:
rabbitmqctl cluster_status

-
第一行:表示当前节点信息 -
第二行:表示集群中的节点成员,disc 表示这些都是磁盘节点 -
第三行:表示正在运行的节点成员
登陆可视化管控台,能够很清晰的看到,三个服务节点已经互相关联起来了!

若是你想将某个节点移除集群,以移除节点3为例,能够按照以下方式进行操做!
# 首先中止要移除的节点服务
rabbitmqctl stop
# 移除节点3
rabbitmqctl -n rabbit@node1 forget_cluster_node rabbit@node3
若是移除以后,没法启动 rabbitMQ,删除已有 mnesia 信息!
rm -rf /var/lib/rabbitmq/mnesia
而后再次重启服务便可!
3.五、设置内存节点
#加入时候设置节点为内存节点(默认加入的为磁盘节点)
rabbitmqctl join_cluster rabbit@node1 --ram
其中--ram
指的是做为内存节点,若是不加,那就默认为磁盘节点。
若是节点在集群中已是磁盘节点了,经过如下命令能够将节点改为内存节点:
# 中止rabbitmq服务
rabbitmqctl stop_app
# 更改节点为内存节点
rabbitmqctl change_cluster_node_type ram
# 开启rabbitmq服务
rabbitmqctl start_app

3.六、镜像队列
上面咱们提到,在默认状况下,队列只会保存在其中一个节点上,当节点发生故障时,尽管全部元数据信息均可以从磁盘节点上将元数据恢复到本节点上,可是内存节点的队列消息内容就不行了,这样就会致使消息的丢失。
RabbitMQ 很早就意识到这个问题,在 2.6 之后的版本中增长了队列冗余选项:镜像队列。
所谓镜像队列,其实就是主队列(master)依然是仅存在于一个节点上,经过关联的 rabbitMQ 服务器,从主队列同步消息到各个节点,也就是所谓的主从模式,将主队列的消息进行备份处理。
若是主队列没有发生故障,那么其工做流程跟普通队列同样,生产者和消费者不会感知其变化,当发布消息时,依然是路由到主队列中,而主队列经过相似广播的机制,将消息扩散同步至其他从队列中,这就有点像 fanout 交换器同样。而消费者依然是从主队列中读取消息。
一旦主队列发生故障,集群就会从最老的一个从队列选举为新的主队列,这也就实现了队列的高可用了,但咱们切记不要滥用这个机制,在上面也说了,队列的冗余操做会致使不能经过扩展节点增长存储空间,并且会形成性能瓶颈。
命令格式以下:
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority]
参数介绍:
-p Vhost: 可选参数,针对指定vhost下的queue进行设置
Name: policy的名称
Pattern: queue的匹配模式(正则表达式)
Definition: 镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode
ha-mode: 指明镜像队列的模式,有效值为 all/exactly/nodes
all: 表示在集群中全部的节点上进行镜像
exactly: 表示在指定个数的节点上进行镜像,节点的个数由ha-params指定
nodes: 表示在指定的节点上进行镜像,节点名称经过ha-params指定
ha-params: ha-mode模式须要用到的参数
ha-sync-mode: 进行队列中消息的同步方式,有效值为automatic和manual
priority: 可选参数,policy的优先级
举个例子,声明名为ha-all
的策略,它与名称以ha
开头的队列相匹配,并将镜像配置到集群中的全部节点:
rabbitmqctl set_policy ha-all "^" '{"ha-mode":"all"}'
相似操做不少,具体使用能够参考官方 api。
4、集群的负载均衡
HAProxy 提供高可用性、负载均衡以及基于TCP和HTTP应用的代理,支持虚拟主机,它是免费、快速而且可靠的一种解决方案。根据官方数据,其最高极限支持10G的并发。HAProxy支持从4层至7层的网络交换,即覆盖全部的 TCP 协议。就是说,Haproxy 甚至还支持 Mysql 的均衡负载。为了实现 RabbitMQ 集群的软负载均衡,这里能够选择HAProxy。

4.一、HAProxy 安装
HAProxy 的安装也很简单,单独部署在一台服务器上,经过以下命令便可安装完成!
yum install haproxy
编辑 HAProxy 配置文件:
vim /etc/haproxy/haproxy.cfg
咱们只须要在文件末尾加上以下配置便可!
#绑定配置
listen rabbitmq_cluster
bind 0.0.0.0:5672
#配置TCP模式
mode tcp
#加权轮询
balance roundrobin
#RabbitMQ集群节点配置
server rmq_node1 197.168.24.206:5672 check inter 5000 rise 2 fall 3 weight 1
server rmq_node2 197.168.24.233:5672 check inter 5000 rise 2 fall 3 weight 1
server rmq_node3 197.168.24.234:5672 check inter 5000 rise 2 fall 3 weight 1
#haproxy监控页面地址
listen monitor
bind 0.0.0.0:8100
mode http
option httplog
stats enable
stats uri /stats
stats refresh 5s
绑定配置参数说明:
-
bind
:这里定义了客户端链接链接 IP 地址和端口号,用于客户端链接 -
balance roundrobin
:表示加权轮询负载均衡算法
RabbitMQ 集群节点配置说明:
-
server rmq_node1
:定义HAProxy内RabbitMQ服务的标识 -
197.168.24.206:5672
:标识了后端RabbitMQ的服务地址 -
check inter 5000
:表示每隔多少毫秒检查RabbitMQ服务是否可用,示例参数值为 5000 -
rise 2
:表示 RabbitMQ 服务在发生故障以后,须要多少次健康检查才能被再次确承认用,示例参数值为 2 -
fall 2
:表示须要经历多少次失败的健康检查以后,HAProxy 才会中止使用此RabbitMQ服务,示例参数值为 2 -
weight 1
:表示权重比例,值越低,会优先进行数据分配,示例参数值为 1
启动 HAProxy:
/usr/sbin/haproxy -f /etc/haproxy/haproxy.cfg
登陆http://ip:8100/stats
web 管理界面,便可进行监控查看!

5、Java 客户端使用
若是是配置了 HAProxy 代理服务器,能够直接使用 HAProxy 代理服务器地址便可!
//ConnectionFactory建立MQ的物理链接
connectionFactory = new ConnectionFactory();
connectionFactory.setHost("197.168.24.207"); //代理服务器地址
connectionFactory.setPort(5672); //代理服务器端口
connectionFactory.setUsername("admin"); //guest只能在本机进行访问,经过代理服务器发送消息时须要从新创建用户
connectionFactory.setPassword("admin"); //guest
connectionFactory.setVirtualHost("/"); //虚拟主机
若是没有代理服务器,使用Spring
的CachingConnectionFactory
类进行配置。
以SpringBoot
项目为例,配置文件以下:
spring.rabbitmq.addresses=197.168.24.206:5672,197.168.24.233:5672,197.168.24.234:5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
RabbitConfig
配置类以下:
@Configuration
public class RabbitConfig {
/**
* 初始化链接工厂
* @param addresses
* @param userName
* @param password
* @param vhost
* @return
*/
@Bean
ConnectionFactory connectionFactory(@Value("${spring.rabbitmq.addresses}") String addresses,
@Value("${spring.rabbitmq.username}") String userName,
@Value("${spring.rabbitmq.password}") String password,
@Value("${spring.rabbitmq.virtual-host}") String vhost) {
CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
connectionFactory.setAddresses(addresses);
connectionFactory.setUsername(userName);
connectionFactory.setPassword(password);
connectionFactory.setVirtualHost(vhost);
return connectionFactory;
}
/**
* 从新实例化 RabbitAdmin 操做类
* @param connectionFactory
* @return
*/
@Bean
public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory){
return new RabbitAdmin(connectionFactory);
}
/**
* 从新实例化 RabbitTemplate 操做类
* @param connectionFactory
* @return
*/
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory){
RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
//数据转换为json存入消息队列
rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
return rabbitTemplate;
}
}
6、总结
本文主要详细介绍了 RabbitMQ 集群的工做原理和如何搭建一个具有负载均衡能力的 RabbitMQ 集群的方法。
限于笔者的才疏学浅,对本文内容可能还有理解不到位的地方,若有阐述不合理之处还望留言一块儿探讨。
7、参考
一、简书 - 癫狂侠 - 消息中间件—RabbitMQ
二、后端进阶 - zch - RabbitMQ集群原理与部署
< END >
如果大家喜欢我们的文章,欢迎大家转发,点击在看让更多的人看到。也欢迎大家热爱技术和学习的朋友加入的我们的知识星球当中,我们共同成长,进步。
【推荐阅读】
本文分享自微信公众号 - Java极客技术(Javageektech)。
若有侵权,请联系 support@oschina.cn 删除。
本文参与“OSC源创计划”,欢迎正在阅读的你也加入,一块儿分享。