RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。java
AMQP,即Advanced Message Queuing Protocol,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件同产品、不一样的开发语言等条件的限制。node
AMQP的实现有:RabbitMQ、OpenAMQ、Apache Qpid、Redhat Enterprise MRG、AMQP Infrastructure、ØMQ、Zyre等。 web
一、跨系统的异步通讯 ,异步,解耦,削峰都有体现。正则表达式
二、应用内的同步变成异步 秒杀:本身发送给本身数据库
三、基于Pub/Sub模型实现的事件驱动 放款失败通知、提货通知、购买碎屏保 系统间同步数据 摒弃ELT(好比全量同步商户数据); 摒弃API(好比定时增量获取用户、获取产品,变成增量广播)。vim
四、利用RabbitMQ实现事务的最终一致性centos
RabbitMQ使用Erlang语言编写,使用Mnesia数据库存储消息。bash
当消息生产者/(消费者)要生产/(消费)消息的时候须要与服务器创建一个长链接,在RabbitMQ中叫作Connection,为了解决客户端与服务器所产生的频繁的链接问题,因为会大量的消耗服务器内存,这里引入了消息通道的概念,在保持长链接的状况下。能够经过创建Channel的方式与服务器通信,当有请求的时候就会创建通道,结束则关闭通道。在RabbitMQ中,通常的作法不会让消息直接发送到消息队列中,这里引入了Exchange(交换机)的概念,经过交换机来实现消息更加灵活的消息分发,交换机没有实际的进程,而队列是有的,它只是一个地址列表,在队列建立的时候会与Exchange绑定一个专属的 key ,在生产者生产消息的时候也会指定这个key,那么 Exchange 就会经过这个key去匹配 Queue,从而实现灵活分发。而后消费者会经过订阅指定的队列去消费消息。在RabbitMQ中有Virtual Host 虚拟机的概念,他能够当成是一个小型的MQ,一个RabbitMQ服务器上能够有多个虚拟机,相互之间是隔离的,固然不一样的虚拟机之间能够有相同命名的交换机与队列,能够实现资源的隔离。服务器
最新版本的RabbitMQ有四种交换机类型,分别是Direct exchange、Fanout exchange、Topic exchange、Headers exchange。cookie
Direct Exchange 直连交换机:
定义:直连类型的交换机与一个队列绑定时,须要指定一个明确的binding key。路由规则:发送消息到直连类型的交换机时,只有routing key跟binding key彻底匹配时,绑定的队列才能收到消息。
示例:
// 只有队列1能收到消息 channel.basicPublish("MY_DIRECT_EXCHANGE", "key1", null, msg.getBytes());
Topic Exchange 主题交换机:
定义:主题类型的交换机与一个队列绑定时,能够指定按模式匹配的routing key。通配符有两个,*表明匹配一个单词。#表明匹配零个或者多个单词。单词与单词之间用 . 隔开。路由规则:发送消息到主题类型的交换机时,routing key符合binding key的模式时,绑定的队列才能收到消息。
示例:
// 只有队列1能收到消息 channel.basicPublish("MY_TOPIC_EXCHANGE", "sh.abc", null, msg.getBytes()); // 队列2和队列3能收到消息 channel.basicPublish("MY_TOPIC_EXCHANGE", "bj.book", null, msg.getBytes()); // 只有队列4能收到消息 channel.basicPublish("MY_TOPIC_EXCHANGE", "abc.def.food", null, msg.getBytes());
Fanout Exchange 广播交换机:
定义:广播类型的交换机与一个队列绑定时,不须要指定binding key。路由规则:当消息发送到广播类型的交换机时,不须要指定routing key,全部与之绑定的队列都能收到消息。
Headers Exchanges:
不处理路由键。而是根据发送的消息内容中的headers属性进行匹配。在绑定Queue与Exchange时指定一组键值对;当消息发送到RabbitMQ时会取到该消息的headers与Exchange绑定时指定的键值对进行匹配;若是彻底匹配则消息会路由到该队列,不然不会路由到该队列。headers属性是一个键值对,能够是Hashtable,键值对的值能够是任何类型。而fanout,direct,topic 的路由键都须要要字符串形式的。匹配规则x-match有下列两种类型:
x-match = all :表示全部的键值对都匹配才能接受到消息。
x-match = any :表示只要有键值对匹配就能接受到消息。
系统版本:CentOS Linux release 7.5.1804 (Core)
RabbitMQ版本:3.6.12
Erlang版本:erlang-19.0.4-1.el7.centos.x86_64
JDK版本:1.8
Erlang安装:
Erlang包安装:https://www.erlang.org/downloads/19.0 下载对应安装包 otp_src_19.0.tar.gz
添加依赖:yum install ncurses-devel
解压:tar -xf otp_src_20.1.tar.gz ,重命名下 mv otp_src_19.0 otp
进入目录安装:./configure --prefix=/usr/local/erlang --without-javac & make(时间有点小长) & make install
cd /usr/local/erlang/bin 执行 ./erl 出现如下即安装完成:
添加环境变量:export PATH=$PATH:/usr/local/erlang/bin
RabbitMQ安装:
下载安装包:http://www.rabbitmq.com/releases/rabbitmq-server/v3.6.12/ 选择 rabbitmq-server-generic-unix-3.6.12.tar.xz 下载
上传至服务器,我这边放在 /mysoft 下。Mq安装包默认是xz结尾的,解压xz文件后获得tar文件
xz -d rabbitmq-server-generic-unix-3.6.12.tar.xz
执行解压命令 tar -xvf rabbitmq-server-generic-unix-3.6.12.tar
添加环境变量:export PATH=$PATH:/mysoft/rabbitmq_server-3.6.12/sbin
环境变量生效:source /etc/profile
配置网页插件:
首先建立目录,不然可能报错:mkdir /etc/rabbitmq
启动mq:./rabbitmq-server -detached
进入 sbin 目录,启用插件:./rabbitmq-plugins enable rabbitmq_management
启用rabbitmq_management插件去管理rabbitmq服务,可是在访问管理界面使用guest用户登陆时出现login failed错误。到服务器上查询日志显示出现错误的缘由是:
HTTP access denied: user 'guest' - User can only log in via localhost。
rabbitmq从3.3.0开始禁止使用guest/guest权限经过除localhost外的访问。若是想使用guest/guest经过远程机器访问,须要在rabbitmq配置文件中(/mysoft/rabbitmq_server-3.6.12/etc/rabbitmq/rabbitmq.config)中设置loopback_users为[]。rabbitmq.config文件须要本身建立,完整内容以下(注意后面的半角句号):
[{rabbit, [{loopback_users, []}]}].
重启,再经过访问 http://192.168.254.137:15672 ,经过guest/guest 登录:
查看服务状态:rabbitmqctl status
关闭服务:rabbitmqctl stop
查看mq用户:rabbitmqctl list_users
查看用户权限:rabbitmqctl list_user_permissions guest
新增用户: rabbitmqctl add_user admin 123456
设置角色:rabbitmqctl set_user_tags admin administrator
赋予管理员权限:rabbitmqctl set_permissions -p "/" admin ".*" ".*" ".*"
rabbitmqctl set_permissions -p [Vhost_Name][username][conf][write][read]
conf:一个正则表达式match哪些配置资源可以被该用户访问
write:一个正则表达式match哪些配置资源可以被该用户读。
read:一个正则表达式match哪些配置资源可以被该用户访问
根据我的需求更改rabbitmq的环境变量:
建立环境变量配置文件 vim
/mysoft/rabbitmq_server-3.6.12/etc/rabbitmq/rabbitmq-env.conf
RABBITMQ_MNESIA_BASE=/data/rabbitmq/mnesia //数据库位置 RABBITMQ_LOG_BASE=/data/rabbitmq/log //日志文件目录
mkdir /data/rabbitmq/mnesia mkdir /data/rabbitmq/log chowm -R rabbitmq:rabbitmq /data/rabbitmq
默认状态下 日志存放位置 /mysoft/rabbitmq_server-3.6.12/var/log/rabbitmq 下面,经过查看日志,在启动的时候会输出如下信息:
=INFO REPORT==== 24-Jan-2019::22:25:38 === Starting RabbitMQ 3.6.12 on Erlang 19.0.4 Copyright (C) 2007-2017 Pivotal Software, Inc. Licensed under the MPL. See http://www.rabbitmq.com/ =INFO REPORT==== 24-Jan-2019::22:25:38 === node : rabbit@localhost home dir : /root //配置文件位置 config file(s) : /mysoft/rabbitmq_server-3.6.12/etc/rabbitmq/rabbitmq.config cookie hash : aRQYNdsONCZK2FIkVnK0bA== //日志文件 log : /mysoft/rabbitmq_server-3.6.12/var/log/rabbitmq/rabbit@localhost.log sasl log : /mysoft/rabbitmq_server-3.6.12/var/log/rabbitmq/rabbit@localhost-sasl.log //数据库位置 database dir : /mysoft/rabbitmq_server-3.6.12/var/lib/rabbitmq/mnesia/rabbit@localhost //内存信息 =INFO REPORT==== 24-Jan-2019::22:25:40 === Memory high watermark set to 389 MiB (408762777 bytes) of 974 MiB (1021906944 bytes) total =INFO REPORT==== 24-Jan-2019::22:25:40 === Enabling free disk space monitoring =INFO REPORT==== 24-Jan-2019::22:25:40 === Disk free limit set to 50MB =INFO REPORT==== 24-Jan-2019::22:25:40 === Limiting to approx 924 file handles (829 sockets) =INFO REPORT==== 24-Jan-2019::22:25:40 === FHC read buffering: OFF FHC write buffering: ON =INFO REPORT==== 24-Jan-2019::22:25:40 === Waiting for Mnesia tables for 30000 ms, 9 retries left =INFO REPORT==== 24-Jan-2019::22:25:40 === Waiting for Mnesia tables for 30000 ms, 9 retries left =INFO REPORT==== 24-Jan-2019::22:25:40 === Priority queues enabled, real BQ is rabbit_variable_queue =INFO REPORT==== 24-Jan-2019::22:25:40 === Starting rabbit_node_monitor =INFO REPORT==== 24-Jan-2019::22:25:40 === Management plugin: using rates mode 'basic' =INFO REPORT==== 24-Jan-2019::22:25:40 === msg_store_transient: using rabbit_msg_store_ets_index to provide index =INFO REPORT==== 24-Jan-2019::22:25:40 === msg_store_persistent: using rabbit_msg_store_ets_index to provide index //AMQP端口 =INFO REPORT==== 24-Jan-2019::22:25:40 === started TCP Listener on 0.0.0.0:5672 //管理端口 =INFO REPORT==== 24-Jan-2019::22:25:40 === Management plugin started. Port: 15672 //数据库加载完成 =INFO REPORT==== 24-Jan-2019::22:25:40 === Statistics database started. //插件加载 =INFO REPORT==== 24-Jan-2019::22:25:40 === Server startup complete; 6 plugins started. * rabbitmq_management * rabbitmq_management_agent * rabbitmq_web_dispatch * amqp_client * cowboy * cowlib
这段日志包含了RabbitMQ的版本号、Erlang的版本号、RabbitMQ服务节点名称、cookie的hash值、RabbitMQ配置文件地址、内存限制、磁盘限制、默认帐户guest的建立以及权限配置等等。