消息中间件也能够称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通讯来进行分布式系统的集成。经过提供消息传递和消息队列模型,能够在分布式环境下扩展进程的通讯。java
当下主流的消息中间件有RabbitMQ、Kafka、ActiveMQ、RocketMQ等。其能在不一样平台之间进行通讯,经常使用来屏蔽各类平台协议之间的特性,实现应用程序之间的协同。优势在于可以在客户端和服务器之间进行同步和异步的链接,而且在任什么时候刻均可以将消息进行传送和转发,是分布式系统中很是重要的组件,主要用来解决应用耦合、异步通讯、流量削峰等问题。 node
一、P2P模式linux
P2P模式包含三个角色:消息队列(Queue)、发送者(Sender)、接收者(Receiver)。每一个消息都被发送到一个特定的队列,接收者从队列中获取消息。队列保留着消息,直到它们被消费或超时。c++
P2P的特色:git
二、Pub/Sub模式github
Pub/Sub模式包含三个角色:主题(Topic)、发布者(Publisher)、订阅者(Subscriber) 。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。web
Pub/Sub的特色:正则表达式
一、Kafkaredis
Kafka是LinkedIn开源的分布式发布-订阅消息系统,目前归属于Apache顶级项目。Kafka主要特色是基于Pull的模式来处理消息消费,追求高吞吐量,一开始的目的就是用于日志收集和传输。0.8版本开始支持复制,不支持事务,对消息的重复、丢失、错误没有严格要求,适合产生大量数据的互联网服务的数据收集业务。vim
二、RabbitMQ
RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求还在其次。
三、RocketMQ
RocketMQ是阿里开源的消息中间件,它是纯Java开发,具备高吞吐量、高可用性、适合大规模分布式系统应用的特色。RocketMQ思路起源于Kafka,但并非Kafka的一个Copy,它对消息的可靠传输及事务性作了优化,目前在阿里集团被普遍应用于交易、充值、流计算、消息推送、日志流式处理、binglog分发等场景。
RabbitMQ比Kafka可靠,Kafka更适合IO高吞吐的处理,通常应用在大数据日志处理或对实时性(少许延迟),可靠性(少许丢数据)要求稍低的场景使用,好比ELK日志收集。
RabbitMQ是一个在AMQP(Advanced Message Queuing Protocol )基础上实现的,可复用的企业消息系统。它能够用于大型软件系统各个模块之间的高效通讯,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP等,支持AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ是使用Erlang编写的一个开源的消息队列,自己支持不少的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的很是重量级,更适合于企业级的开发。它同时实现了一个Broker构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不一样产品,不一样的开发语言等条件的限制。
MQ 全称为Message Queue, 消息队列。是一种应用程序对应用程序的通讯方法。应用程序经过读写出入队列的消息(针对应用程序的数据)来通讯,而无需专用链接来连接它们。
消息传递指的是程序之间经过在消息中发送数据进行通讯,而不是经过直接调用彼此来通讯。队列的使用除去了接收和发送应用程序同时执行的要求。
在项目中,将一些无需即时返回且耗时的操做提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提升了系统的吞吐量。
消息队列的使用场景是怎样的?
对于一个大型的软件系统来讲,它会有不少的组件或者说模块或者说子系统或者(subsystem or Component or submodule)。那么这些模块的如何通讯?这和传统的IPC有很大的区别。传统的IPC不少都是在单一系统上的,模块耦合性很大,不适合扩展(Scalability);若是使用socket那么不一样的模块的确能够部署到不一样的机器上,可是仍是有不少问题须要解决。好比:
1)信息的发送者和接收者如何维持这个链接,若是一方的链接中断,这期间的数据如何防止丢失?
2)如何下降发送者和接收者的耦合度?
3)如何让Priority高的接收者先接到数据?
4)如何作到load balance?有效均衡接收者的负载?
5)如何有效的将数据发送到相关的接收者?也就是说将接收者subscribe 不一样的数据,如何作有效的filter。
6)如何作到可扩展,甚至将这个通讯模块发到cluster上?
7)如何保证接收者接收到了完整,正确的数据?
AMDQ协议解决了以上的问题,而RabbitMQ实现了AMQP。
RabbitMQ从总体上来看是一个典型的生产者消费者模型,主要负责接收、存储和转发消息
AMQP模型中,消息在producer中产生,发送到MQ的exchange上,exchange根据配置的路由方式发到相应的Queue上,Queue又将消息发送给consumer,消息从queue到consumer有push和pull两种方式。 消息队列的使用过程大概以下:
exchange接收到消息后,就根据消息的key和已经设置的binding,进行消息路由,将消息投递到一个或多个队列里。 exchange也有几个类型,彻底根据key进行投递的叫作Direct交换机,例如,绑定时设置了routing key为”abc”,那么客户端提交的消息,只有设置了key为”abc”的才会投递到队列。
一、安装 erlang
添加yum支持
cd /usr/local/src/ mkdir rabbitmq cd rabbitmq wget http://packages.erlang-solutions.com/erlang-solutions-1.0-1.noarch.rpm rpm -ivh erlang-solutions-1.0-1.noarch.rpm rpm --import http://packages.erlang-solutions.com/rpm/erlang_solutions.asc yum install erlang
一、用 yum 安装 RabbitMQ
rpm --import https://github.com/rabbitmq/signing-keys/releases/download/2.0/rabbitmq-release-signing-key.asc #this example assumes the CentOS 7 version of the package yum install rabbitmq-server-3.7.13-1.el7.noarch.rpm
rpm --import https://www.rabbitmq.com/rabbitmq-release-signing-key.asc # this example assumes the CentOS 7 version of the package yum install rabbitmq-server-3.7.13-1.el7.noarch.rpm
二、用 rpm 手动安装
下载:
wget https://github.com/rabbitmq/rabbitmq-server/releases/download/v3.7.13/rabbitmq-server-3.7.13-1.el7.noarch.rpm
上传rabbitmq-server-3.7.13-1.el7.noarch.rpm文件到/usr/local/src/rabbitmq/
安装:
rpm -ivh rabbitmq-server-3.7.13-1.el7.noarch.rpm
经常使用命令
service rabbitmq-server start service rabbitmq-server stop service rabbitmq-server restart chkconfig rabbitmq-server on //设置开机自启
设置配置文件:
cd /etc/rabbitmq cp /usr/share/doc/rabbitmq-server-3.7.13/rabbitmq.config.example /etc/rabbitmq/ mv rabbitmq.config.example rabbitmq.config
设置用户远程访问:
vim /etc/rabbitmq/rabbitmq.config
找到{}loopack_users , [] }把","去掉
开启web界面管理工具
rabbitmq-plugins enable rabbitmq_management service rabbitmq-server restart
防火墙设置
/sbin/iptables -I INPUT -p tcp --dport 15672 -j ACCEPT /etc/rc.d/init.d/iptables save
一、基本命令
启动监控管理器:rabbitmq-plugins enable rabbitmq_management
关闭监控管理器:rabbitmq-plugins disable rabbitmq_management
启动rabbitmq:rabbitmq-service start
关闭rabbitmq:rabbitmq-service stop
查看全部的队列:rabbitmqctl list_queues
清除全部的队列:rabbitmqctl reset
关闭应用:rabbitmqctl stop_app
启动应用:rabbitmqctl start_app
二、用户和权限设置
添加用户:rabbitmqctl add_user username password
分配角色:rabbitmqctl set_user_tags username administrator
新增虚拟主机:rabbitmqctl add_vhost vhost_name
将新虚拟主机受权给新用户:rabbitmqctl set_permissions -p vhost_name username “.” “.” “.”(后面三个””表明用户拥有配置、写、读所有权限)
三、角色说明
一、环境要求
一、全部节点须要再同一个局域网内;
二、全部节点须要有相同的 erlang cookie,不然不能正常通讯,为了实现cookie内容一致,采用scp的方式进行。
三、准备三台虚拟机,配置相同
rabbitmq01 192.168.101.11
rabbitmq02 192.168.101.12
rabbitmq03 192.168.101.13
操做系统:centos7.5
二、部署过程
一、全部节点配置/etc/hosts
node1 192.168.101.11
node2 192.168.101.12
node3 192.168.101.13
二、全部节点安装 erLang 和 rabbitmq
一、安装erlang
安装依赖包
yum install -y *epel* gcc-c++ unixODBC unixODBC-devel openssl-devel ncurses-devel
编译安装
wget http://erlang.org/download/otp_src_21.3.tar.gz tar -zxvf otp_src_21.3.tar.gz cd otp_src_21.3 ./configure --prefix=/usr/local/bin/erlang --without-javac make && make install echo "export PATH=$PATH:/usr/local/bin/erlang/bin:/usr/local/bin/rabbitmq_server-3.6.15/sbin" >> /etc/profile source /etc/profile
出现 erl 命令则说明安装成功;
编译安装
wget http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.15/rabbitmq-server-generic-unix-3.6.15.tar.xz yum install -y xz xz -d rabbitmq-server-generic-unix-3.6.15.tar.xz tar -xvf rabbitmq-server-generic-unix-3.6.15.tar -C /usr/local/bin/ echo "export PATH=$PATH:/usr/local/bin/erlang/bin:/usr/local/bin/rabbitmq_server-3.6.15/sbin" >> /etc/profile source /etc/profile
导入 rabbitmq 的管理界面
rabbitmq-plugins enable rabbitmq_management
**设置 erlang
找到erlang cookie文件的位置,官方在介绍集群的文档中提到过.erlang.cookie 通常会存在这两个地址:第一个是home/.erlang.cookie;第二个地方就是/var/lib/rabbitmq/.erlang.cookie。若是咱们使用解压缩方式安装部署的rabbitmq,那么这个文件会在{home}目录下,也就是$home/.erlang.cookie。若是咱们使用rpm等安装包方式进行安装的,那么这个文件会在/var/lib/rabbitmq目录下。
这里将 node1 的该文件复制到 node二、node3,注意这个文件的权限是 400(默认便是400),所以采用scp的方式只拷贝内容便可;
能够经过cat $home/.erlang.cookie来查看三台机器的cookie是否一致,设置erlang的目的是要保证集群内的cookie内容一致。
**使用-detached参数运行各节点
rabbitmqctl stop rabbitmq-server -detached
而后能够经过 rabbitmqctl cluster_status查看节点状态。
注意:要先拷贝cookie到另外两台机器上,保证三台机器上的cookie是一致的,而后再启动服务。
因为guest这个用户,只能在本地访问,因此咱们要新增一个用户并赋予权限:
添加用户并设置密码:
rabbitmqctl add_user admin 123456
添加权限(使admin用户对虚拟主机“/” 具备全部权限):
rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
修改用户角色(加入administrator用户组)
rabbitmqctl set_user_tags admin administrator
而后就能够远程访问了,而后可直接配置用户权限等信息。到此,就能够经过http://ip:15672 使用admin 123456 进行登录了。
到这里的话,每一个节点是做为单独的一台RabbitMQ存在的,也能够正常提供服务了
**组成集群
rabbitmq-server 启动时,会一块儿启动节点和应用,它预先设置RabbitMQ应用为standalone模式。要将一个节点加入到现有的集群中,你须要中止这个应用,并将节点设置为原始状态。若是使用./rabbitmqctl stop,应用和节点都将被关闭。因此使用rabbitmqctl stop_app仅仅关闭应用。
一、将 node二、node3与 node1 组成集群,这里以node2为例
node2# rabbitmqctl stop_app node2# rabbitmqctl join_cluster rabbit@node1 ####这里集群的名字必定不要写错了 node2# rabbitmqctl start_app
二、将node3重复上述操做,也加入node1的集群。
node3# rabbitmqctl stop_app node3# rabbitmqctl join_cluster rabbit@node1 ####这里集群的名字必定不要写错了 node3# rabbitmqctl start_app
则此时 node2 与 node3 也会自动创建链接,集群配置完毕;
#使用内存节点加入集群 node2 # rabbitmqctl join_cluster --ram rabbit@node1
三、在 RabbitMQ 集群任意节点上执行 rabbitmqctl cluster_status来查看是否集群配置成功。
node3# rabbitmqctl cluster_status Cluster status of node rabbit@node3 ... [{nodes,[{disc,[rabbit@node1,rabbit@node2,rabbit@node3]}]}, {running_nodes,[rabbit@node1,rabbit@node2,rabbit@node3]}, {cluster_name,<"rabbit@node1">}, #集群的名称默认为 rabbit@node1 {partitions,[]}, {alarms,[{rabbit@node1,[]},{rabbit@node2,[]},{rabbit@node3,[]}]}]
** 设置镜像队列策略
在任意一个节点上执行以下操做(这里在node1上执行)
首先,在web界面,登录后,点击“Admin--Virtual Hosts(页面右侧)”,在打开的页面上的下方的“Add a new virtual host”处增长一个虚拟主机,同时给用户“admin”和“guest”均加上权限(在页面直接设置、点点点便可);
而后,在linux中执行以下命令
rabbitmqctl set_policy -p coresystem ha-all "^" '{"ha-mode":"all"}'
"coresystem" vhost名称, "^"匹配全部的队列, ha-all 策略名称为ha-all, '{"ha-mode":"all"}' 策略模式为 all 即复制到全部节点,包含新增节点。
则此时镜像队列设置成功。(这里的虚拟主机coresystem是代码中须要用到的虚拟主机,虚拟主机的做用是作一个消息的隔离,本质上可认为是一个rabbitmq-server,是否增长虚拟主机,增长几个,这是由开发中的业务决定,即有哪几类服务,哪些服务用哪个虚拟主机,这是一个规划)。
**镜像队列策略设置说明
rabbitmqctl set_policy [-p Vhost] Name Pattern Definition [Priority] -p Vhost: 可选参数,针对指定vhost下的queue进行设置 Name: policy的名称 Pattern: queue的匹配模式(正则表达式) Definition:镜像定义,包括三个部分ha-mode, ha-params, ha-sync-mode ha-mode:指明镜像队列的模式,有效值为 all/exactly/nodes all:表示在集群中全部的节点上进行镜像 exactly:表示在指定个数的节点上进行镜像,节点的个数由ha-params指定 nodes:表示在指定的节点上进行镜像,节点名称经过ha-params指定 ha-params:ha-mode模式须要用到的参数 ha-sync-mode:进行队列中消息的同步方式,有效值为automatic和manual priority:可选参数,policy的优先级
将全部队列设置为镜像队列,即队列会被复制到各个节点,各个节点状态保持一直。完成这 6 个步骤后,RabbitMQ 高可用集群搭建完成,最后一个步骤就是搭建均衡器。
**安装并配置负载均衡器HA
注意:若是使用阿里云,可使用阿里云的内网slb来实现负载均衡,不用本身搭建HA。
一、在192.168.101.11安装HAProxy
yum -y install HAProxy
二、修改 /etc/haproxy/haproxy.cfg
vim /etc/haproxy/haproxy.cfg global log 127.0.0.1 local2 chroot /var/lib/haproxy pidfile /var/run/haproxy.pid maxconn 4000 user haproxy group haproxy daemon stats socket /var/lib/haproxy/stats defaults log global mode tcp option tcplog option dontlognull retries 3 option redispatch maxconn 2000 contimeout 5s clitimeout 120s srvtimeout 120s listen rabbitmq_cluster 192.168.101.11:5670 mode tcp balance roundrobin server rabbit1 192.168.101.11:5672 check inter 5000 rise 2 fall 2 server rabbit2 192.168.101.12:5672 check inter 5000 rise 2 fall 2
三、重启HAProxy
service haproxy restart
登陆浏览器输入地址http://192.168.101.11:8100/rabbitmqstats查看HAProxy的状态
3、常见问题
常见错误:
一、使用 rabbitmq-server -detached命令启动rabbitmq时,出现如下提示Warning: PID file not written; -detached was passed,此时使用rabbitmqctl status提示服务已启动,可知此问题不用解决。
二、因为更改hostname文件,在每次rabbitmqctl stop或者rabbitmqctl cluster_status等,只要是rabbitmq的命令就报错,提示大概以下
Cluster status of node rabbit@web2 ... Error: unable to connect to node rabbit@web2: nodedown DIAGNOSTICS =========== attempted to contact: [rabbit@web2] rabbit@web2: * connected to epmd (port 4369) on web2 * epmd reports node 'rabbit' running on port 25672 * TCP connection succeeded but Erlang distribution failed * Hostname mismatch: node "rabbit@mq2" believes its host is different. Please ensure that hostnames resolve the same way locally and on "rabbit@mq2" current node details: - node name: 'rabbitmq-cli-11@web2' - home dir: /root - cookie hash: SGwxMdJ3PjEXG1asIEFpBg==
此时先ps aux | grep mq,而后kill -9 该进程,而后再rabbitmq-server -detached便可解决。(即先强杀,再从新启动)
三、使用rabbitmqctl stop,rabbitmq-server -detached从新启动后,原先添加的用户admin、虚拟主机coresystem等均丢失,还须要从新添加。
采用脚本启动,在脚本中写好启动好须要加载的各配置项(建立admin用户并受权,建立虚拟主机并受权,配置镜像队列)。
四、命令
rabbitmqctl stop_app #仅关闭应用,不关闭节点 rabbitmqctl start_app #开启应用 rabbitmq--server -detached #启动节点和应用 rabbitmqctl stop #关闭节点和应用
四、经常使用命令:
Rabbitmq服务器的主要经过rabbitmqctl和rabbimq-plugins两个工具来管理,如下是一些经常使用功能。
一、 服务器启动与关闭
启动: rabbitmq-server –detached 关闭: rabbitmqctl stop 若单机有多个实例,则在rabbitmqctl后加 –n 指定名称
二、插件管理
开启某个插件:rabbitmq-plugins enable xxx 关闭某个插件:rabbitmq-plugins disable xxx 注意:重启服务器后生效。
三、virtual_host管理
新建virtual_host:rabbitmqctl add_vhost xxx 撤销virtual_host:rabbitmqctl delete_vhost xxx
四、用户管理
新建用户:rabbitmqctl add_user xxxpwd 删除用户: rabbitmqctl delete_user xxx 查看用户:rabbitmqctl list_users 改密码: rabbimqctl change_password {username} {newpassword} 设置用户角色:rabbitmqctlset_user_tags {username} {tag ...} Tag能够为 administrator,monitoring, management
五、 权限管理
权限设置:set_permissions [-pvhostpath] {user} {conf} {write} {read} Vhostpath: Vhost路径 user: 用户名 Conf: 一个正则表达式match哪些配置资源可以被该用户访问。 Write: 一个正则表达式match哪些配置资源可以被该用户读。 Read: 一个正则表达式match哪些配置资源可以被该用户访问。
六、获取服务器状态信息
服务器状态:rabbitmqctl status ##其中可查看rabbitmq的版本信息
七、获取集群状态信息
rabbitmqctl cluster_status