RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通信的世界里有不少公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),可是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),所以,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。
RabbitMQ是由RabbitMQ Technologies Ltd开发而且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不一样的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,如今并无上市。
RabbitMQ的官网是http://www.rabbitmq.com
百度百科amqp协议介绍https://baike.baidu.com/item/AMQP/8354716?fr=aladdin
注意:RabbitMQ是采用erlang语言开发的,因此必须有Erlang环境才能够运行html
Erlang (高并发应用) 数据库
Erlang编程语言最初目的是进行大型电信交换设备的软件开发,是一种适用于大规模并行处理环境的高可靠性编程语言。随着多核处理器技术的日渐普及,以及互联网、云计算等技术的发展,该语言的应用范围也有逐渐扩大之势。编程
百度百科介绍:https://baike.baidu.com/item/Erlang%E8%AF%AD%E8%A8%80/20864044?fr=aladdin缓存
比较图示: 初衷理念实现抗高并发语言服务器
不一样的项目 不一样的 路径,独立的virtualhost,相互进行隔离: 客户端链接时候须要指定virtual host地址 。并发
更加解耦 相互进行隔离 相似于每一个项目都有不一样的数据库同样。异步
添加virtual host编程语言
指定某个用户的 Virtual Host分布式
AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件能够无视消息来源传递消息,不受客户端、消息中间件、不一样的开发语言环境等条件的限制;高并发
涉及概念解释:
Server(Broker):接收客户端链接,实现AMQP协议的消息队列和路由功能的进程;
Virtual Host:虚拟主机的概念,相似权限控制组,一个Virtual Host里能够有多个Exchange和Queue。
Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。
ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;
Message Queue:消息队列,用于存储还未被消费者消费的消息;
Message:由Header和body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、优先级是多少、由哪一个Message Queue接收等;body是真正须要发送的数据内容;
BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来。
RabbitMQ点对点模式:
一、点对点模式 一对一模式。 一个生产者投递消息给队列 只能容许有一个消费者进行消费 若是集群的话 会进行均摊消费 服务器配置不同 均摊就不优了
长链接 不用三次握手之类的 提升传输效率 可是长链接占服务器带宽
推: 消费者已经启动了,创建长链接,一旦生产者向队列投递消息会立马推送给消费者
取: 生产者先投递消息队列进行缓存,这时候消费者再次启动时候 ,就会向队列获取消息。
点对点模式代码: RabbitMQ整合Spring Booot点对点模式
1.了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收而且处理完毕了。RabbitMQ就能够删除它了。
2. 若是一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理彻底,而后交给另外一个消费者去从新处理。这样,你就能够确认即便消费者偶尔挂掉也不会丢失任何消息了。
3. 没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会从新投递。即便处理一条消息会花费很长的时间。
4. 消息应答是默认打开的。咱们经过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦咱们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,能够从内存删除。若是消费者因宕机或连接失败等缘由没有发送ACK(不一样于ActiveMQ,在RabbitMQ里,消息没有过时的概 念),则RabbitMQ会将消息从新发送给其余监听在队列的下一个消费者。
应答模式:
自动应答: 不在意消费者对消息处理是否成功,都会告诉队列删除消息。若是处理消息失败,实现自动补偿(队列投递过去 从新处理)。
手动应答: 消费者处理完业务逻辑,手动返回ack(通知)告诉队列处理完了,队列进而删除消息。
实现思路:
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
注:第二个参数值为false表明关闭RabbitMQ的自动应答机制,改成手动应答。
channel.basicAck(envelope.getDeliveryTag(), false);
自动应答: 不在意消费者对消息处理是否成功,都会告诉队列删除消息。若是处理消息失败,实现自动补偿(队列投递过去 从新处理)。
手动应答: 消费者处理完业务逻辑,手动返回ack(通知)告诉队列处理完了,队列进而删除消息。
RabbitMQ整合Spring Booot【消费者应答模式】
总结:
只有在 消费者空闲的时候会发送下一条信息。调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完毕并本身对刚刚处理的消息进行确认以后,才发送下一条消息,防止消费者太过于忙碌,也防止它太过去悠闲。
公平队列原理:队列服务器向消费者发送消息的时候,消费者采用手动应答模式,队列服务器必需要收到消费者发送ack结果通知,才会发送下一个消息。(快的处理的多,消费的多)
使用背景:
经过 设置channel.basicQos(1); 开发: RabbitMQ整合Spring Booot【公平队列】
关于RabbitMQ死信队列
变成了 “死信” 后 ,被从新投递(publish)到另外一个Exchange ,该Exchange 就是DLX 。
而后该Exchange 根据绑定规则,转发到对应的 队列上, 监听该队列 ,就能够从新消费。
说白了 就是 没有被消费的消息 ,换个地方从新被消费
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
应用场景分析:
在定义业务队列的时候,能够考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便咱们查看消息失败的缘由了
死信队列 听上去像 消息“死”了 ,其实也有点这个意思,
死信队列 是 当消息在一个队列 由于下列缘由:
1.消息被拒绝(basic.reject或basic.nack)而且requeue=false.
2.消息TTL过时
3.队列达到最大长度(队列满了,没法再添加数据到mq中)
应用场景分析:
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息
如何使用死信交换机呢?
定义业务(普通)队列的时候指定参数
1. 邮件队列 绑定一个死信交换机 ,一旦邮件队列满了的状况下 ,为了防止数据丢失状况 ,消息再也不邮件队列存放了,放到死信交换机。
2.而后交给私信邮件队列。
3.最终交给死信消费者。
步骤:
这个多是消息队列中最重要的队列了,其余的都是在它的基础上进行了扩展。
关于交换机:
交换机有四种类型:
过程:
一个生产者发送消息 ----> 到交换机 ----> 到队列(每一个队列绑定到交换机上) ----> 到消费者(每一个消费者有本身的队列)
功能实现:一个生产者发送消息,多个消费者获取消息(一样的消息), 包括:
场景:
例如:
咱们能够把路由key设置为insert ,那么消费者队列key指定包含insert才能够接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操做。
注:
前面作的demo中RoutingKey设置的空。
代码实现:
RoutingKey有值的时候,那么 通过消息队列以后,须要在通过RoutingKey进行判断决定消费者。
RabbitMQ整合Spring Booot【Exchange-路由key模式】
- 符号#:匹配一个或者多个词lazy.# 能够匹配lazy.irs或者lazy.irs.cor。
- 符号*:只能匹配一个词lazy.* 能够匹配lazy.irs或者lazy.cor。
关于 通配符 “,” 和 “#” 的使用: RabbitMQ整合Spring Booot【Exchange-Topics模式】
(3)Fanout Exchange:
RabbitMQ发布与订阅原理 Exchange Fanout模式:
案例: 用户注册 ---> 发送邮件 --->发送短信
RabbitMQ整合Spring Booot【Exchange-Fanout模式】
背景:
生产者发送消息出去以后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。并且有的时候咱们在发送消息以后,后面的逻辑出问题了,咱们不想要发送以前的消息了,须要撤回该怎么作?
解决方案:
AMQP 事务机制
事务模式:
注: RabbitMQ支持消息持久化机制,把消息持久化到硬盘上。若是RabbitMQ服务器宕机了,消息不会丢失。
RabbitMQ整合Spring Booot【生产者事务确认机制】
方案二:Confirm 模式
队列和消费者创建长链接,推送或者拉取形式。
消费者经过自动应答或者手动应答,队列服务器等待应答结果,若是没有应答结果那么保留给下一个消费者。
消费者运行报错时候,会进行重试。
RabbitMQ整合Spring Booot【消费者补偿幂等问题】
1.点对点(简单)的队列2.工做(公平性)队列模式3.发布订阅模式4.路由模式Routing(Direct)5.通配符模式Topics