快速掌握RabbitMQ(五)——搭建高可用的RabbitMQ集群

  RabbitMQ的集群是依赖erlang集群的,而erlang集群是经过.erlang.cookie文件进行通讯认证的,因此咱们使用RabbitMQ集群时只须要配置一下.erlang.cookie文件便可。下边简单演示一下RabbitMQ高可用集群的搭建,附带一个简单使用C#驱动RabbtiMQ集群的小栗子。html

1 搭建RabbitMQ高可用集群

  首先准备三台设备,这里采用的三台Centos7的虚拟机,测试一下各个虚拟机能不能相互ping通,若是能够相互ping通的话,在每台虚拟机上分别安装RabbitMQ,能够参考第一篇的安装方法node

第1步 修改主机配置

   为了方便机器间的相互访问,三台centos都执行  vim /etc/hosts ,添加下边的配置(注意修改为本身设备的IP):redis

192.168.70.129 rabbitmq1 192.168.70.131 rabbitmq2 192.168.70.133 rabbitmq3

  通常状况,hosts文件中内容以下:vim

第2步:修改.erlang.cookie文件

  修改三台设备的.erlang.cookie中的key一致。若是使用的是前边的安装方法,.erlang.cookie的位置为 /var/lib/rabbitmq/.erlang.cookie (采用其余安装方式找不到文件的话,可使用命令  find / -name '.erlang.cookie' 找到文件位置)。这里三台虚拟机的key都采用192.168.70.129的key(值为CRRQPKHDXEEIUJUOGYKN),在另外两台设备上 执行命令: vim /var/lib/rabbitmq/.erlang.cookie ,修改文件内容为CRRQPKHDXEEIUJUOGYKN。在修改时若是遇到权限问题,可执行命令 chmod 600 /var/lib/rabbitmq/.erlang.cookie 修改文件的权限为可写,修改内容完成后,执行命令 chmod 400 /var/lib/rabbitmq/.erlang.cookie 把文件再次改为只读的。centos

  完成上边的两步后,erlang集群就搭建好了,重启全部的设备便可。在rabbitmq1节点的虚拟机上执行命令 rabbitmqctl cluster_status 查看集群状态:数组

 第3步:添加/删除节点

添加节点bash

  把rabbitmq2节点添加到集群中去,在rabbitmq2节点执行如下命令:cookie

rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@rabbit1 rabbitmqctl start_app

  执行完成后,查看集群状态,看到rabbitmq2已经在集群中了,以下:app

  重复上边的步骤,把rabbitmq3也添加到集群中,在rabbitmq3节点执行下边命令:
rabbitmqctl stop_app rabbitmqctl reset rabbitmqctl join_cluster rabbit@rabbit2 rabbitmqctl start_app
  查看集群状态,rabbitmq3也在集群中了,以下:  

  这时咱们打开任意一个节点的Web管理界面,显示以下,看到集群已经配置完成了:

删除节点
  把某一节点从集群中删除很简单,reset一下节点便可。如删除rabbitmq3节点,在rabbitmq3上执行如下命令:
rabbitmqctl stop_app rabbitmqctl reset
rabbitmqctl start_app

  执行完成后,查看集群状态以下:负载均衡

   如今搭建的集群是默认的普通集群,普通集群中节点能够共享集群中的exchange,routingKey和queue,可是queue中的消息只保存在首次声明queue的节点中。任意节点的消费者均可以消费其余节点的消息,好比消费者链接rabbitmq1节点的消费者(代码中创建Connection时,使用的rabbitmq1的IP)能够消费节点rabbitmq2的队列myqueue2中的消息,消息传输过程是:rabbitmq2把myqueue2中的消息传输给rabbtimq1,而后rabbitmq1节点把消息发送给consumer。由于queue中的消息只保存在首次声明queue的节点中,这样就有一个问题:若是某一个node节点挂掉了,那么只能等待该节点从新链接才能继续处理该节点内的消息(若是没有设置持久化的话,节点挂掉后消息会直接丢失)。以下图,rabbitmq1节点挂掉后,myqueue队列就down掉了,不能被访问。

  针对上边的问题,咱们可能会想到:若是可让rabbitmq中的节点像redis集群的节点同样,每个节点都保存全部的消息,好比让rabbitmq1不只仅保存本身队列myqueue的消息,还保存其余节点的队列myqueue2和myqueue3中的消息,rabbitmq2和rabbitmq3节点也同样,这样就不用担忧宕机的问题了。rabbitmq也提供了这样的功能:镜像队列。镜像队列由一个master和多个slave组成,使用镜像队列消息会自动在镜像节点间同步,而不是在consumer取数据时临时拉取。

第4步:配置镜像队列

  rabbitmq配置镜像队列十分简单,咱们在任意一个node节点下执行下边的命令就能够完成镜像队列的配置(固然也能够在Web管理界面上添加policy):

rabbitmqctl set_policy ha-all "^my" '{"ha-mode":"all","ha-sync-mode":"automatic"}' # ha-all:为策略名称; # ^my:为匹配符,只有一个^表明匹配全部,^abc为匹配名称以abc开头的queue或exchange; # ha-mode:为同步模式,一共3种模式: #    ①all-全部(全部的节点都同步消息), #    ②exctly-指定节点的数目(需配置ha-params参数,此参数为int类型好比2,在集群中随机抽取2个节点同步消息) #    ③nodes-指定具体节点(需配置ha-params参数,此参数为数组类型好比["rabbit@rabbitmq1","rabbit@rabbitmq2"],明确指定在这两个节点上同步消息)。

  打开Web管理界面,若是效果以下表示镜像队列已经配置完成了。当前myqueue的master节点为rabbitmq1:

  若是首次声明queue的节点(master)挂了,其余节点会自动变成master,如上图myqueue的master为rabbitmq1,停掉rabbtmq1后,结果以下:rabbitmq2成为了master。

 

  咱们发现rabbitmq1节点挂了后,rabbitmq2自动成为了myqueue的master,myqueue不会down掉,能够正常的添加/删除/获取消息,这就解决了普通集群宕机的问题。使用镜像队列,由于各个节点要同步消息,因此比较耗费资源,通常在可靠性比较高的场景使用镜像队列。

还能够配置其余策略的镜像队列,也是一行命令便可完成配置,一些其它同步模式的栗子:

#策略名为ha-twe,匹配以“my”开头的queue或exchange,在集群中随机挑选镜像节点,同步的节点为2个
rabbitmqctl set_policy ha-two "^my"    '{"ha-mode":"exactly","ha-params":2,"ha-sync-mode":"automatic"}' #策略名为ha-nodes,匹配以“my”开头的queue或exchange,指定rabbit@rabbitmq2和rabbit@rabbitmq3为同步节点
rabbitmqctl set_policy ha-nodes "^my" \ '{"ha-mode":"nodes","ha-params":["rabbit@rabbitmq2", "rabbit@rabbitmq3"]}'

第5步:C#驱动RabbitMQ集群

  C#驱动RabbitMQ集群与C#驱动单机RabbtiMQ的方式基本同样,区别在于使用集群时,建立Connection指定的是一个host集合。看一个简单的栗子:

生产者代码:

static void Main(string[] args) { var factory = new ConnectionFactory() { UserName = "wyy",//用户名
                Password = "123456",//密码
                AutomaticRecoveryEnabled = true,//Connection断了,自动从新链接
 }; //集群中的三个rabbitmq节点
            List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" }; //随机链接一个rabbitmq节点
            using (var connection = factory.CreateConnection(hosts)) { //建立通道channel
                using (var channel = connection.CreateModel()) { Console.WriteLine("生产者准备就绪...."); #region 发布100条消息
                    for (int i = 0; i < 100; i++) { channel.BasicPublish(exchange: "myexchange", routingKey: "mykey", basicProperties: null, body: Encoding.UTF8.GetBytes($"第{i}条消息")); } #endregion } } Console.ReadKey(); }

消费者代码:

static void Main(string[] args) { var factory = new ConnectionFactory() { UserName = "wyy",//用户名
                Password = "123456",//密码
 }; //集群中的三个rabbitmq节点
            List<string> hosts = new List<string>() { "192.168.70.129", "192.168.70.131", "192.168.70.1233" }; //随机链接一个rabbitmq节点
            using (var connection = factory.CreateConnection(hosts)) { using (var channel = connection.CreateModel()) { //使用Qos,每次接收1条消息
                    channel.BasicQos(prefetchSize: 0, prefetchCount: 2, global: false); Console.WriteLine("消费者准备就绪...."); #region EventingBasicConsumer

                    //定义消费者 
                    var consumer = new EventingBasicConsumer(channel); consumer.Received += (sender, ea) => { //处理一条消息须要10s时间
                        Thread.Sleep(1000); //显示确认消息
                        channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false); Console.WriteLine($"处理消息【{Encoding.UTF8.GetString(ea.Body)}】完成!"); }; //处理消息
                    channel.BasicConsume(queue: "myqueue", autoAck: false, consumer: consumer); Console.ReadKey(); #endregion } } }

   执行这两个应用程序,结果以下:

 

   到这里RabbtMQ的集群搭建就告一段落了,有一个小问题:RabbitMQ的集群默认不支持负载均衡的。咱们能够根据设备的性能,使用Qos给各个消费者指定合适的最大发送条数,这样能够在必定程度上实现负载均衡。也有园友经过Haproxy实现RabbitMQ集群的负载均衡,有兴趣的小伙伴能够研究一下,为何使用Haprpxy而不用Ngnix呢?这是由于Haproxy支持四层(tcp,udp等)和七层(http,https,email等)的负载均衡,而Nginx只支持七层的负载均衡,而Rabbitmq是经过tcp传输的。本节也是RabbitMQ系列的最后一篇,若是文中有错误的话,但愿你们能够指出,我会及时修改,谢谢。

 

  

原文出处:https://www.cnblogs.com/wyy1234/p/10889742.html

相关文章
相关标签/搜索