【RabbitMQ】一文带你搞定RabbitMQ死信队列

本文口味:爆炒鱿鱼   预计阅读:15分钟java

1、说明

RabbitMQ是流行的开源消息队列系统,使用erlang语言开发,因为其社区活跃度高,维护更新较快,性能稳定,深得不少企业的欢心(固然,也包括我如今所在公司【手动滑稽】)。git

为了保证订单业务的消息数据不丢失,须要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中。但因为对死信队列的概念及配置不熟悉,致使曾一度陷入百度的汪洋大海,没法自拔,不少文章都看起来可行,可是实际上却并不能帮我解决实际问题。最终,在官网文档中找到了我想要的答案,经过官网文档的学习,才发现对于死信队列存在一些误解,致使配置死信队列之路困难重重。github

因而本着记录和分享的精神,将死信队列的概念和配置完整的写下来,以便帮助遇到一样问题的朋友。web

2、本文大纲

如下是本文大纲:spring

AG4T332}NEUNPUAU(A)U6.png

本文阅读前,须要对RabbitMQ有一个简单的了解,偏向实战配置讲解。数组

3、死信队列是什么

死信,在官网中对应的单词为“Dead Letter”,能够看出翻译确实很是的简单粗暴。那么死信是个什么东西呢?网络

“死信”是RabbitMQ中的一种消息机制,当你在消费消息时,若是队列里的消息出现如下状况:app

  1. 消息被否认确认,使用 channel.basicNackchannel.basicReject ,而且此时requeue 属性被设置为false
  2. 消息在队列的存活时间超过设置的TTL时间。
  3. 消息队列的消息数量已经超过最大队列长度。

那么该消息将成为“死信”。运维

“死信”消息会被RabbitMQ进行特殊处理,若是配置了死信队列信息,那么该消息将会被丢进死信队列中,若是没有配置,则该消息将会被丢弃。spring-boot

4、如何配置死信队列

这一部分将是本文的关键,如何配置死信队列呢?其实很简单,大概能够分为如下步骤:

  1. 配置业务队列,绑定到业务交换机上
  2. 为业务队列配置死信交换机和路由key
  3. 为死信交换机配置死信队列

注意,并非直接声明一个公共的死信队列,而后因此死信消息就本身跑到死信队列里去了。而是为每一个须要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机能够共用一个,而后为每一个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列同样,配置死信队列,而后绑定在死信交换机上。也就是说,死信队列并非什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,因此能够为任何类型【Direct、Fanout、Topic】。通常来讲,会为每一个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,通常会为每一个重要的业务队列配置一个死信队列。

有了前文这些陈述后,接下来就是惊险刺激的实战环节,这里省略了RabbitMQ环境的部署和搭建环节。

先建立一个Springboot项目。而后在pom文件中添加 spring-boot-starter-amqpspring-boot-starter-web 的依赖,接下来建立一个Config类,这里是关键:

@Configuration
public class RabbitMQConfig {

    public static final String BUSINESS_EXCHANGE_NAME = "dead.letter.demo.simple.business.exchange";
    public static final String BUSINESS_QUEUEA_NAME = "dead.letter.demo.simple.business.queuea";
    public static final String BUSINESS_QUEUEB_NAME = "dead.letter.demo.simple.business.queueb";
    public static final String DEAD_LETTER_EXCHANGE = "dead.letter.demo.simple.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queuea.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "dead.letter.demo.simple.deadletter.queueb.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "dead.letter.demo.simple.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "dead.letter.demo.simple.deadletter.queueb";

    // 声明业务Exchange
    @Bean("businessExchange")
    public FanoutExchange businessExchange(){
        return new FanoutExchange(BUSINESS_EXCHANGE_NAME);
    }

    // 声明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 声明业务队列A
    @Bean("businessQueueA")
    public Queue businessQueueA(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEA_NAME).withArguments(args).build();
    }

    // 声明业务队列B
    @Bean("businessQueueB")
    public Queue businessQueueB(){
        Map<String, Object> args = new HashMap<>(2);
//       x-dead-letter-exchange    这里声明当前队列绑定的死信交换机
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
//       x-dead-letter-routing-key  这里声明当前队列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        return QueueBuilder.durable(BUSINESS_QUEUEB_NAME).withArguments(args).build();
    }

    // 声明死信队列A
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    // 声明死信队列B
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    // 声明业务队列A绑定关系
    @Bean
    public Binding businessBindingA(@Qualifier("businessQueueA") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 声明业务队列B绑定关系
    @Bean
    public Binding businessBindingB(@Qualifier("businessQueueB") Queue queue,
                                    @Qualifier("businessExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    // 声明死信队列A绑定关系
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    // 声明死信队列B绑定关系
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

这里声明了两个Exchange,一个是业务Exchange,另外一个是死信Exchange,业务Exchange下绑定了两个业务队列,业务队列都配置了同一个死信Exchange,并分别配置了路由key,在死信Exchange下绑定了两个死信队列,设置的路由key分别为业务队列里配置的路由key。

下面是配置文件application.yml:

spring:
  rabbitmq:
    host: localhost
    password: guest
    username: guest
    listener:
      type: simple
      simple:
          default-requeue-rejected: false
          acknowledge-mode: manual

这里记得将default-requeue-rejected属性设置为false。

接下来,是业务队列的消费代码:

@Slf4j
@Component
public class BusinessMessageReceiver {

    @RabbitListener(queues = BUSINESS_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("收到业务消息A:{}", msg);
        boolean ack = true;
        Exception exception = null;
        try {
            if (msg.contains("deadletter")){
                throw new RuntimeException("dead letter exception");
            }
        } catch (Exception e){
            ack = false;
            exception = e;
        }
        if (!ack){
            log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        } else {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        }
    }

    @RabbitListener(queues = BUSINESS_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到业务消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

而后配置死信队列的消费者:

@Component
public class DeadLetterMessageReceiver {


    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息A:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        System.out.println("收到死信消息B:" + new String(message.getBody()));
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

最后,为了方便测试,写一个简单的消息生产者,并经过controller层来生产消息。

@Component
public class BusinessMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg){
        rabbitTemplate.convertSendAndReceive(BUSINESS_EXCHANGE_NAME, "", msg);
    }
}
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private BusinessMessageSender sender;

    @RequestMapping("sendmsg")
    public void sendMsg(String msg){
        sender.sendMsg(msg);
    }
}

一切准备就绪,启动!

能够从RabbitMQ的管理后台中看到一共有四个队列,除默认的Exchange外还有声明的两个Exchange。

8(%(3A_Y`_N8XX8W5XHZMWY.png

123.png

接下来,访问一下url,来测试一下:

http://localhost:8080/rabbitmq/sendmsg?msg=msg

日志:

收到业务消息A:msg
收到业务消息B:msg

表示两个Consumer都正常收到了消息。这表明正常消费的消息,ack后正常返回。而后咱们再来测试nck的消息。

http://localhost:8080/rabbitmq/sendmsg?msg=deadletter

这将会触发业务队列A的NCK,按照预期,消息被NCK后,会抛到死信队列中,所以死信队列将会出现这个消息,日志以下:

收到业务消息A:deadletter
消息消费发生异常,error msg:dead letter exception
java.lang.RuntimeException: dead letter exception
...

收到死信消息A:deadletter

能够看到,死信队列的Consumer接受到了这个消息,因此流程到此为止就打通了。

5、死信消息的变化

那么“死信”被丢到死信队列中后,会发生什么变化呢?

若是队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。若是没有设置,则保留该消息原有的路由key。

举个栗子:

若是原有消息的路由key是testA,被发送到业务Exchage中,而后被投递到业务队列QueueA中,若是该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keytestA,若是配置了该参数,而且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,而后被抛到死信交换机中。

另外,因为被抛到了死信交换机,因此消息的Exchange Name也会被替换为死信交换机的名称。

消息的Header中,也会添加不少奇奇怪怪的字段,修改一下上面的代码,在死信队列的消费者中添加一行日志输出:

log.info("死信消息properties:{}", message.getMessageProperties());

而后从新运行一次,便可获得死信消息Header中被添加的信息:

死信消息properties:MessageProperties [headers={x-first-death-exchange=dead.letter.demo.simple.business.exchange, x-death=[{reason=rejected, count=1, exchange=dead.letter.demo.simple.business.exchange, time=Sun Jul 14 16:48:16 CST 2019, routing-keys=[], queue=dead.letter.demo.simple.business.queuea}], x-first-death-reason=rejected, x-first-death-queue=dead.letter.demo.simple.business.queuea}, correlationId=1, replyTo=amq.rabbitmq.reply-to.g2dkABZyYWJiaXRAREVTS1RPUC1DUlZGUzBOAAAPQAAAAAAB.bLbsdR1DnuRSwiKKmtdOGw==, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=dead.letter.demo.simple.deadletter.exchange, receivedRoutingKey=dead.letter.demo.simple.deadletter.queuea.routingkey, deliveryTag=1, consumerTag=amq.ctag-NSp18SUPoCNvQcoYoS2lPg, consumerQueue=dead.letter.demo.simple.deadletter.queuea]

Header中看起来有不少信息,实际上并很少,只是值比较长而已。下面就简单说明一下Header中的值:

字段名 含义
x-first-death-exchange 第一次被抛入的死信交换机的名称
x-first-death-reason 第一次成为死信的缘由,rejected:消息在从新进入队列时被队列拒绝,因为default-requeue-rejected 参数被设置为falseexpired :消息过时。maxlen : 队列内消息数量超过队列最大容量
x-first-death-queue 第一次成为死信前所在队列名称
x-death 历次被投入死信交换机的信息列表,同一个消息每次进入一个死信交换机,这个数组的信息就会被更新

6、死信队列应用场景

经过上面的信息,咱们已经知道如何使用死信队列了,那么死信队列通常在什么场景下使用呢?

通常用在较为重要的业务队列中,确保未被正确消费的消息不被丢弃,通常发生消费异常可能缘由主要有因为消息信息自己存在错误致使处理异常,处理过程当中参数校验异常,或者因网络波动致使的查询异常等等,当发生异常时,固然不能每次经过日志来获取原消息,而后让运维帮忙从新投递消息(没错,之前就是这么干的= =)。经过配置死信队列,可让未正确处理的消息暂存到另外一个队列中,待后续排查清楚问题后,编写相应的处理代码来处理死信消息,这样比手工恢复数据要好太多了。

7、总结

死信队列其实并无什么神秘的地方,不过是绑定在死信交换机上的普通队列,而死信交换机也只是一个普通的交换机,不过是用来专门处理死信的交换机。

总结一下死信消息的生命周期:

  1. 业务消息被投入业务队列
  2. 消费者消费业务队列的消息,因为处理过程当中发生异常,因而进行了nck或者reject操做
  3. 被nck或reject的消息由RabbitMQ投递到死信交换机中
  4. 死信交换机将消息投入相应的死信队列
  5. 死信队列的消费者消费死信消息

死信消息是RabbitMQ为咱们作的一层保证,其实咱们也能够不使用死信队列,而是在消息消费异常时,将消息主动投递到另外一个交换机中,当你明白了这些以后,这些Exchange和Queue想怎样配合就能怎么配合。好比从死信队列拉取消息,而后发送邮件、短信、钉钉通知来通知开发人员关注。或者将消息从新投递到一个队列而后设置过时时间,来进行延时消费。

本篇文章中的demo项目已上传至github,有须要的朋友能够自行下载查阅​。https://github.com/MFrank2016/dead-letter-demo​

若是本文对你有帮助,记得点个赞,也但愿能分享给更多的朋友。也欢迎关注个人公众号进行留言交流。

TIM图片20190714173105.png

相关文章
相关标签/搜索