原理:node
经过链接多个机器组成单个逻辑中间服务器,机器之间通讯借助于erlang的消息传输,要求集群中全部节点必须有相同的erlang cookie;节点间网络必须可靠,且运行相同版本的rabbitmq和erlang。python
虚拟主机、交换机、用户信息和权限会自动镜像到集群中全部节点。队列可能位于单个节点或镜像到多个节点。链接到任意节点的客户端能看到集群中全部队列。git
安装erlanggithub
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.0.2/erlang-21.0.2-1.el6.x86_64.rpmweb
wget https://github.com/rabbitmq/erlang-rpm/releases/download/v21.0.2/erlang-21.0.2-1.el7.centos.x86_64.rpmcentos
rpm -ivh erlang-20.3.6-1.el6.x86_64.rpm服务器
安装mqcookie
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.6/rabbitmq-server-3.7.6-1.el6.noarch.rpm网络
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.7/rabbitmq-server-3.7.7-1.el7.noarch.rpmapp
yum install rabbitmq-server-3.7.7-1.el7.noarch.rpm
启动mq
systemctl start rabbitmq-server
新建用户
rabbitmqctl add_user admin Hilaw@890
rabbitmqctl set_user_tags admin administrator
rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'
rabbitmqctl add_user monitor law@201809
rabbitmqctl set_user_tags monitor administrator
rabbitmqctl set_permissions -p / monitor '.*' '.*' '.*'
开启web管理页面:
rabbitmq-plugins enable rabbitmq_management
http://IP:15672 便可用admin登陆查看
关闭web管理页面:
rabbitmq-plugins disable rabbitmq_management
经常使用命令
rabbitmqctl list_users 查看用户列表
rabbitmqctl add_user username password 新建一个用户
rabbitmqctl delete_user username 删除一个用户
rabbitmqctl change_password username newpassword 修改用户密码
rabbitmqctl set_permissions -p vhost username ".*" ".*" ".*"
配置用户对应虚拟主机配置、写和读的全部权限
rabbitmqctl set_permissions -p username 查看用户权限
rabbitmqctl clear_permissions -p username vhost
移除用户在虚拟主机上的全部权限
rabbitmqctl list_user_permissions username 查看用户在全部虚拟主机上的权限
rabbitmqctl status 查看状态
rabbitmqctl list_queues 查看全部队列及其消息数
rabbitmqctl list_queues -p vhost 查看具体队列
rabbitmqctl -p vhostpath purge_queue blue 清除队列blue里的消息
镜像集群搭建
1,MQ1上同步erlang.cookie
cd /var/lib/rabbitmq
ls -al
chmod 777 .erlang.cookie
scp .erlang.cookie root@MQ2:/var/lib/rabbitmq
chmod 400 .erlang.cookie
2,在MQ2上
MQ2上暂停服务
systemctl stop rabbitmq-server
cd /var/lib/rabbitmq
ls -al
cp .erlang.cookie .erlang.cookie.default
chown rabbitmq.rabbitmq .erlang.cookie
chmod 400 .erlang.cookie
systemctl start rabbitmq-server
加入集群
MQ1查看集群状态
rabbitmqctl cluster_status
将MQ2加入MQ1
关掉MQ2的队列
rabbitmqctl stop_app
rabbitmqctl join_cluster rabbit@MQ1 MQ2上必须能ping同MQ1,添加全部节点IP HOSTNAME到/etc/hosts文件
rabbitmqctl start_app
查看集群状态
rabbitmqctl cluster_status
退出集群
MQ1上
rabbitmqctl forget_cluster_note rabbit@MQ2
MQ2上
rabbitmqctl stop_app
rabbitmqctl reset
rabbitmqctl start_app
注意事项:
cookie在全部节点上必须彻底同样,同步时必定要注意。
erlang是经过主机名来链接服务,必须保证各个主机名之间能够ping通。能够经过编辑/etc/hosts来手工添加主机名和IP对应关系。若是主机名ping不通,rabbitmq服务启动会失败。
若是queue是非持久化queue,则若是建立queue的那个节点失败,发送方和接收方能够建立一样的queue继续运做。但若是是持久化queue,则只能等建立queue的那个节点恢复后才能继续服务。
在集群元数据有变更的时候须要有disk node在线,可是在节点加入或退出的时候全部的disk node必须所有在线。若是没有正确退出disk node,集群会认为这个节点当掉了,在这个节点恢复以前不要加入其它节点。
更改节点属性
rabbitmqctl stop_app –中止rabbitmq服务
rabbitmqctl change_cluster_node_type disc/ram –更改节点为磁盘或内存节点
rabbitmqctl start_app –开启rabbitmq服务
内存节点是将交换器、用户和vhost等元数据都保存在内存中,而磁盘节点将元数据保存在磁盘中。单节点系统只容许磁盘节点,集群中可部分配置为内存节点。
集群重启时,最后一个挂掉的节点应该第一个重启,若是因特殊缘由(好比同时断电),而不知道哪一个节点最后一个挂掉。可用如下方法重启:
先在一个节点上执行
#rabbitmqctl force_boot
#service rabbitmq-server start
在其余节点上执行
#service rabbitmq-server start
查看cluster状态是否正常(要在全部节点上查询)。
#rabbitmqctl cluster_status.
测试:
1,生成消息
#!/usr/bin/python3
import pika
credentials = pika.PlainCredentials('monitor','law@201809')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1','5672','/',credentials))
channel = connection.channel() #新建一个频道
channel.queue_declare(queue='pika') #声明一个队列pika
channel.basic_publish(exchange='',routing_key='pika',body='Test message') #向pika队列发送一条消息
connection.close() #发送完成后,关闭链接
2,消费
#!/usr/bin/python3
import pika
credentials = pika.PlainCredentials('monitor','law@201809')
connection = pika.BlockingConnection(pika.ConnectionParameters('127.0.0.1','5672','/',credentials))
channel = connection.channel()
channel.queue_declare(queue='pika')
for method_frame,properties,body in channel.consume('pika'):
print(method_frame,properties,body) #打印出消息
channel.basic_ack(method_frame.delivery_tag)
#消费完消息就跳出循环
if method_frame.delivery_tag == 1:
break
#取消消费者并返回任何待处理消息
requeued_messages = channel.cancel()
print('Requeued %i messages' % requeued_messages)
connection.close() #关闭链接