主要介绍消费消息时的几种方式:前端
在消费消息时,负责消费的应用程序会发送一个Basic.Consumer请求,与该请求一块儿发送的还有一个no-ack标志。当这个标志启用时,它会告诉RabbitMQ消费者在接收到消息时不会进行确认,RabbitMQ只管尽快的发送消息。java
使用no-ack标志消费消息是让RabbitMQ将消费投递给消费者的最快方式,但这也是最不可靠的方式。
若是使用no-ack,那么当有新的可用消息时,RabbitMQ将会发送该消息给消费者,而不用等待。实际上,若是有可用消息,RabbitMQ会持续向消费者发送它们,直到套接字缓冲区被填满为止。spring
目前没有找到RabbitTemplate如何开启no-ack的方法,若是有用过的朋友,请留言告诉我,谢谢。bash
开启消息确认模式,每次接收到消息后,都要向RabbitMQ返回一个Basic.Ack。服务器
消息确认有三种确认方式:性能
基于RabbitTemplate,下面这段代码,有对这几种确认方式的实现,在配置文件中开启手动确认模式,acknowledge-mode属性为manual(默认为自动确认):测试
spring: #消息队列配置 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / connection-timeout: 5000ms listener: simple: acknowledge-mode: manual
/** * 消费者监听消息队列 */ @Component @Slf4j @RabbitListener(queues = "DIRECT_QUEUE") public class DirectQueueListener { @RabbitHandler public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws IOException, InterruptedException { log.info("消费消息成功: {}", message); Thread.sleep(1000); switch (message) { case "nack": channel.basicNack(tag, true, false); // 第二个参数控制是否开启批量拒绝,第三个参数表示是否requeue break; case "nack-requeue": channel.basicNack(tag, true, true); break; case "reject": channel.basicReject(tag, false); break; case "reject-requeue": // 启用了requeue,若是只有一个消费者,容易形成死循环 channel.basicReject(tag, true); break; default: channel.basicAck(tag, true); break; } } }
channel.basicAck:当正常消费消息时,调用该方法。fetch
咱们看到除了basicAck,还有basicReject和basicNack。这两种,顾名思义,是用来拒绝消费的。ui
channel.basicReject:从协议层面上,reject是发送一个Basic.Reject响应,告知RabbitMQ没法对这条消息进行处理,当拒绝时,能够指定是否丢弃消息或使用requeue标志从新发送消息。当启用requeue时,RabbitMQ将会把这条消息从新放回到队列中。spa
不能使用basicReject一次拒绝多个消息。
channel.basicNack:Basic.Nack实现与Basic.Reject相同的行为,但添加了批量拒绝的功能。
设置multiple或requeue如图所示:
AMQP规范要求信道要有服务质量设置,即在确认消息接收以前能够预先接收必定数量的消息。能够设置一个预取数量来实现高效的发送消息。
若是消费者应用程序在确认消息以前崩溃,在套接字关闭时,全部预取的消息将返回到队列。
若是设置了no-ack,那么预取大小将被忽略。
使用RabbitTemplate时,能够在消费者应用程序的配置文件中配置预取大小:
spring: #消息队列配置 rabbitmq: host: 127.0.0.1 port: 5672 username: guest password: guest virtual-host: / connection-timeout: 5000ms listener: simple: acknowledge-mode: manual prefetch: 1000
其中prefetch就是预取大小,消费者应用程序运行后,能够在RabbitMQ的控制台看到这个设置:
若是熟悉抓包软件的朋友,能够试着抓包看看:
我预先发送了2条消息到RabbitMQ,能够看到上图中最后两行是两个Ack。
有一种方式能够一次确认多个消息,Basic.Ack响应具备一个multiple属性,当把它设置为true时就能确认之前未确认的消息。
若是使用multiple,当成功的接收了一些消息,而且应用程序在回复Ack以前就发生了异常,则全部为确认的消息将返回队列。
RabbitMQ的死信交换器是一种能够拒绝已投递消息的可选行为,通常有三种状况的消息会进入死信队列:
死信与备用交换器不一样,过时或被拒绝的消息经过死信交换器进行投递,而备用交换器则路由那些没法由RabbitMQ路由的消息。
在RabbitMQ中,在声明队列时,指定死信交换器:
/** * 声明队列。 * 同时指定死信队列。 * * @return Queue对象。 */ @Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE") .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME) .build(); }
死信交换器还容许使用预先指定的值覆盖路由键,这样能够容许同一个交换器同时处理死信和非死信消息,但须要确保死信消息不被投递到相同的队列。设置预约义路由键的关键字是:x-dead-letter-routing-key。
测试死信队列,当消费者拒绝时,查看消息是否会进入死信队列:
定义队列时,有多个设置能够肯定队列的行为:
更改队列的设置,必须删除队列并从新建立它。
也能够叫作自动删除的队列。
一旦消费者完成链接和检索消息,在断开链接时队列将被删除。
@Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").autoDelete() .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME) .build(); }
队列只会在没有消费者监听的时候自行删除。
在须要确保只有单个消费者可以消费队列中的消息时,在建立队列时设置exclusive属性,启用后在消费者断开链接后,队列也会自动删除。
@Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE").exclusive() .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME) .build(); }
声明exclusive的队列,只能被声明时所指定的同一个链接和信道所消费,当建立队列的信道关闭时,独占队列也将会删除。在信道关闭之间,能够屡次声明和取消exclusive队列的消费者。
若是一段时间没有使用该队列,就删除它。
建立一个自动过时的队列很是简单,要作的事情就是使用x-expires参数声明一个队列。该参数以毫秒为单位设置队列的生存时间(Time To Live,TTL)。
自动过时队列有一些严格的约定:
使用durable标志告诉RabbitMQ但愿队列配置被保存在服务器:
@Bean("directQueue") public Queue directQueue() { return QueueBuilder.durable("DIRECT_QUEUE") .withArgument("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE_NAME) .withArgument("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY_NAME) .build(); }
消息级别的TTL设置容许服务器对消息的最大生存时间进行限制。声明队列的同时指定死信交换器和TTL值将致使该队列中已到期的消息成为死信消息。
可使用x-message-ttl设置队列的消息TTL时间。
从RabbitMQ3.1.0开始,能够在声明队列时指定最大长度。若是在队列上设置列x-max-length参数,一旦消息到达最大值,RabbitMQ会在添加新消息时删除位于队列前端的消息,若是声明队列时候,指定列死信交换器,则从队列前端删除的消息会进入死信队列。