RabbitMQ消息队列

一、介绍

消息队列(Message Queue)是一种应用程序对应用程序的通信方式,应用程序通过读写出入队列的消息来通信。

消息发送后可以立即返回,由消息系统来确保消息的可靠传递。消息发布者只管把消息发布到 MQ 中而不用管谁来取,消息使用者只管从 MQ 中取消息而不管是谁发布的。这样发布者和使用者都不用知道对方的存在。

通俗理解消息队列:

生活中的消息队列,如同邮局的邮箱,如果没有邮箱的话,邮递员必须要"找到"接收邮件的人,递给他这封邮件,才算完成任务,可以想象这个过程是很麻烦、很慢、效率低下的;但是如果有了邮箱,邮件直接放进邮箱中,用户只需要去邮箱中找,看看有没有自己的邮件,有就拿走,没有就下次再来,这样就极大的提升了邮件的收发效率。

官方介绍:

RabbitMQ是一个消息代理:它接受和转发消息。您可以将其视为邮局:当您将要发布的邮件放在邮箱中时,您可以确定,邮递员最终会将邮件发送给您的收件人。在这个类比中,RabbitMQ是一个邮箱,邮局和邮递员。

二、在什么情况下会用消息队列?(异步)

2.1 电商订单

比如点外卖,点击下单后的业务逻辑可能包括:检查库存、生成单据、发红包、短信通知等,如果这些业务同步执行,完成下单率会非常低;如发红包,短信通知等不必要的流程,异步执行即可。此时使用MQ,可以在核心流程(扣减库存、生成订单记录)等完成后发送消息到MQ,快速结束本次流程。消费者拉取MQ消息时,发现发红包、短信通知等消息时,再进行处理。

场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统。

传统的做法就是订单系统调用库存系统的接口:

缺点:

  • 当库存系统出现故障时,订单就会失败。
  • 订单系统和库存系统高耦合。

引入消息队列:

订单系统:用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。

库存系统:订阅下单的消息,获取下单消息,进行库存操作。就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。

2.2 秒杀活动

流量削峰一般在秒杀活动中应用广泛。

场景:秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。

作用:

  1. 可以控制活动人数,超过此一定阀值的订单直接丢弃(为什么秒杀你抢不到,知道了吧);
  2. 可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单);
  3. 用户的请求,服务器接收到之后,写入消息队列,超过定义的阈值就直接丢弃请求,或者跳转错误页面;
  4. 业务系统取出队列中的消息,再做后续处理。

三、RabbitMQ安装

######### rabbitmq-server服务端 #########

# 下载centos源
wget -O /etc/yum.repos.d/CentOS-Base.repo http://mirrors.cloud.tencent.com/repo/centos7_base.repo
# 下载epel源
wget -O /etc/yum.repos.d/epel.repo http://mirrors.cloud.tencent.com/repo/epel-7.repo
# 清空yum缓存并且生成新的yum缓存
yum clean all
yum makecache
# 安装erlang
yum -y install erlang
# 安装RabbitMQ
yum -y install rabbitmq-server
# 启动、停止...(无用户名密码):
systemctl start/stop/restart/status rabbitmq-server

rabbitmq-server启动报错:Failed to start RabbitMQ broker

主机hostname配置错误,原本hostname为pd.com,更改为pd即可(每个人不同)。

# 修改hosts文件中hostname
hostnamectl set-hostname pd
# 重启rabbitmq-server
systemctl restart rabbitmq-server

解决啦!

RabbitMQ账号密码、角色权限设置

######### 设置RabbitMQ账号密码,以及角色权限设置 #########

# 设置新用户pd 密码123456
rabbitmqctl add_user pd 123456

# 设置用户为administrator角色
rabbitmqctl set_user_tags pd administrator

# 设置权限,允许对所有的队列都有权限
# 对何种资源具有配置、写、读的权限通过正则表达式来匹配,具体命令格式如下:
set_permissions [-p <vhostpath>] <user> <conf> <write> <read>

rabbitmqctl set_permissions -p "/" pd ".*" ".*" ".*"

# 重启服务生效设置
service rabbitmq-server start/stop/restart

RabbitMQ相关命令

# 新建用户
rabbitmqctl add_user {用户名} {密码}
​
# 设置权限
rabbitmqctl set_user_tags {用户名} {权限}
​
# 查看用户列表
rabbitmqctl list_users
​
# 为用户授权
添加 Virtual Hosts :    
rabbitmqctl add_vhost <vhost># 删除用户
rabbitmqctl delete_user 用户名
​
# 修改用户的密码
rabbitmqctl change_password 用户名 新密码
    
# 删除 Virtual Hosts  
rabbitmqctl delete_vhost <vhost># 使用户user1具有vhost1这个virtual host中所有资源的配置、写、读权限以便管理其中的资源
rabbitmqctl set_permissions -p vhost1 user1 '.*' '.*' '.*'# 查看权限
rabbitmqctl list_user_permissions user1
rabbitmqctl list_permissions -p vhost1
​
# 清除权限
rabbitmqctl clear_permissions [-p VHostPath] User

# 清空队列步骤
关闭应用    rabbitmqctl stop_app
启动应用    rabbitmqctl start_app
清空队列    rabbitmqctl reset
​
# 查看xx
查看所有的exchange                           rabbitmqctl list_exchanges
查看所有的queue                              rabbitmqctl list_queues
查看所有的user                               rabbitmqctl list_users
查看所有的绑定(exchange和queue的绑定信息)        rabbitmqctl list_bindings
查看消息确认信息                                rabbitmqctl list_queues name messages_ready messages_unacknowledged
查看RabbitMQ状态,包括版本号等信息                rabbitmqctl status

# 开启web界面RabbitMQ
rabbitmq-plugins enable rabbitmq_management

# 访问web界面
http://服务器名:15672/
http://127.0.0.1:15672/

四、RabbitMQ组件说明

AMQP:AMQP协议是一个高级抽象层消息通信协议,RabbitMQ是AMQP协议的实现,它主要包括以下组件:

# Server(broker)
接受客户端连接,实现AMQP消息队列和路由功能的进程。

# Virtual Host
其实是一个虚拟概念,类似于权限控制组,一个Virtual Host里面可以有若干个Exchange和Queue,但是权限控制的最小粒度是Virtual Host。

# Exchange
接受生产者发送的消息,并根据Binding规则将消息路由给服务器中的队列。ExchangeType决定了Exchange路由消息的行为,例如,在RabbitMQ中,ExchangeType有direct、Fanout和Topic三种,不同类型的Exchange路由的行为是不一样的。

# Message Queue
消息队列,用于存储还未被消费者消费的消息。

# Message
由Header和Body组成,Header是由生产者添加的各种属性的集合,包括Message是否被持久化、由哪个Message Queue接受、优先级是多少等。而Body是真正需要传输的APP数据。

# Binding
Binding联系了Exchange与Message Queue。Exchange在与多个Message Queue发生Binding后会生成一张路由表,路由表中存储着Message Queue所需消息的限制条件即Binding Key。当Exchange收到Message时会解析其Header得到Routing Key,Exchange根据Routing Key与Exchange Type将Message路由到Message Queue。Binding Key由Consumer在Binding Exchange与Message Queue时指定,而Routing Key由Producer发送Message时指定,两者的匹配方式由Exchange Type决定。 

# Connection
连接,对于RabbitMQ而言,其实就是一个位于客户端和Broker之间的TCP连接。

# Channel
信道,仅仅创建了客户端到Broker之间的连接后,客户端还是不能发送消息的。需要为每一个Connection创建Channel,AMQP协议规定只有通过Channel才能执行AMQP的命令。一个Connection可以包含多个Channel。之所以需要Channel,是因为TCP连接的建立和释放都是十分昂贵的,如果一个客户端每一个线程都需要与Broker交互,如果每一个线程都建立一个TCP连接,暂且不考虑TCP连接是否浪费,就算操作系统也无法承受每秒建立如此多的TCP连接。RabbitMQ建议客户端线程之间不要共用Channel,至少要保证共用Channel的线程发送消息必须是串行的,但是建议尽量共用Connection。

# Command
AMQP的命令,客户端通过Command完成与AMQP服务器的交互来实现自身的逻辑。例如在RabbitMQ中,客户端可以通过publish命令发送消息,txSelect开启一个事务,txCommit提交一个事务。