7、消费消息与性能权衡(读书笔记与我的实践)

摘要

主要介绍消费消息时的几种方式:前端

  • 平衡消息消费的可靠性与性能;
  • 死信交换器;
  • 设置自动删除队列、持久化队列、TTL等;

消费性能

使用no-ack模式 

在消费消息时,负责消费的应用程序会发送一个Basic.Consumer请求,与该请求一块儿发送的还有一个no-ack标志。当这个标志启用时,它会告诉RabbitMQ消费者在接收到消息时不会进行确认,RabbitMQ只管尽快的发送消息。java

使用no-ack标志消费消息是让RabbitMQ将消费投递给消费者的最快方式,但这也是最不可靠的方式。

若是使用no-ack,那么当有新的可用消息时,RabbitMQ将会发送该消息给消费者,而不用等待。实际上,若是有可用消息,RabbitMQ会持续向消费者发送它们,直到套接字缓冲区被填满为止。spring

目前没有找到RabbitTemplate如何开启no-ack的方法,若是有用过的朋友,请留言告诉我,谢谢。bash

消息确认模式

开启消息确认模式,每次接收到消息后,都要向RabbitMQ返回一个Basic.Ack。服务器

消息确认有三种确认方式:性能

  • Ack;
  • Reject;
  • Nack;

基于RabbitTemplate,下面这段代码,有对这几种确认方式的实现,在配置文件中开启手动确认模式,acknowledge-mode属性为manual(默认为自动确认):测试

spring:
  #消息队列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000ms
    listener:
      simple:
        acknowledge-mode: manual
/**
 * 消费者监听消息队列
 */
@Component
@Slf4j
@RabbitListener(queues = "DIRECT_QUEUE")
public class DirectQueueListener {

    @RabbitHandler
    public void process(String message,
                        Channel channel,
                        @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException {
        log.info("消费消息成功: {}", message);
        Thread.sleep(1000);
        switch (message) {
            case "nack":
                channel.basicNack(tag, true, false); // 第二个参数控制是否开启批量拒绝,第三个参数表示是否requeue
                break;
            case "nack-requeue":
                channel.basicNack(tag, true, true);
                break;
            case "reject":
                channel.basicReject(tag, false);
                break;
            case "reject-requeue": // 启用了requeue,若是只有一个消费者,容易形成死循环
                channel.basicReject(tag, true);
                break;
            default:
                channel.basicAck(tag, true);
                break;
        }
    }

}

channel.basicAck:当正常消费消息时,调用该方法。fetch

咱们看到除了basicAck,还有basicReject和basicNack。这两种,顾名思义,是用来拒绝消费的。ui

channel.basicReject:从协议层面上,reject是发送一个Basic.Reject响应,告知RabbitMQ没法对这条消息进行处理,当拒绝时,能够指定是否丢弃消息或使用requeue标志从新发送消息。当启用requeue时,RabbitMQ将会把这条消息从新放回到队列中。spa

不能使用basicReject一次拒绝多个消息。

channel.basicNack:Basic.Nack实现与Basic.Reject相同的行为,但添加了批量拒绝的功能。

设置multiple或requeue如图所示:

服务质量确认模式

AMQP规范要求信道要有服务质量设置,即在确认消息接收以前能够预先接收必定数量的消息。能够设置一个预取数量来实现高效的发送消息。

若是消费者应用程序在确认消息以前崩溃,在套接字关闭时,全部预取的消息将返回到队列。

若是设置了no-ack,那么预取大小将被忽略。

使用RabbitTemplate时,能够在消费者应用程序的配置文件中配置预取大小:

spring:
  #消息队列配置
  rabbitmq:
    host: 127.0.0.1
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    connection-timeout: 5000ms
    listener:
      simple:
        acknowledge-mode: manual
        prefetch: 1000

其中prefetch就是预取大小,消费者应用程序运行后,能够在RabbitMQ的控制台看到这个设置:

若是熟悉抓包软件的朋友,能够试着抓包看看:

我预先发送了2条消息到RabbitMQ,能够看到上图中最后两行是两个Ack。

有一种方式能够一次确认多个消息,Basic.Ack响应具备一个multiple属性,当把它设置为true时就能确认之前未确认的消息。

若是使用multiple,当成功的接收了一些消息,而且应用程序在回复Ack以前就发生了异常,则全部为确认的消息将返回队列。

死信交换器

RabbitMQ的死信交换器是一种能够拒绝已投递消息的可选行为,通常有三种状况的消息会进入死信队列:

  • 当拒绝了一个不从新发送的消息时,会进入死信;
  • 当消息的TTL到期时,会进入死信;
  • 当队列已满时,会进入死信;
死信与备用交换器不一样,过时或被拒绝的消息经过死信交换器进行投递,而备用交换器则路由那些没法由RabbitMQ路由的消息。

在RabbitMQ中,在声明队列时,指定死信交换器:

/**
     * 声明队列。
     * 同时指定死信队列。
     *
     * @return Queue对象。
     */
    @Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

死信交换器还容许使用预先指定的值覆盖路由键,这样能够容许同一个交换器同时处理死信和非死信消息,但须要确保死信消息不被投递到相同的队列。设置预约义路由键的关键字是:x-dead-letter-routing-key。

测试死信队列,当消费者拒绝时,查看消息是否会进入死信队列:

控制队列

定义队列时,有多个设置能够肯定队列的行为:

  • 自动删除本身;
  • 只容许一个消费者进行消费;
  • 自动过时消息;
  • 保持有限数量的消息;
  • 将旧消息推出堆栈;

更改队列的设置,必须删除队列并从新建立它。

临时队列

也能够叫作自动删除的队列。

一旦消费者完成链接和检索消息,在断开链接时队列将被删除。

@Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").autoDelete()
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

队列只会在没有消费者监听的时候自行删除。

只容许单个消费者

在须要确保只有单个消费者可以消费队列中的消息时,在建立队列时设置exclusive属性,启用后在消费者断开链接后,队列也会自动删除。

@Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE").exclusive()
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

声明exclusive的队列,只能被声明时所指定的同一个链接和信道所消费,当建立队列的信道关闭时,独占队列也将会删除。在信道关闭之间,能够屡次声明和取消exclusive队列的消费者。

自动过时队列

若是一段时间没有使用该队列,就删除它。

建立一个自动过时的队列很是简单,要作的事情就是使用x-expires参数声明一个队列。该参数以毫秒为单位设置队列的生存时间(Time To Live,TTL)。

自动过时队列有一些严格的约定:

  • 队列只有在没有消费者的状况下才会过时。若是有链接消费者,则只有发出Basic.Cancel或断开链接以后才自动删除;
  • 队列只有在TTL周期以内没有收到Basic.Get请求时才会到期。一旦一个Basic.Get请求中已经包含了一个具备过时值的队列,那么过时设置无效,队列不会被自动删除(不要使用Get);
  • 不能从新声明或更改x-expires属性;
  • RabbitMQ不保证过时删除队列这一过程的时效性;

永久队列

使用durable标志告诉RabbitMQ但愿队列配置被保存在服务器:

@Bean("directQueue")
    public Queue directQueue() {
        return QueueBuilder.durable("DIRECT_QUEUE")
                .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME)
                .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME)
                .build();
    }

消息级别的TTL

消息级别的TTL设置容许服务器对消息的最大生存时间进行限制。声明队列的同时指定死信交换器和TTL值将致使该队列中已到期的消息成为死信消息。

可使用x-message-ttl设置队列的消息TTL时间。

最大长度队列

从RabbitMQ3.1.0开始,能够在声明队列时指定最大长度。若是在队列上设置列x-max-length参数,一旦消息到达最大值,RabbitMQ会在添加新消息时删除位于队列前端的消息,若是声明队列时候,指定列死信交换器,则从队列前端删除的消息会进入死信队列。

相关文章
相关标签/搜索