RabbitMQ的开发应用

1.介绍

RabbitMQ 是一个由erlang语言编写的、开源的、在AMQP基础上完整的、可复用的企业消息系统。支持多种语言,包括java、Python、ruby、PHP、C/C++等。java

1.1.AMQP模型

AMQP:advanced message queuing protocol ,一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息并不受客户端/中间件不一样产品、不一样开发语言等条件的限制。算法

AMQP模型图
amqp模型.jpgspring

1.1.1.工做过程

发布者(Publisher)发布消息(Message),经由交换机(Exchange)。json

交换机根据路由规则将收到的消息分发给与该交换机绑定的队列(Queue)。缓存

最后 AMQP 代理会将消息投递给订阅了此队列的消费者,或者消费者按照需求自行获取。安全

一、发布者、交换机、队列、消费者均可以有多个。同时由于 AMQP 是一个网络协议,因此这个过程当中的发布者,消费者,消息代理 能够分别存在于不一样的设备上。springboot

二、发布者发布消息时能够给消息指定各类消息属性(Message Meta-data)。有些属性有可能会被消息代理(Brokers)使用,然而其余的属性则是彻底不透明的,它们只能被接收消息的应用所使用。ruby

三、从安全角度考虑,网络是不可靠的,又或是消费者在处理消息的过程当中意外挂掉,这样没有处理成功的消息就会丢失。基于此缘由,AMQP 模块包含了一个消息确认(Message Acknowledgements)机制:当一个消息从队列中投递给消费者后,不会当即从队列中删除,直到它收到来自消费者的确认回执(Acknowledgement)后,才彻底从队列中删除。服务器

四、在某些状况下,例如当一个消息没法被成功路由时(没法从交换机分发到队列),消息或许会被返回给发布者并被丢弃。或者,若是消息代理执行了延期操做,消息会被放入一个所谓的死信队列中。此时,消息发布者能够选择某些参数来处理这些特殊状况。网络

1.1.2.Exchange交换机

交换机是用来发送消息的 AMQP 实体。交换机拿到一个消息以后将它路由给一个或零个队列。它使用哪一种路由算法是由交换机类型和绑定(Bindings)规则所决定的。常见的交换机有以下几种:

  1. direct 直连交换机:Routing Key==Binding Key,严格匹配。
  2. fanout 扇形交换机:把发送到该 Exchange 的消息路由到全部与它绑定的 Queue 中。
  3. topic 主题交换机:Routing Key==Binding Key,模糊匹配。
  4. headers 头交换机:根据发送的消息内容中的 headers 属性进行匹配。
    具体有关这五种交换机的说明和用法,后续会有章节详细介绍。

1.1.3.Queue队列

AMQP 中的队列(queue)跟其余消息队列或任务队列中的队列是很类似的:它们存储着即将被应用消费掉的消息。队列跟交换机共享某些属性,可是队列也有一些另外的属性。

  • Durable(消息代理重启后,队列依旧存在)
  • Exclusive(只被一个链接(connection)使用,并且当链接关闭后队列即被删除)
  • Auto-delete(当最后一个消费者退订后即被删除)
  • Arguments(一些消息代理用他来完成相似与 TTL 的某些额外功能)

1.2.rabbitmq和kafka对比

rabbitmq遵循AMQP协议,用在实时的对可靠性要求比较高的消息传递上。kafka主要用于处理活跃的流式数据,大数据量的数据处理上。主要体如今:

1.2.1.架构

  1. rabbitmq:RabbitMQ遵循AMQP协议RabbitMQ的broker由Exchange,Binding,queue组成,其中exchange和binding组成了消息的路由键;客户端Producer经过链接channel和server进行通讯,Consumer从queue获取消息进行消费(长链接,queue有消息会推送到consumer端,consumer循环从输入流读取数据)。rabbitMQ以broker为中心。
  2. kafka:kafka听从通常的MQ结构,producer,broker,consumer,以consumer为中心,消息的消费信息保存的客户端consumer上,consumer根据消费的点,从broker上批量pull数据。

1.2.2.消息确认

  1. rabbitmq:有消息确认机制。
  2. kafka:无消息确认机制。

1.2.3.吞吐量

  1. rabbitmq:rabbitMQ在吞吐量方面稍逊于kafka,他们的出发点不同,rabbitMQ支持对消息的可靠的传递,支持事务,不支持批量的操做;基于存储的可靠性的要求存储能够采用内存或者硬盘。
  2. kafka:kafka具备高的吞吐量,内部采用消息的批量处理,zero-copy机制,数据的存储和获取是本地磁盘顺序批量操做,具备O(1)的复杂度,消息处理的效率很高。
    (备注:kafka零拷贝,经过sendfile方式。(1)普通数据读取:磁盘->内核缓冲区(页缓存 PageCache)->用户缓冲区->内核缓冲区->网卡输出;(2)kafka的数据读取:磁盘->内核缓冲区(页缓存 PageCache)->网卡输出。

1.2.4.可用性

  1. rabbitmq(1)普通集群:在多台机器上启动多个rabbitmq实例,每一个机器启动一个。可是你建立的queue,只会放在一个rabbtimq实例上,可是每一个实例都同步queue的元数据。完了你消费的时候,实际上若是链接到了另一个实例,那么那个实例会从queue所在实例上拉取数据过来。(2)镜像集群:跟普通集群模式不同的是,你建立的queue,不管元数据仍是queue里的消息都会存在于多个实例上,而后每次你写消息到queue的时候,都会自动把消息到多个实例的queue里进行消息同步。这样的话,好处在于,一个机器宕机了,没事儿,别的机器均可以用。坏处在于,第一,这个性能开销太大了,消息同步全部机器,致使网络带宽压力和消耗很重。第二,这么玩儿,就没有扩展性可言了,若是某个queue负载很重,你加机器,新增的机器也包含了这个queue的全部数据,并无办法线性扩展你的queue
  2. kafka:kafka是由多个broker组成,每一个broker是一个节点;每建立一个topic,这个topic能够划分为多个partition,每一个partition能够存在于不一样的broker上,每一个partition就放一部分数据。这就是自然的分布式消息队列,就是说一个topic的数据,是分散放在多个机器上的,每一个机器就放一部分数据。每一个partition的数据都会同步到其余机器上,造成本身的多个replica副本,而后全部replica会选举一个leader出来,主从结构。

1.2.5.集群负载均衡

  1. rabbitmq:rabbitMQ的负载均衡须要单独的loadbalancer进行支持,如HAProxy和Keepalived等。
  2. kafka:kafka采用zookeeper对集群中的broker、consumer进行管理,能够注册topic到zookeeper上;经过zookeeper的协调机制,producer保存对应topic的broker信息,能够随机或者轮询发送到broker上;而且producer能够基于语义指定分片,消息发送到broker的某分片上。

2.结构

2.1.交换机模式

RabbitMQ经常使用的Exchange Type有fanout、direct、topic、headers这四种。

2.1.1.Direct Exchange

direct类型的Exchange路由规则很简单,它会把消息路由到那些binding key与routing key彻底匹配的Queue中。

2.1.2.Topic Exchange

前面讲到direct类型的Exchange路由规则是彻底匹配binding key与routing key,但这种严格的匹配方式在不少状况下不能知足实际业务需求。topic类型的Exchange与direct类型的Exchage类似,也是将消息路由到binding key与routing key相匹配的Queue中,但支持模糊匹配:

  • routing key为一个句点号“. ”分隔的字符串(咱们将被句点号“. ”分隔开的每一段独立的字符串称为一个单词),如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”
  • binding key与routing key同样也是句点号“. ”分隔的字符串
  • binding key中能够存在两种特殊字符"*"与“#”,用于作模糊匹配,其中" * "用于匹配一个单词,“#”用于匹配多个单词(能够是零个)

2.1.3.Fanout Exchange

fanout类型的Exchange路由规则很是简单,它会把全部发送到fanout Exchange的消息都会被转发到与该Exchange 绑定(Binding)的全部Queue上。
Fanout Exchange 不须要处理RouteKey 。只须要简单的将队列绑定到exchange 上。这样发送到exchange的消息都会被转发到与该交换机绑定的全部队列上。相似子网广播,每台子网内的主机都得到了一份复制的消息。因此,Fanout Exchange 转发消息是最快的。

2.1.4.Headers Exchange

headers类型的Exchange也不依赖于routing key与binding key的匹配规则来路由消息,而是根据发送的消息内容中的headers属性进行匹配。
在绑定Queue与Exchange时指定一组键值对;当消息发送到Exchange时,RabbitMQ会取到该消息的headers(也是一个键值对的形式),对比其中的键值对是否彻底匹配Queue与Exchange绑定时指定的键值对;若是彻底匹配则消息会路由到该Queue,不然不会路由到该Queue。

2.1.5.Default Exchange 默认

严格来讲,Default Exchange 并不该该和上面四个交换机在一块儿,由于它不属于独立的一种交换机类型,而是属于Direct Exchange 直连交换机。

默认交换机(default exchange)其实是一个由消息代理预先声明好的没有名字(名字为空字符串)的直连交换机(direct exchange)。

它有一个特殊的属性使得它对于简单应用特别有用处:那就是每一个新建队列(queue)都会自动绑定到默认交换机上,绑定的路由键(routing key)名称与队列名称相同。

举个例子:当你声明了一个名为 “search-indexing-online” 的队列,AMQP 代理会自动将其绑定到默认交换机上,绑定(binding)的路由键名称也是为 “search-indexing-online”。因此当你但愿将消息投递给“search-indexing-online”的队列时,指定投递信息包括:交换机名称为空字符串,路由键为“search-indexing-online”便可。

所以 direct exchange 中的 default exchange 用法,体现出了消息队列的 point to point,感受像是直接投递消息给指定名字的队列。

2.2.持久化

虽然咱们要避免系统宕机,可是这种“不可抗力”总会有可能发生。rabbitmq若是宕机了,再启动即是了,大不了有短暂时间不可用。但若是你启动起来后,发现这个rabbitmq服务器像是被重置了,之前的exchange,queue和message数据都没了,那就太使人崩溃了。不光业务系统由于无对应exchange和queue受影响,丢失的不少message数据更是致命的。因此如何保证rabbitmq的持久化,在服务使用前必须得考虑到位。

持久化能够提升RabbitMQ的可靠性,以防在异常状况(重启、关闭、宕机等)下的数据丢失。RabbitMQ的持久化分为三个部分:交换器的持久化、队列的持久化和消息的持久化。

2.2.1.exchange持久化

exchange交换器的持久化是在声明交换器的时候,将durable设置为true

若是交换器不设置持久化,那么在RabbitMQ交换器服务重启以后,相关的交换器信息会丢失,不过消息不会丢失,可是不能将消息发送到这个交换器

spring中建立exchange时,构造方法默认设置为持久化。

2.2.2.queue持久化

队列的持久化在声明队列的时候,将durable设置为true

若是队列不设置持久化,那么RabbitMQ交换器服务重启以后,相关的队列信息会丢失,同时队列中的消息也会丢失

exchange和queue,若是一个是非持久化,另外一个是持久化,中bind时会报错。

spring中建立exchange时,构造方法默认设置为持久化。

2.2.3.message持久化

要确保消息不会丢失,除了设置队列的持久化,还须要将消息设置为持久化。经过将消息的投递模式(BasicProperties中的deliveryMode属性)设置为2便可实现消息的持久化

  • 持久化的消息在到达队列时就被写入到磁盘,而且若是能够,持久化的消息也会在内存中保存一份备份,这样能够提升必定的性能,只有在内存吃紧的时候才会从内存中清除。
  • 非持久化的消息通常只保存在内存中,在内存吃紧的时候会被换入到磁盘中,以节省内存空间。

若是将全部的消息都进行持久化操做,这样会影响RabbitMQ的性能。写入磁盘的速度比写入内存的速度慢很,因此要在可靠性和吞吐量之间作权衡。

在spring中,BasicProperties中的deliveryMode属性,对应的是MessageProperties中的deliveryMode。平时使用的RabbitTemplate.convertAndSend()方法默认设置为持久化,deliveryMode=2。若是须要设置非持久化发送消息,须要手动设置:

messageProperties.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT);

2.2.4.完整方案

这里讲解实现消息持久化的完整方案。

1、exchange、queue、message

要保证消息的持久化,在rabbitmq自己的结构上须要实现下面这些:

  • exchange交换机的durable设置为true。
  • queue队列的durable设置为true。
  • message消息的投递模式deliveryMode设置为2。

2、发布确认
前面是保证了消息在投递到rabbitmq中,如何保证rabbit中消息的持久化。
那么还须要保证生产者能成功发布消息,如交换机名字写错了等等。能够在发布消息时设置投递成功的回调,肯定消息能成功投递到目标队列中。

3、接收确认
对于消费者来讲,若是在订阅消息的时候,将autoAck设置为true,那么消费者接收到消息后,尚未处理,就出现了异常挂掉了,此时,队列中已经将消息删除,消费者不可以在收到消息。

这种状况能够将autoAck设置为false,进行手动确认。

4、镜像队列集群
在持久化后的消息存入RabbitMQ以后,还须要一段时间才能存入磁盘。RabbitMQ并不会为每条消息都进行同步存盘,可能仅仅是保存到操做系统缓存之中而不是物理磁盘。若是在这段时间,服务器宕机或者重启,消息还没来得及保存到磁盘当中,从而丢失。对于这种状况,能够引入RabiitMQ镜像队列机制。

这里强调是镜像队列集群,而非普通集群。由于出于同步效率考虑,普通集群只会同步队列的元数据,而不会同步队列中的消息。只有升级成镜像队列集群后,才能也同步消息。

每一个镜像队列由一个master和一个或多个mirrors组成。主节点位于一个一般称为master的节点上。每一个队列都有本身的主节点。给定队列的全部操做首先应用于队列的主节点,而后传播到镜像。这包括队列发布(enqueueing publishes)、向消费者传递消息、跟踪消费者的确认等等。

发布到队列的消息将复制到全部镜像。无论消费者链接到哪一个节点,都会链接到master,镜像会删除在master上已确认的消息。所以,队列镜像提升了可用性,但不会在节点之间分配负载。 若是承载队列master的节点出现故障,则最旧的镜像将升级为新的master,只要它已同步。根据队列镜像参数,也能够升级未同步的镜像。

3.开发

java开发上,这里以spring-boot-starter-amqp为例,记录在springboot中使用rabbitmq的一些关注点。pom.xml中引用为:

<dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

3.1.简单示例

一个简单的示例,仅限于文本消息的发布和接收。

3.1.1.生产者

ProducerController.java
@RestController
public class ProducerController {
    private static final String HEADER_KEY_UID="uid";
    @Autowired
    private ProducerService producerService;

    @PostMapping("/sendText")
    public void sendText(@RequestParam("uid")String uid,@RequestParam("msg")String msg){
        MessageProperties messageProperties=new MessageProperties();
        messageProperties.setHeader(HEADER_KEY_UID,uid);
        producerService.sendText(msg,messageProperties);
    }
}
ProducerService.java
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME="direct.exchange.a";
    private static final String ROUTING_KEY_NAME="direct.routingKey.a";
    @Resource
    private RabbitTemplate rabbitTemplate;


    /**
     * 发送 消息文本
     * @param data 文本消息
     * @param messageProperties 消息属性
     */
    public void sendText(String data, MessageProperties messageProperties) {
        Message message = rabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

消息发送的经常使用方法:

  • rabbitTemplate.send(message); //发消息,参数类型为org.springframework.amqp.core.Message
  • rabbitTemplate.convertAndSend(object); //转换并发送消息。 将参数对象转换为org.springframework.amqp.core.Message后发送
  • rabbitTemplate.convertSendAndReceive(message) //转换并发送消息,且等待消息者返回响应消息。

3.1.2.消费者

MessageListener.java
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageListener {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "true"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        MessageConverter messageConverter = rabbitTemplate.getMessageConverter();
        String msg = (String) messageConverter.fromMessage(message);
        log.info("消费端 Body: " + msg);
    }
}
  • @RabbitListener 能够标注在类上面,需配合 @RabbitHandler 注解一块儿使用
  • @RabbitListener 标注在类上面表示当有收到消息的时候,就交给 @RabbitHandler 的方法处理,具体使用哪一个方法处理,根据 MessageConverter 转换后的参数类型

3.2.消息序列化

rabbitmq中消息的序列化依赖于MessageConvert,这是一个接口,用于消息内容的序列化。

  • Message分为body和MessageProperties两部分。RabbitMQ的序列化是指Message的 body 属性,即咱们真正须要传输的内容,RabbitMQ 抽象出一个MessageConvert 接口处理消息的序列化,其实现有SimpleMessageConverter(默认)、Jackson2JsonMessageConverter等。
  • 当调用了 convertAndSend方法时,方法内部会使用MessageConvert进行消息的序列化。
  • MessageConvert是在RabbitTemplate中定义的属性,若是项目中须要使用多种MessageConvert。由于Spring中RabbitTemplate是单例模式注入,建议每种MessageConvert单独定义一种RabbitTemplate。

3.2.1.生产者

RabbitConfig.java
public class RabbitConfig {
    
    @Bean("jsonRabbitTemplate")
    public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean("defaultRabbitTemplate")
    public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){
        RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory);
        return rabbitTemplate;
    }
}
ProducerService.java
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME="direct.exchange.a";
    private static final String ROUTING_KEY_NAME="direct.routingKey.a";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;
    @Resource(name = "jsonRabbitTemplate")
    private RabbitTemplate jsonRabbitTemplate;

    /**
     * 发送 消息对象 json
     *
     * @param data
     * @param messageProperties
     */
    public void sendObject(Object data, MessageProperties messageProperties) {
        Message message = jsonRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        jsonRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }

    /**
     * 发送 消息文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {
        Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

3.2.2.消费者

MessageListener.java
@Component
@Slf4j
public class MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json对象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("消费端Payload: " + bodyText);
    }  
}

生产者发送对象消息时,咱们使用Jackson2JsonMessageConverter,并用其toMessage方法封装。可是在消费者接收对象消息时,咱们却没有用Jackson2JsonMessageConverter的fromMessage方法,而是使用ObjectMapper来反序列化Json对象。是由于rabbitmq在发送Jackson2JsonMessageConverter的序列化对象时,会在包含类的包名信息,消费者在使用fromMessage反序列化时,必须建立一个和生产者中包名等如出一辙的类。明显不太现实。

3.3.发布确认(生产者)

3.3.1.ConfirmCallback

ConfirmCallback接口用于实现消息发送到RabbitMQ交换器后接收ack回调。

  • 投递对象:exchange
  • 回调触发:不管成功或失败,都会触发回调。
  • 投递成功:ack=true
  • 投递失败:ack=false

使用方式在于:

  • 设置 publisher-confirm-type 为 correlated。
  • 实现RabbitTemplate.ReturnCallback 的函数式接口,并使用。
ProducerService.java
@Slf4j
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME = "direct.exchange.a";
    private static final String ROUTING_KEY_NAME = "direct.routingKey.ab";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;

    /**
     * ConfirmCallback
     *
     * 投递对象:exchange
     * 回调触发:不管成功或失败,都会触发回调。
     * 投递成功:ack=true
     * 投递失败:ack=false
     */
    RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
        log.info("ack: " + ack);
        if (!ack) {
            log.info("投递exchange失败!....能够进行日志记录、异常处理、补偿处理等");
        } else {
            log.info("投递exchange成功!");
        }
    };

 
    /**
     * 发送 消息文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {
        Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        
        //confirmCallback
        defaultRabbitTemplate.setConfirmCallback(confirmCallback);

        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

配置文件须要设置:

spring.rabbitmq.publisher-confirm-type = correlated

3.3.2.ReturnCallback

ReturnCallback接口用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。

  • 投递对象:queue
  • 回调触发:只有投递失败,才会触发回调。

使用方式在于:

  • 设置 publisher-returns 为 true。
  • 设置 mandatory 为 true。
  • 实现RabbitTemplate.ReturnCallback的函数式接口,并使用。
ProducerService.java
@Slf4j
@Service
public class ProducerService {
    private static final String EXCHANGE_NAME = "direct.exchange.a";
    private static final String ROUTING_KEY_NAME = "direct.routingKey.ab";

    @Resource(name = "defaultRabbitTemplate")
    private RabbitTemplate defaultRabbitTemplate;

    /**
     * ReturnCallback
     *
     * 投递对象:queue
     * 回调触发:只有投递失败,才会触发回调。
     */
    RabbitTemplate.ReturnCallback returnCallback = (Message message, int replyCode, String replyText,
                                                    String exchange, String routingKey) -> {
        log.info("投递到queue失败! exchange: " + exchange + ", routingKey: "
                + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
    };

    /**
     * 发送 消息文本
     *
     * @param data
     * @param messageProperties
     */
    public void sendText(String data, MessageProperties messageProperties) {
        Message message = defaultRabbitTemplate.getMessageConverter().toMessage(data, messageProperties);
        //returnCallback
        defaultRabbitTemplate.setMandatory(true);
        defaultRabbitTemplate.setReturnCallback(returnCallback);

        defaultRabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTING_KEY_NAME, message);
    }
}

须要在配置文件中配置:

spring.rabbitmq.publisher-returns = true

3.4.接收确认(消费者)

上一节讲解的是,如何在生产者发布消息时,确认消息发布到rabbitmq的交换机和队列中。那么这一节讲解的是,如何保障消费者能彻底“消费”了消息。

一般状况下,rabbitmq做为消息中间件,它把message推送给消费者就完成了它的使命,该message就自动被“签收”了。而消费者在接收到message后,再去实现关于该message的业务逻辑。可若是在实现该业务逻辑过程当中发生了错误,须要从新执行,那就难办了。由于message一旦被“签收”后,就从rabbitmq中被删除,不可能从新再发送。

若是消费者能手动控制message的“签收”操做,只有当关于message的业务逻辑执行完成后再“签收”,message再从rabbitmq中删除,不然可让message重发就行了。这一节就讲这个。

3.4.1.AcknowledgeMode

Acknowledge意思是“确认”,消息经过 ACK 确认是否被正确接收,每一个 Message 都要被确认(acknowledged),能够手动去 ACK 或自动 ACK。

使用手动应答消息,有一点须要特别注意,那就是不能忘记应答消息,由于对于RabbitMQ来讲处理消息没有超时,只要不该答消息,他就会认为仍在正常处理消息,致使消息队列出现阻塞,影响业务执行。若是不想处理,能够reject丢弃该消息。

消息确认模式有:

  • AcknowledgeMode.NONE:自动确认
  • AcknowledgeMode.AUTO:根据状况确认
  • AcknowledgeMode.MANUAL:手动确认

默认是自动确认,能够经过RabbitListenerContainerFactory 中进行开启手动ack,或者中配置文件中开启:

spring.rabbitmq.listener.simple.acknowledge-mode = manual
MessageListener.java
@Component
@Slf4j
public class MessageListener {
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private ObjectMapper objectMapper;

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json对象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("消费端Payload: " + bodyText);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

}

3.4.2.Ack/Nack/Reject

设置为手动确认后,有3种确认操做:

  • Ack:确认收到消息,而后消息从队列中删除。
  • Nack:确认没有收到消息,消息从新回到队列中发送。
  • Reject:拒绝该消息,直接丢弃该消息,不会回到队列中。

如示例代码中的 basicAck 方法,须要注意的是,要传递两个参数:

  • deliveryTag(惟一标识 ID):当一个消费者向 RabbitMQ 注册后,会创建起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它表明了 RabbitMQ 向该 Channel 投递的这条消息的惟一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
  • multiple:为了减小网络流量,手动确承认以被批处理,当该参数为 true 时,则能够一次性确认 delivery_tag 小于等于传入值的全部消息

3.4.3.异常重试

除了上述手动确认的方式,还有一种不太经常使用的方式,能够实现重复发送消息。在开启异常重试的前提下,在消费者代码中抛出异常,会自动重发消息。

application.properties

spring.rabbitmq.listener.simple.retry.enabled=true 是否开启消费者重试
spring.rabbitmq.listener.simple.retry.max-attempts=5  最大重试次数
spring.rabbitmq.listener.simple.retry.initial-interval=5000 重试间隔时间(单位毫秒)
spring.rabbitmq.listener.simple.default-requeue-rejected=false 重试次数超过上面的设置以后是否丢弃
MessageListener.java
@RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "direct.queue.d",
                    durable = "false"),
            exchange = @Exchange(value = "direct.exchange.a",
                    durable = "true",
                    type = ExchangeTypes.DIRECT,
                    ignoreDeclarationExceptions = "true"),
            key = "direct.routingKey.a"
    )
    )
    @RabbitHandler
    public void onMessage(Message message, Channel channel) throws Exception {
        String contentType = message.getMessageProperties().getContentType();
        String bodyText = null;
        System.out.println(contentType);
        switch (contentType) {
            //字符串
            case MessageProperties.CONTENT_TYPE_TEXT_PLAIN:
                bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message);
                break;
            //json对象
            case MessageProperties.CONTENT_TYPE_JSON:
                User user = objectMapper.readValue(message.getBody(), User.class);
                bodyText = user.toString();
                break;
        }
        log.info("消费端Payload: " + bodyText);
        throw new RuntimeException("重试啦");
    }

3.5.消费模式

在RabbitMQ中消费者有2种方式获取队列中的消息:

  • push:basic.consume命令订阅某一个队列中的消息,channel会自动在处理完上一条消息以后,接收下一条消息。(同一个channel消息处理是串行的)。除非关闭channel或者取消订阅,不然客户端将会一直接收队列的消息。
  • pull:basic.get命令主动获取队列中的消息,可是绝对不能够经过循环调用basic.get来代替basic.consume,这是由于basic.get RabbitMQ在实际执行的时候,是首先consume某一个队列,而后检索第一条消息,而后再取消订阅。若是是高吞吐率的消费者,最好仍是建议使用basic.consume。

对比来讲,若是有持续消费的需求,建议用push的方式,经过监听器来订阅。若是只是特定时刻须要从队列中,一次性取些数据,能够用pull方式。

4.名词概念

4.1.channel

咱们知道不管是生产者仍是消费者,都须要和 RabbitMQ Broker 创建链接,这个链接就是一条 TCP 链接,也就是 Connection。一旦 TCP 链接创建起来,客户端紧接着能够建立一个 AMQP 信道(Channel),每一个信道都会被指派一个惟一的 ID。

信道是创建在 Connection 之上的虚拟链接,RabbitMQ 处理的每条 AMQP 指令都是经过信道完成的。

咱们彻底可使用 Connection 就能完成信道的工做,为何还要引入信道呢?试想这样一个场景,一个应用程序中有不少个线程须要从 RabbitMQ 中消费消息,或者生产消息,那么必然须要创建不少个 Connection,也就是多个 TCP 链接。然而对于操做系统而言,创建和销毁 TCP 链接是很是昂贵的开销,若是遇到使用高峰,性能瓶颈也随之显现。

RabbitMQ 采用相似 NIO(Non-blocking I/O)的作法,选择 TCP 链接复用,不只能够减小性能开销,同时也便于管理。

每一个线程把持一个信道,因此信道复用了 Connection 的 TCP 链接。同时 RabbitMQ 能够确保每一个线程的私密性,就像拥有独立的链接同样。当每一个信道的流量不是很大时,复用单一的 Connection 能够在产生性能瓶颈的状况下有效地节省 TCP 链接资源。可是信道自己的流量很大时,这时候多个信道复用一个 Connection 就会产生性能瓶颈,进而使总体的流量被限制了。此时就须要开辟多个 Connection,将这些信道均摊到这些 Connection 中,至于这些相关的调优策略须要根据业务自身的实际状况进行调节。

信道在 AMQP 中是一个很重要的概念,大多数操做都是在信道这个层面展开的。好比 channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。RabbitMQ 相关的 API 与 AMQP 紧密相连,好比 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。

4.2.QoS

针对push方式,RabbitMQ能够设置basicQoS(Consumer Prefetch)来对consumer进行流控,从而限制未Ack的消息数量。

前提包括,消息确认模式必须是手动确认。

basicQos(int var1, boolean var2)
  • 第一个参数是限制未Ack消息的最大数量。
  • 第二个参数是布尔值,(1)为true时,说明是针对channel作的流控限制;(2)为false时,说明是针对整个消费者作的流控限制。