RabbitMQ消息中间件的用法

1.什么是RabbitMQ

RabbitMQ是一个由erlang开发的AMQP(Advanced Message Queue )的开源实现。AMQP 的出现其实也是应了广大人民群众的需求,虽然在同步消息通信的世界里有不少公开标准(如 COBAR的 IIOP ,或者是 SOAP 等),可是在异步消息处理中却不是这样,只有大企业有一些商业实现(如微软的 MSMQ ,IBM 的 Websphere MQ 等),所以,在 2006 年的 6 月,Cisco 、Redhat、iMatix 等联合制定了 AMQP 的公开标准。 RabbitMQ是由RabbitMQ Technologies Ltd开发而且提供商业支持的。该公司在2010年4月被SpringSource(VMWare的一个部门)收购。在2013年5月被并入Pivotal。其实VMWare,Pivotal和EMC本质上是一家的。不一样的是VMWare是独立上市子公司,而Pivotal是整合了EMC的某些资源,如今并无上市。 RabbitMQ的官网是http://www.rabbitmq.com 百度百科amqp协议介绍https://baike.baidu.com/item/AMQP/8354716?fr=aladdin 注意:RabbitMQ是采用erlang语言开发的,因此必须有erlang环境才能够运行html

2.为何要使用MQ

 

3.经常使用消息中间件的对比

4.消息队列RabbitMq的五种形式队列

4.1.点对点(简单)的队列

点对点模式:一对一消费,一个生产者投递消息给队列,只能容许有一个消费者进行消费。spring

注意:若是消费集群的话,会进行均摊消费。前提是服务器的配置相同。缓存

均摊消费的弊端:假若有2台服务器分别为A、B。若是每一个消费处理消息的业务时间不相同的状况下,可能对消费者处理慢的服务器不公平(服务器压力大),A处理比B处理时间快,应该A处理的消息多一些,B处理的消息少一些才合理。springboot

队列以先进先出原则进行存放消息集合。生产者投递消息到队列中。服务器

当消费者启动的时候,会与队列服务器创建长链接,当生产者有消息投递到队列的时候,队列会马上将消息通知给消费者进行消费。网络

长链接的好处:若是是短连接的话,每次访问都须要创建链接,比较占内存。创建长链接会减小三次握手,提升传输速度。异步

取消息队列与推送消息队列的区别:

取消息:生产者投递消息到队列中,队列服务器缓存消息。这时候当消费者启动的时候,消费者会去向队列服务器中获取消息。学习

推消息:当生产者和消费者都启动的时候,生产者向队列投递消息,这时候队列会将消息推送给消费者。fetch

4.2.工做(公平性)队列模式

公平队列的原理:队列服务器向消费者发送消息的时候,消费者采用手动应答模式,队列服务器必需要收到消费者发送ack结果通知以后,才会继续发送下一个消息。ui

4.3.发布订阅模式

Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的。

发布订阅实现流程:生产者投递消息给交换机,交换机根据路由策略(routignKey)转发到不一样的队列服务器中缓存,而后队列服务器在推送消息给消费者进行消费或者消费者从队列服务器中拉取消息进行消费。

发布订阅实现原理:一对多。

这个队列模式是消息队列中最重要的队列了,其余的都是在它的基础上进行了扩展。 功能实现:一个生产者发送消息,多个消费者获取消息(一样的消息),包括一个生产者,一个交换机,多个队列,多个消费者。

思路解读(重点理解): 

1. 一个生产者,多个消费者

2. 每个消费者都有本身的一个队列

3. 生产者没有直接发消息到队列中,而是发送到交换机 

4. 每一个消费者的队列都绑定到交换机上

5. 消息经过交换机到达每一个消费者的队列 该模式就是Fanout Exchange(扇型交换机)将消息路由给绑定到它身上的全部队列 以用户发邮件案例讲解

注意:交换机没有存储消息功能,若是消息发送没有绑定消费队列的交换机,消息则丢失。在消费者没有启动的状况下,生产者投递消息到交换机,这时候交换机不知道把消息转发给哪一个消费者,因此消息会消失。由于交换机没有缓存功能,只作转发的功能。

使用场景:用户注册→发送邮件→发送短信。

 

4.4.路由模式RoutingKey

Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列的。

生产者发送消息到交换机并指定一个路由key,消费者队列绑定到交换机时要制定路由key(key匹配就能接受消息,key不匹配就不能接受消息)。

例如:咱们能够把路由key设置为insert ,那么消费者队列key指定包含insert才能够接收消息,消费者队列key定义为update或者delete就不能接收消息。很好的控制了更新,插入和删除的操做。 采用交换机direct模式

流程说明:若是生产者投递消息到交换机(exchange),邮件队列和短信队列也都绑定了交换机(exchange)。可是当交换机的类型(type=direct)的时候,交换机的转发(路由)由routingKey决定转发给谁。以下如图所示,当交换机的rontingKey=email的时候,消息将转发到邮件队列服务而后由邮件消费者进行消费。而短信队列是都收不到消息的,由于短信的路由routingKey=msg。若是短信队列也想收到消息就须要修改routingKey=email才能够收到消息。

这就是交换机类型type=direct的用法及特性。

 

4.5.通配符模式Topics

说明:此模式实在路由key模式的基础上,使用了通配符来管理消费者接收消息。生产者P发送消息到交换机X,type=topic,交换机根据绑定队列的routing key的值进行通配符匹配;

符号#匹配一个或者多个词lazy.# 能够匹配lazy.irs或者lazy.irs.cor

符号*只能匹配一个词lazy.* 能够匹配lazy.irs或者lazy.cor

 

 

消息队列RabbitMQ应答模式

为了确保消息不会丢失,RabbitMQ支持消息应答。消费者发送一个消息应答,告诉RabbitMQ这个消息已经接收而且处理完毕了。RabbitMQ就能够删除它了。 若是一个消费者挂掉却没有发送应答,RabbitMQ会理解为这个消息没有处理彻底,而后交给另外一个消费者去从新处理。这样,你就能够确认即便消费者偶尔挂掉也不会丢失任何消息了。 没有任何消息超时限制;只有当消费者挂掉时,RabbitMQ才会从新投递。即便处理一条消息会花费很长的时间。 消息应答是默认打开的。咱们经过显示的设置autoAsk=true关闭这种机制。现即自动应答开,一旦咱们完成任务,消费者会自动发送应答。通知RabbitMQ消息已被处理,能够从内存删除。若是消费者因宕机或连接失败等缘由没有发送ACK(不一样于ActiveMQ,在RabbitMQ里,消息没有过时的概念),则RabbitMQ会将消息从新发送给其余监听在队列的下一个消费者。

RabbitMQ的公平转发

目前消息转发机制是平均分配,这样就会出现俩个消费者,奇数的任务很耗时,偶数的任何工做量很小,形成的缘由就是近当消息到达队列进行转发消息。并不在意有多少任务消费者并未传递一个应答给RabbitMQ。仅仅盲目转发全部的奇数给一个消费者,偶数给另外一个消费者。 为了解决这样的问题,咱们可使用basicQos方法,传递参数为prefetchCount= 1。这样告诉RabbitMQ不要在同一时间给一个消费者超过一条消息。 换句话说,只有在消费者空闲的时候会发送下一条信息。调度分发消息的方式,也就是告诉RabbitMQ每次只给消费者处理一条消息,也就是等待消费者处理完毕并本身对刚刚处理的消息进行确认以后,才发送下一条消息,防止消费者太过于忙碌,也防止它太过去悠闲。 经过 设置channel.basicQos(1);

消息队列RabbitMQ应答模式

案例: 生产者端代码不变,消费者端代码这部分就是用于开启手动应答模式的。 channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 注:第二个参数值为false表明关闭RabbitMQ的自动应答机制,改成手动应答。 在处理完消息时,返回应答状态,true表示为自动应答模式。 channel.basicAck(envelope.getDeliveryTag(), false);

传统简单队列是如何实现的?

生产者生产消息直接投递给队列服务器,队列服务器在以推送消息到消费者或者消费者从队列服务器拉取消息进行消费。消费者启动的时候会与队列服务器创建长链接。

RabbitMQ关键名词

AMQP(高级消息队列协议)是一个异步消息传递所使用应用层协议规范,为面向消息中间件设计,基于此协议的客户端与消息中间件能够无视消息来源传递消息,不受客户端、消息中间件、不一样的开发语言环境等条件的限制;

涉及概念解释: 

 Server(Broker):接收客户端链接,实现AMQP协议的消息队列和路由功能的进程;

 Virtual Host:虚拟主机的概念,相似权限控制组,一个Virtual Host里能够有多个Exchange和Queue。     

Exchange:交换机,接收生产者发送的消息,并根据Routing Key将消息路由到服务器中的队列Queue。

 ExchangeType:交换机类型决定了路由消息行为,RabbitMQ中有三种类型Exchange,分别是fanout、direct、topic;  Message Queue:消息队列,用于存储还未被消费者消费的消息;

 Message:由Header和body组成,Header是由生产者添加的各类属性的集合,包括Message是否被持久化、优先级是多少、由哪一个Message Queue接收等;

body是真正须要发送的数据内容;

BindingKey:绑定关键字,将一个特定的Exchange和一个特定的Queue绑定起来。

 

RabbitMQ交换机的做用

生产者发送消息不会像传统方式直接将消息投递到队列中,而是先将消息投递到交换机中,在由交换机转发到具体的队列,队列在将消息以推送或者拉取方式给消费者进行消费,这和咱们以前学习Nginx有点相似。 交换机的做用根据具体的路由策略分发到不一样的队列中。

交换机有四种类型:

Direct exchange(直连交换机):是根据消息携带的路由键;

routing key:将消息投递给对应队列的 Fanout exchange(扇型交换机)将消息路由给绑定到它身上的全部队列 ;

Topic exchange(主题交换机):队列经过路由键绑定到交换机上,而后,交换机根据消息里的路由值,将消息路由给一个或多个绑定队列;

Headers exchange(头交换机):相似主题交换机,可是头交换机使用多个消息属性来代替路由键创建路由规则。经过判断消息头的值可否与指定的绑定相匹配来确立路由规则。

 

RabbitMQ消息确认机制

问题产生背景: 生产者发送消息出去以后,不知道到底有没有发送到RabbitMQ服务器, 默认是不知道的。并且有的时候咱们在发送消息以后,后面的逻辑出问题了,咱们不想要发送以前的消息了,须要撤回该怎么作。

若是RabbitMQ服务器宕机了,消息会丢失吗?

  答案:RabbitMQ服务器支持消息持久化机制,会把消息持久化在硬盘上保存。代码设置  channel.queueDeclare(EMAIL_QUEUE, true, false, false, null); 方法第二个参数,默认状况下咱们应该设置为true。

解决方案:

1.AMQP 事务机制

2.Confirm 模式

事务模式::

  txSelect:将当前channel设置为transaction模式

  txCommit :提交当前事务

  txRollback:事务回滚

 

 生产者   消费者   队列服务器  

消费者如何确保消息必定可以消费成功?

经过应答模式,默认为应答模式,能够修改成手动应答。设置方法:channel.basicConsume(QUEUE_NAME, false, defaultConsumer); 第二个参数。

设置应答模式 :第一个参数 队列名称、第二个参数 应答模式 若是为true 自动应答,false 为手动应答、第三个参数 监听器
自动应答(true):不在意消费者对这个消息处理是否成功,都会告诉队列删除该消息。若是处理消息失败的状况下,应该实现自动补偿。
手动应答(false):当队列把消息推送给消费者,消费者处理完业务逻辑以后,手动返回ack(通知)告诉给队列服务器是否要删除该消息、若是失败,队列服务器作补偿,而不会直接删除该消息、

 

springboot整合rabbitmq项目

springboot整合rabbitmq分为2个项目,一个是生产者服务,一个是消息服务平台项目。消息服务平台项目中包括邮件消费者和短信消费者。没有必要每个消费者都建立一个项目,那样会浪费资源。

在一个项目中,能够有多个生产者和消费者。

 

RabbitMQ消息重试机制

消费者在消费消息的时候,若是消费者业务逻辑出现程序异常,这时候应该如何处理?

答案:使用消息重试机制。

如何合适选择重试机制:

状况1: 消费者获取到消息后,调用第三方接口,但接口暂时没法访问,是否须要重试?

答案:须要重试机制。

状况2: 消费者获取到消息后,抛出数据转换异常,是否须要重试? 

答案:不须要重试机制,须要发布版本进行解决。

如何实现重试机制

总结:对于状况2,若是消费者代码抛出异常是须要发布新版本才能解决的问题,那么不须要重试,重试也无济于事。应该采用日志记录+定时任务job健康检查+人工进行补偿。

消费者若是保证消息幂等性,不被重复消费

产生缘由:网络延迟传输中,消费出现异常或者是消费延迟消费,会形成MQ进行重试补偿,在重试过程当中,可能会形成重复消费。

消费者如何保证消息幂等性,不被重复消费

解决办法:

①使用全局MessageID判断消费方是不是同一个,解决幂等性。

②或者使用业务逻辑id保证惟一(好比订单号码)

RabbitMQ信队列

死信队列 听上去像 消息“死”了     其实也有点这个意思,死信队列  是 当消息在一个队列 由于下列缘由:

消息被拒绝(basic.reject/ basic.nack)而且再也不从新投递 requeue=false

消息超期 (rabbitmq  Time-To-Live -> messageProperties.setExpiration())

队列超载

变成了 “死信” 后    被从新投递(publish)到另外一个Exchange   该Exchange 就是DLX     而后该Exchange 根据绑定规则 转发到对应的 队列上  监听该队列  就能够从新消费     说白了 就是  没有被消费的消息  换个地方从新被消费

生产者   -->  消息 --> 交换机  --> 队列  --> 变成死信  --> DLX交换机 -->队列 --> 消费者

什么是死信呢?什么样的消息会变成死信呢?

消息被拒绝(basic.reject或basic.nack)而且requeue=false.

消息TTL过时

队列达到最大长度(队列满了,没法再添加数据到mq中)

应用场景分析

在定义业务队列的时候,能够考虑指定一个死信交换机,并绑定一个死信队列,当消息变成死信时,该消息就会被发送到该死信队列上,这样就方便咱们查看消息失败的缘由了

channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false); 丢弃消息

如何使用死信交换机呢?

定义业务(普通)队列的时候指定参数

x-dead-letter-exchange: 用来设置死信后发送的交换机

x-dead-letter-routing-key:用来设置死信的routingKey

/**
* 定义死信队列相关信息
*/
public final static String deadQueueName = "dead_queue";
public final static String deadRoutingKey = "dead_routing_key";
public final static String deadExchangeName = "dead_exchange";
/**
* 死信队列 交换机标识符
*/
public static final String DEAD_LETTER_QUEUE_KEY = "x-dead-letter-exchange";
/**
* 死信队列交换机绑定键标识符
*/
public static final String DEAD_LETTER_ROUTING_KEY = "x-dead-letter-routing-key";

/**
* 定义短信队列 包括死信队列
*
* @return
*/
@Bean
public Queue fanoutMsgQueue() {
//return new Queue(MSG_QUEUE_FANOUT);
// 将普通队列绑定到死信队列交换机上
Map<String, Object> args = new HashMap<>(2);
args.put(DEAD_LETTER_QUEUE_KEY, deadExchangeName);
args.put(DEAD_LETTER_ROUTING_KEY, deadRoutingKey);
Queue queue = new Queue(MSG_QUEUE_FANOUT, true, false, false, args);
return queue;
}
/**
* 配置死信队列
*
* @return
*/
@Bean
public Queue deadQueue() {
Queue queue = new Queue(deadQueueName, true);
return queue;
}

/**
* 建立死信交换机
*/
@Bean
public DirectExchange deadExchange() {
return new DirectExchange(deadExchangeName);
}

/**
* 死信交换机绑定私信队列
* @param deadQueue
* @param deadExchange
* @return
*/
@Bean
public Binding bindingDeadExchange(Queue deadQueue, DirectExchange deadExchange) {
return BindingBuilder.bind(deadQueue).to(deadExchange).with(deadRoutingKey);
}

RabbitMq 的配置文件

spring:
rabbitmq:
#### 链接地址
host: 127.0.0.1
####端口号
port: 5672
#### 用户名 本身在rabbitmq服务器上新建的 默认的用户名和密码为guest
username: ming
#### 密码
password: ming
### 虚拟主机
virtual-host: /member
listener:
simple:
retry:
####开启消费者(程序出现异常的状况下)进行重试机制
enabled: true
### 最大重试次数, 默认状况下 一直重试
max-attempts: 5
#### 重试间隔时间 单位:毫秒
initial-interval: 3000
##### 开启手动应答 ack
acknowledge-mode: manual

### 服务端口号
server:
port: 8081

rabbitmq地址:http://www.rabbitmq.com/getstarted.html

相关文章
相关标签/搜索