在《RabbitMQ(1)-基础开发应用》中,咱们已经介绍了RabbitMQ的基础开发应用。本文基于这些基础再作一些扩展,延伸出一些高级的用法,如:死信队列、延迟队列和优先队列。不过仍是以死信队列为主,由于延迟队列是死信队列的衍生概念,并且优先队列也比较简单,因此先仍是在代码层面上,把死信队列搞透。java
咱们在使用RabbitMQ以前,须要先建立好相关的队列和交换机,而且设置一些绑定关系。由于几篇文章都是结合springboot来开发,下面就结合springboot介绍几种建立方式:spring
@RabbitListener
注解的 bindings
属性,能够简单实现相似功能。@RabbitListener(bindings = @QueueBinding( value = @Queue(value = "direct.queue.d", durable = "true"), exchange = @Exchange(value = "direct.exchange.a", durable = "true", type = ExchangeTypes.DIRECT, ignoreDeclarationExceptions = "true"), key = "direct.routingKey.a" ) )
Queue、Exchange、Binding
的实例。package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: * @date: 2020/7/12 11:26 下午 * @author: kerry */ @Configuration public class RabbitConfig { public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a"; public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a"; public static final String NORMAL_QUEUE_A="demo.direct.queue.a"; /** * NORMAL 交换机 * @return */ @Bean public Exchange ExchangeA(){ return ExchangeBuilder .directExchange(NORMAL_EXCHANGE_A) .durable(true) .build(); } /** * NORMAL 队列 * @return */ @Bean public Queue QueueA(){ return QueueBuilder .durable(NORMAL_QUEUE_A) .build(); } /** * 绑定 NORMAL队列 和 NORMAL交换机 * @return */ @Bean public Binding normalBinding(){ return new Binding(NORMAL_QUEUE_A, Binding.DestinationType.QUEUE, NORMAL_EXCHANGE_A, NORMAL_ROUTING_KEY_A, null); } }
我我的推荐第三种,并且建议是在生产者端定义,消费者应该更关注消费的逻辑。可是若是用代码来建立,有一个很大的缺点,就是不能删除和修改,至少我目前还没找到办法。json
所以要结合第一种和第三种来使用,固然都用第一种也是能够的。只是开发人员,更但愿队列、交换机的建立、绑定的逻辑,都体如今代码里面,经过代码能够更好的阅读架构设计。segmentfault
死信队列
这个名字听起来很特别,但它解决的是平常开发中最多见的问题:不能正常消费的消息,该如何处理
。咱们在第一篇文章中有使用到手动Ack,对于须要nack而且无需重回队列的消息,指望有统一的异常处理;包括有些消息是有时效性的,若是支付订单通常都最大支付时常,超时后就应该取消订单;等等。springboot
死信队列就是应对这些状况的,它出现的条件以下:架构
basicNack
或basicReject
,而且此时requeue 属性被设置为false。死信队列的架构以下:并发
生产者 --> 消息 --> 业务交换机 --> 业务队列 --> 消息变成死信 --> 死信交换机 -->死信队列 --> 消费者
app
如何配置死信队列呢?其实很简单,大概能够分为如下步骤:ui
注意,并非直接声明一个公共的死信队列,而后因此死信消息就本身跑到死信队列里去了。而是为每一个须要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机能够共用一个,而后为每一个业务队列分配一个单独的路由key。架构设计
有了死信交换机和路由key后,接下来,就像配置业务队列同样,配置死信队列,而后绑定在死信交换机上。也就是说,死信队列并非什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,因此能够为任何类型【Direct、Fanout、Topic】。通常来讲,会为每一个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,通常会为每一个重要的业务队列配置一个死信队列。
按照简介中死信队列的架构,咱们在配置文件中定义了NORMAL
的业务队列和业务交换机,以及DLX
的死信队列和死信交换机,并在业务队列中设置了死信交换机。
RabbitConfig.java
package pers.kerry.exercise.rabbitmq.rabbitmqproducera.config; import org.springframework.amqp.core.*; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; /** * @description: * @date: 2020/7/12 11:26 下午 * @author: kerry */ @Configuration public class RabbitConfig { /** * DLX,定义参数 */ public static final String X_DEAD_LETTER_EXCHANGE="x-dead-letter-exchange"; public static final String X_DEAD_LETTER_ROUTING_KEY="x-dead-letter-routing-key"; public static final String X_MESSAGE_TTL="x-message-ttl"; public static final String X_MAX_LENGTH="x-max-length"; /** * DLX,命名 */ public static final String DEAD_LETTER_EXCHANGE_A="demo.direct.dlx.exchange.a"; public static final String DEAD_LETTER_ROUTING_KEY_A="demo.direct.dlx.routingKey.a"; public static final String DEAD_LETTER_QUEUE_A="demo.direct.dlx.queue.a"; /* * NORMAL,命名 */ public static final String NORMAL_EXCHANGE_A="demo.direct.exchange.a"; public static final String NORMAL_ROUTING_KEY_A="demo.direct.routingKey.a"; public static final String NORMAL_QUEUE_A="demo.direct.queue.a"; @Bean("jsonRabbitTemplate") public RabbitTemplate jsonRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter()); return rabbitTemplate; } @Bean("defaultRabbitTemplate") public RabbitTemplate defaultRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate=new RabbitTemplate(connectionFactory); return rabbitTemplate; } /** * DLX 交换机 * @return */ @Bean public Exchange dlxExchangeA(){ return ExchangeBuilder .directExchange(DEAD_LETTER_EXCHANGE_A) .durable(true) .build(); } /** * DLX 队列 * @return */ @Bean public Queue dlxQueueA(){ return QueueBuilder .durable(DEAD_LETTER_QUEUE_A) .build(); } /** * NORMAL 交换机 * @return */ @Bean public Exchange ExchangeA(){ return ExchangeBuilder .directExchange(NORMAL_EXCHANGE_A) .durable(true) .build(); } /** * NORMAL 队列 * @return */ @Bean public Queue QueueA(){ return QueueBuilder .durable(NORMAL_QUEUE_A) //设置 死信交换机 .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A) .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A) //设置 队列全部消息 存活时间8秒 .withArgument(X_MESSAGE_TTL,8000) //设置 队列最大长度 10条 .withArgument(X_MAX_LENGTH,10) .build(); } /** * 绑定 DLX队列 和 DLX交换机 * @return */ @Bean public Binding dlxBinding(){ return new Binding(DEAD_LETTER_QUEUE_A, Binding.DestinationType.QUEUE,DEAD_LETTER_EXCHANGE_A, DEAD_LETTER_ROUTING_KEY_A, null); } /** * 绑定 NORMAL队列 和 NORMAL交换机 * @return */ @Bean public Binding normalBinding(){ return new Binding(NORMAL_QUEUE_A, Binding.DestinationType.QUEUE, NORMAL_EXCHANGE_A, NORMAL_ROUTING_KEY_A, null); } }
ProducerService.java
@Slf4j @Service public class ProducerService { public void sendText(String data, MessageProperties messageProperties) { /** * 对单个消息 设置TTL */ //messageProperties.setExpiration(String.valueOf(3000)); Message message = defaultRabbitTemplate .getMessageConverter() .toMessage(data, messageProperties); defaultRabbitTemplate.convertAndSend(RabbitConfig.NORMAL_EXCHANGE_A, RabbitConfig.NORMAL_ROUTING_KEY_A, message); } }
消费者的逻辑比较简单,主要是分别监听业务队列和死信队列,这里将两个队列的消息输出日志。这里模拟的是消费者nack消息,而且不退回队列的状况。
MessageListener.java
/** * @description: * @date: 2020/7/12 11:07 下午 * @author: kerry */ @Component @Slf4j public class MessageListener { @Autowired private RabbitTemplate rabbitTemplate; @Autowired private ObjectMapper objectMapper; /** * @param message * @param channel * 监听 业务队列 * * @throws Exception */ @RabbitListener(queues = RabbitConfig.NORMAL_QUEUE_A) @RabbitHandler public void onMessage(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json对象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("业务队列-拒绝消息: " + bodyText); channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false); /** * 延迟队列,被消费的消息须要重回队列 */ //channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } /** * @param message * @param channel * 监听 死信队列 * * @throws Exception */ @RabbitListener(queues = RabbitConfig.DEAD_LETTER_QUEUE_A) @RabbitHandler public void onMessageDLX(Message message, Channel channel) throws Exception { String contentType = message.getMessageProperties().getContentType(); String bodyText = null; switch (contentType) { //字符串 case MessageProperties.CONTENT_TYPE_TEXT_PLAIN: bodyText = (String) rabbitTemplate.getMessageConverter().fromMessage(message); break; //json对象 case MessageProperties.CONTENT_TYPE_JSON: User user = objectMapper.readValue(message.getBody(), User.class); bodyText = user.toString(); break; } log.info("死信队列-接收消息: " + bodyText); channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } }
开头说过,致使消息转为死信队列的方式有3种,下面就从代码中分析这3种状况。
咱们回过头来看看消费者这边,定义业务队列的方法:
/** * NORMAL 队列 * @return */ @Bean public Queue QueueA(){ return QueueBuilder .durable(NORMAL_QUEUE_A) //设置 死信交换机 .withArgument(X_DEAD_LETTER_EXCHANGE,DEAD_LETTER_EXCHANGE_A) .withArgument(X_DEAD_LETTER_ROUTING_KEY,DEAD_LETTER_ROUTING_KEY_A) //设置 队列全部消息 存活时间8秒 .withArgument(X_MESSAGE_TTL,8000) //设置 队列最大长度 10条 .withArgument(X_MAX_LENGTH,10) .build(); }
1. nack拒绝消息,requeue=false
在死信队列的架构中,只要在业务队列中设置了死信交换机x-dead-letter-exchange
。消费者代码中,咱们在业务队列的消费过程当中nack消息,而且requeue=false便可。x-dead-letter-routing-key
能够不设置,若是不设置,默认取消息原来的路由键。
代码以下:
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,false);
channel.basicNack方法的参数以下:
2. 消息超时TTL
TTL(Time-To-Live)指消息的存活时间,咱们有两种方式设置消息的TTL:
x-message-ttl
,设置后,进入该队列的全部消息的TTL都为对应的值。messageProperties.setExpiration(String.valueOf(3000));
,就是设置消息的TTL为3秒。不过要注意的是,若是已经设置了nack的死信逻辑,TTL的死信就不生效了。道理也很简单,由于nack消息和requeue=false一块儿用,表明消息被消费了,而且消息不会重回队列,直接被丢弃或进入死信队列,又怎么会在队列中超时了呢。
3. 超过队列长度
能够设置队列长度,例如最大接收消息的数量。当消息在队列中已经达到最大数量,那么后面再来的消息,就会被直接丢进死信队列。
咱们也是中定义业务队列的代码中,有经过x-max-length
参数,设置业务队列的长度。
在我还不知道延迟队列以前,我就以为消息中间件应该具有这样的功能。在消息发布到队列后,我指望每一个消息延迟指定时间后,再被消费者获取到。例如:在支付模块中,当用户生成订单,再到支付完成订单,是有一段时间的。而咱们通常会给这个订单设置超时时间,若是超过了这段时间,订单就应该被取消,没法再支付。那么将订单做为消息,就能够利用延迟队列来实现取消订单的逻辑。
RabbitMQ并不直接支持延迟队列的功能,而是做为一个概念,你能够利用死信队列来实现“延迟队列”。利用TTL超时时间的死信方式,来实现延迟队列。
回顾一下上段中TTL的方式,咱们在业务队列中除了定义死信交换机x-dead-letter-exchange
,还能够定义队列的生存时间x-message-ttl
,或者设置消息的过时时间。而若是咱们不消费这个业务队列中的消息,那么消息在到达TTL后,就会自动转到死信队列中。若是咱们只消费死信队列中的消息,忽略掉业务队列这个“中转站”,就至关于消息在被发布后,通过指定时间延迟,在死信队列中被消费,这就造成了一个“延迟队列”。
由于延迟队列就是死信队列的一种实现,因此代码层面上能够直接参考上段中TTL的部分。
优先队列,顾名思义,拥有高优先级的队列具备高的优先权,优先级高的消息具有优先被消费的权力。所以在优先队列中有两个逻辑点:队列是优先队列
,消息有优先级
。可参考死信队列章节中,TTL部分的代码,下面对代码改动地方作一下说明:
x-max-priority
参数来设置。messageProperties.setPriority(3)
,即设置该消息的优先级为3。优先队列的使用场景是在:消息有分优先级的需求,而且并发量较大。要求并发量大,是由于若是全部消息在发布以后,立刻就被消费了,那么分优先级的必要性就不大了。