RabbitMQ集群、镜像部署配置

1   RABBITMQ简介及安装

 

RabbitMQ是一个开源的AMQP实现,服务器端用Erlang语言编写,支持多种客户端,如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX。用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。node

AMQP,即Advanced message Queuing Protocol,高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。消息中间件主要用于组件之间的解耦,消息的发送者无需知道消息使用者的存在,反之亦然。python

AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。web

1   RABBITMQ系统架构

1

RabbitMQ Server: 也叫broker server,是一种传输服务,负责维护一条从Producer到consumer的路线,保证数据可以按照指定的方式进行传输。正则表达式

Producer,数据的发送方。缓存

Consumer,数据的接收方。安全

Exchanges 接收消息,转发消息到绑定的队列。主要使用3种类型:direct, topic, fanout。bash

Queue RabbitMQ内部存储消息的对象。相同属性的queue能够重复定义,但只有第一次定义的有效。服务器

Bindings 绑定Exchanges和Queue之间的路由。cookie

Connection: 就是一个TCP的链接。Producer和consumer都是经过TCP链接到RabbitMQ Server的。架构

Channel:虚拟链接。它创建在上述的TCP链接中。数据流动都是在Channel中进行的。也就是说,通常状况是程序起始创建TCP链接,第二步就是创建这个Channel。

2   RABBITMQ安装和启动

Ubuntu 安装

编辑 /etc/apt/sources.list 文件添加以下行

deb http://www.rabbitmq.com/debian/ testing main

执行更新软件源命令

#apt-get update.

安装rabbitmq

#apt-get install rabbitmq-server

启动rabbitmq

# service rabbitmq-server start

 

Centos 安装

从官网下载erlang和rabbitmq 的rpm包

安装erlang

#rpm –i erlang-17.4-1.el6.x86_64.rpm

安装rabbitmq

#rpm –i rabbitmq-server-3.5.3-1.noarch.rpm

rabbitmq的rpm安装包不能指定安装路径

3   RABBITMQ配置文件

RABBITMQ配置文件位置:

  • Generic UNIX- $RABBITMQ_HOME/etc/rabbitmq/
  • Debian- /etc/rabbitmq/
  • RPM- /etc/rabbitmq/

 

4   RABBITMQ页面(web)管理

启用web插件

#rabbitmq-plugins enable rabbitmq_management #启用

关闭web插件

# rabbitmq-plugins disable rabbitmq_management

能够用默认帐号guest,guest登录http://主机IP:15672,若是要远程登陆,须要先建立账户,可查看下一节。

5   RABBITMQ建立账户

若是是集群的话,只要在一台主机设置便可,其它会自动同步。

#rabbitmqctl add_user iom 123456  –iom为新建的用户,123456为密码

#rabbitmqctl  set_user_tags iom administrator –将用户设置为管理员角色

#rabbitmqctl set_permissions -p / iom “.*” “.*” “.*”

–在 / 虚拟主机里设置iom用户配置权限,写权限,读权限。.*是正则表达式里用法。rabbitmq的权限是根据不一样的虚拟主机(virtual hosts)配置的,同用户在不一样的虚拟主机(virtual hosts)里可能不同。

2   RABBITMQ安全特性

6   publish消息确认机制

若是采用标准的 AMQP 协议,则惟一可以保证消息不会丢失的方式是利用事务机制 — 令 channel 处于 transactional 模式、向其 publish 消息、执行 commit 动做。在这种方式下,事务机制会带来大量的多余开销,并会致使吞吐量降低 250% 。为了补救事务带来的问题,引入了 confirmation 机制(即 Publisher Confirm)。

confirm 机制是在channel上使用 confirm.select方法,处于 transactional 模式的 channel 不能再被设置成 confirm 模式,反之亦然。

在 channel 被设置成 confirm 模式以后,全部被 publish 的后续消息都将被 confirm(即 ack) 或者被 nack 一次。可是没有对消息被 confirm 的快慢作任何保证,而且同一条消息不会既被 confirm 又被 nack 。

RabbitMQ 将在下面的状况中对消息进行 confirm :

RabbitMQ发现当前消息没法被路由到指定的 queues 中;

非持久属性的消息到达了其所应该到达的全部 queue 中(和镜像 queue 中);

持久消息到达了其所应该到达的全部 queue 中(和镜像 queue 中),并被持久化到了磁盘(被 fsync);

持久消息从其所在的全部 queue 中被 consume 了(若是必要则会被 acknowledge)。

 

7   consumer消息确认机制

为了保证数据不被丢失,RabbitMQ支持消息确认机制,即acknowledgments。

若是没启动消息确认机制,RabbitMQ在consumer收到消息后就会把消息删除。

启用消息确认后,consumer在处理数据后应经过回调函数显示发送ack, RabbitMQ收到ack后才会删掉数据。若是consumer一段时间内不回馈,RabbitMQ会将该消息从新分配给另一个绑定在该队列上的consumer。另外一种状况是consumer断开链接,可是获取到的消息没有回馈,则RabbitMQ一样从新分配。

注意:若是consumer 没调用basic.qos 方法设置prefetch_count=1,那即便该consumer有未ack的messages,RabbitMQ仍会继续发messages给它。

8   消息持久化

消息确认机制确保了consumer退出时消息不会丢失,但若是是RabbitMQ自己因故障退出,消息仍是会丢失。为了保证在RabbitMQ出现意外状况时数据仍没有丢失,须要将queue和message都要持久化。

queue持久化:channel.queue_declare(queue=’hello’, durable=True)

message持久化:channel.basic_publish(exchange=”,

routing_key=”task_queue”,

body=message,

properties=pika.BasicProperties(

delivery_mode = 2,)  #消息持久化

)

即便有消息持久化,数据也有可能丢失,由于rabbitmq是先将数据缓存起来,到必定条件才保存到硬盘上,这期间rabbitmq出现意外数据有可能丢失。

网上有测试代表:持久化会对RabbitMQ的性能形成比较大的影响,可能会降低10倍不止。

3   RABBITMQ集群

1   RABBITMQ集群基本概念

一个RABBITMQ集 群中能够共享user,virtualhosts,queues(开启Highly Available Queues),exchanges等。但message只会在建立的节点上传输。当message进入A节点的queue中后,consumer从B节点拉取时,RabbitMQ会临时在A、B间进行消息传输,把A中的消息实体取出并通过B发送给consumer。因此consumer应尽可能链接每个节点,从中取消息。

RABBITMQ的集群节点包括内存节点、磁盘节点。内存节点的元数据仅放在内存中,性能比磁盘节点会有所提高。不过,若是在投递message时,打开了message的持久化,那么内存节点的性能只能体如今资源管理上,好比增长或删除队列(queue),虚拟主机(vrtual hosts),交换机(exchange)等,发送和接受message速度同磁盘节点同样。一个集群至少要有一个磁盘节点。

2   RABBITMQ搭建集群

环境:有三台主机,主机名和IP以下,rabbitmq的执行用户为rabbitmq,所属组为rabbitmq。

主机名         IP

rabbitmq1 192.168.10.2

rabbitmq2 192.168.10.3

rabbitmq3  192.168.10.4

同步erlang.cookie

杀掉rabbitmq2和rabbitmq3的rabbitmq进程:

#ps –ef|grep rab|awk ‘{print $2}’|xargs kill -9。–用service rabbitmq-servier stop停会有遗留进程。

登录rabbitmq1(rabbitmq1上的rabbitmq服务不能关),执行

#cd /var/lib/rabbitmq     –进入erlang.cookie所在目录,只有ls –al能看见此文件

#chmod 777 .erlang*       –该文件默认为400权限,为方便传输,先修改权限,非必须操做

#scp .erlang.cookie  rabbitmq@192.168.10.3:/var/lib/rabbitmq –将此文件传给另外两条主机

#scp .erlang.cookie  rabbitmq@192.168.10.4:/var/lib/rabbitmq

#chmod 400 .er*          –恢复文件权限

分别在rabbitmq2和rabbitmq3 上执行

#chown rabbitmq:rabbitmq .er*  –修改文件所属用户和所属组

#chmod 400 .er*  –修改文件权限

#service rabbitmq-server start

加入集群

查询rabbitmq1节点名称

#rabbitmqctl cluster_status

Cluster status of node rabbit@rabbitmq1 …

[{nodes,[{disc,[rabbit@rabbitmq1]}]},{running_nodes,[rabbit@ rabbitmq1]}]

…done.

rabbitmq2 加入rabbitmq1 节点.

# rabbitmqctl stop_app   –关掉rabbitmq2服务

# rabbitmqctl join_cluster rabbit@rabbitmq1 — rabbitmq2加入rabbitmq1, rabbitmq2必须能经过rabbitmq1的主机名ping通rabbitmq1。

# rabbitmqctl start_app  –启动rabbitmq2服务

查看集群信息

# rabbitmqctl cluster_status –此时里面就应该能看见两个节点。集群名字为rabbit@rabbitmq。

用相同的方法把rabbitmq3也加入rabbitmq1。

更改节点属性

#rabbitmqctl stop_app  –中止rabbitmq服务

#rabbitmqctl change_cluster_node_type disc/ram –更改节点为磁盘或内存节点

#rabbitmqctl start_app –开启rabbitmq服务

 

查看集群状态

#rabbitmqctl cluster_status

[{nodes,[{disc,[rabbit@rabbitmq1,rabbit@rabbitmq2,rabbit@rabbitmq3]}]}, {running_nodes,[rabbit@rabbitmq1,rabbit@rabbitmq2, rabbit@rabbitmq3]}]…done.

 

–第一行是集群中的节点成员,disc表示这些都是磁盘节点。

–第二行是正在运行的节点成员

3   RABBITMQ退出集群

假设要把rabbitmq2退出集群

在rabbitmq2上执行

#rabbitmqctl stop_app

#rabbitmqctl reset

#rabbitmqctl start_app

 

在集群主节点上执行

# rabbitmqctl forget_cluster_node rabbit@rabbitmq2

4   RABBITMQ集群重启

集群重启时,最后一个挂掉的节点应该第一个重启,若是因特殊缘由(好比同时断电),而不知道哪一个节点最后一个挂掉。可用如下方法重启:

先在一个节点上执行

#rabbitmqctl force_boot

#service rabbitmq-server start

在其余节点上执行

#service rabbitmq-server start

查看cluster状态是否正常(要在全部节点上查询)。

#rabbitmqctl cluster_status

 

若是有节点没加入集群,能够先退出集群,而后再从新加入集群。

上述方法不适合内存节点重启,内存节点重启的时候是会去磁盘节点同步数据,若是磁盘节点没起来,内存节点一直失败。

5   注意事项

  • cookie在全部节点上必须彻底同样,同步时必定要注意。
  • erlang是经过主机名来链接服务,必须保证各个主机名之间能够ping通。能够经过编辑/etc/hosts来手工添加主机名和IP对应关系。若是主机名ping不通,rabbitmq服务启动会失败。
  • 若是queue是非持久化queue,则若是建立queue的那个节点失败,发送方和接收方能够建立一样的queue继续运做。但若是是持久化queue,则只能等建立queue的那个节点恢复后才能继续服务。
  • 在集群元数据有变更的时候须要有disk node在线,可是在节点加入或退出的时候全部的disk node必须所有在线。若是没有正确退出disk node,集群会认为这个节点当掉了,在这个节点恢复以前不要加入其它节点。.

 

4   RABBITMQ HA

1   镜像队列概念

镜像队列能够同步queue和message,当主queue挂掉,从queue中会有一个变为主queue来接替工做。

镜像队列是基于普通的集群模式的,因此你仍是得先配置普通集群,而后才能设置镜像队列。

镜像队列设置后,会分一个主节点和多个从节点,若是主节点宕机,从节点会有一个选为主节点,原先的主节点起来后会变为从节点。

queue和message虽然会存在全部镜像队列中,但客户端读取时不论物理面链接的主节点仍是从节点,都是从主节点读取数据,而后主节点再将queue和message的状态同步给从节点,所以多个客户端链接不一样的镜像队列不会产生同一message被屡次接受的状况。

2   配置镜像队列

沿用3.2的环境,如今咱们把名为“hello”的队列设置为同步给全部节点

#rabbitmqctl set_policy  ha-all “hello” ‘{“ha-mode”:”all”}’

ha-all 是同步模式,指同步给全部节点,还有另外两种模式ha-exactly表示在指定个数的节点上进行镜像,节点的个数由ha-params指定,ha-nodes表示在指定的节点上进行镜像,节点名称经过ha-params指定;

hello 是同步的队列名,能够用正则表达式匹配;

{“ha-mode”:”all”} 表示同步给全部,同步模式的不一样,此参数也不一样。

执行上面命令后,能够在web管理界面查看queue 页面,里面hello队列的node节点后会出现+2标签,表示有2个从节点,而主节点则是当前显示的node(xf7021是测试用的名字,按4-2应该为rabbitmq(1-3))。

2

5   keepalived和执行脚本

1   A-keepalived配置

红字为手工加的备注,原文件里没有。

vi /etc/keepalived/keepalived.cnf文件

 

global_defs {

router_id LVS_MASTER   }

 

vrrp_instance VI_1 {

state MASTER

interface eth0

virtual_router_id 51

priority 100

advert_int 1

authentication {

auth_type PASS

auth_pass 1111

}

virtual_ipaddress {

192.168.10.251/24      #rabbitmq

}

}

 

virtual_server 192.168.10.251 5672 {

delay_loop 6

lb_algo rr

lb_kind DR

protocol TCP

 

real_server 192.168.10.3 5672 {

weight 3

TCP_CHECK {

connect_timeout 3

nb_get_retry 3

delay_before_retry 3

connect_port 5672

}

}

 

real_server 192.168.10.4 5672 {

weight 3

TCP_CHECK {

connect_timeout 3

nb_get_retry 3

delay_before_retry 3

connect_port 5672

}

}

}

 

2   rabbitmq 服务器上执行的脚本

lvs_rabbitmq.sh脚本内容:

#!/bin/bash

VIP=192.168.10.251

case “$1” in

start)

ifconfig lo:0 $VIP netmask 255.255.255.255 broadcast $VIP

/sbin/route add -host $VIP dev lo:0

echo “1” >/proc/sys/net/ipv4/conf/lo/arp_ignore

echo “2” >/proc/sys/net/ipv4/conf/lo/arp_announce

echo “1” >/proc/sys/net/ipv4/conf/all/arp_ignore

echo “2” >/proc/sys/net/ipv4/conf/all/arp_announce

sysctl -p >/dev/null 2>&1

echo “lvs_vip server start ok!”;;

stop)

ifconfig lo:0 down

/sbin/route del $VIP >/dev/null 2>&1

 

echo “0” >/proc/sys/net/ipv4/conf/lo/arp_ignore

echo “0” >/proc/sys/net/ipv4/conf/lo/arp_announce

echo “0” >/proc/sys/net/ipv4/conf/all/arp_ignore

echo “0” >/proc/sys/net/ipv4/conf/all/arp_announce

 

echo “lvs_vip server stoped.”;;

 

*)

echo “arg start|stop.”

exit 1

esac

 

exit 0

 

 

6   RABBITMQ监控

RabbitMQ监控项目不少,可经过web管理界面监控。

3

OVERVIEW页面下有4个标签。主要关注totals和nodes两个。

1   totals

4

ready为待处理消息量,total为总消息量。

publish为每秒发送消息量,deliver为每秒接受消息量。

下面5个灰色长方块分别表明对应的模块链接数。

 

2   nodes

5

name为节点名称,后面5个蓝色方块分别表明文件打开数,socket链接数,erlang processes(暂时未知),内存占用两,磁盘空余量;info里显示节点属性,将鼠标放在内容上会显示对应的统计内容。

7   系统测试

3   测试环境

主机名         IP

VIP:      192.168.10.251

client     192.168.10.2    –本测试发送(producer)和接收(consumer)在同一台机器

rabbitmq1 192.168.10.3

rabbitmq2  192.168.10.4

 

4   准备工做

负载机启动keepalived

# service keepalived start

 

 

rabbitmq1和rabbitmq2执行5-2的脚本

#./lvs_rabbitmq.sh start

 

按第3和第4章的方法组建集群,配置镜像队列,节点类型最好都设置为磁盘节点。

按第1-5建立用户。

 

5   客户机测试脚本

测试用RabbitMQ的python语言客户端,注意python是靠缩进量来区分语句块。红色部分为注释,源码上没有。

 

发送源码:

#vi send.py

 

#!/usr/bin/env python

import pika

import time

credentials=pika.PlainCredentials(‘iom’,’123456′)    –配置链接的用户名和密码

parameters=pika.ConnectionParameters(‘192.168.10.251′,5672,’/’,credentials)

connection=pika.BlockingConnection(parameters)

channel=connection.channel()

channel.queue_declare(queue=’hello’)

 

 

count=0

while count<9999:

message=’Hello World’+str(count)

count=count+1

channel.basic_publish(exchange=”,routing_key=’hello’,body=message)

print “Sent %s” %(message)

time.sleep(1)

connection.close()

 

接收源码

#vi receive.py

#!/usr/bin/env python

import pika

connection=pika.BlockingConnection(pika.ConnectionParameters(host=’xf7027′))

channel=connection.channel()

channel.queue_declare(queue=’hello’)

print ‘[*] Waiting for message.To exit press CTRL+C’

def callback(ch,method,properties,body):

print “[x] Received %r” %(body,)

 

channel.basic_consume(callback,queue=’hello’,no_ack=True)

channel.start_consuming()

6   测试过程

测试keepalived分配

在客户机上执行

#python send.py

 

在负载机上执行

#watch ipvsadm –Ln

能够看到rabbitmq1或rabbitmq2的activeconn列数值为1。

 

客户机从新执行发送程序

#python send.py

在负载机上能够看到另外一个rabbitmq服务的activeconn 列数值也变为1。

 

 

 

测试容灾性:

 

在客户机上分别执行发送和接受程序。

#python send.py

#python receive.py

而后关掉一个rabbitmq节点,若是关掉的正好是客户机连的那个节点的话,客户机发送和接收程序会报错退出(程序自己若是有错误重发机制则不受任何影响)。若是关掉的是另外的节点,程序不受任何影响。

相关文章
相关标签/搜索