前两天咱们已经介绍了两种Spring Cloud Stream对消息失败的处理策略:java
那么若是代码自己存在逻辑错误,不管重试多少次都不可能成功,也没有具体的降级业务逻辑,以前在深刻思考中讨论过,能够经过日志,或者降级逻辑记录的方式把错误消息保存下来,而后过后分析、修复Bug再从新处理。可是很显然,这样作很是原始,而且太过笨拙,处理复杂度太高。因此,本文将介绍利用中间件特性来便捷地处理该问题的方案:使用RabbitMQ的DLQ队列。git
准备一个会消费失败的例子,能够直接沿用前文的工程。也能够新建一个,而后建立以下代码的逻辑:github
@EnableBinding(TestApplication.TestTopic.class) @SpringBootApplication public class TestApplication { public static void main(String[] args) { SpringApplication.run(TestApplication.class, args); } @RestController static class TestController { @Autowired private TestTopic testTopic; /** * 消息生产接口 * * @param message * @return */ @GetMapping("/sendMessage") public String messageWithMQ(@RequestParam String message) { testTopic.output().send(MessageBuilder.withPayload(message).build()); return "ok"; } } /** * 消息消费逻辑 */ @Slf4j @Component static class TestListener { @StreamListener(TestTopic.INPUT) public void receive(String payload) { log.info("Received payload : " + payload); throw new RuntimeException("Message consumer failed!"); } } interface TestTopic { String OUTPUT = "example-topic-output"; String INPUT = "example-topic-input"; @Output(OUTPUT) MessageChannel output(); @Input(INPUT) SubscribableChannel input(); } }
内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。spring
在启动应用以前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,好比:网络
spring.cloud.stream.bindings.example-topic-input.destination=test-topic spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1 spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true spring.cloud.stream.bindings.example-topic-output.destination=test-topic
这里加入了一个重要配置spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
,用来开启DLQ(死信队列)。完成了上面配置以后,启动应用并访问localhost:8080/sendMessage?message=hello
接口来发送一个消息到MQ中了,此时能够看到消费失败后抛出了异常,消息消费失败,记录了日志。此时,能够查看RabbitMQ的控制台以下:app
其中,test-topic.stream-exception-handler.dlq
队列就是test-topic.stream-exception-handler
的dlq(死信)队列,当test-topic.stream-exception-handler
队列中的消息消费时候以后,就会将这条消息原封不动的转存到dlq队列中。这样这些没有获得妥善处理的消息就经过简单的配置实现了存储,以后,咱们还能够经过简单的操做对这些消息进行从新消费。咱们只须要在控制台中点击test-topic.stream-exception-handler.dlq
队列的名字进入到详情页面以后,使用Move messages
功能,直接将这些消息移动回test-topic.stream-exception-handler
队列,这样这些消息就能从新被消费一次。ui
若是Move messages功能中是以下内容:spa
To move messages, the shovel plugin must be enabled, try: $ rabbitmq-plugins enable rabbitmq_shovel rabbitmq_shovel_management
那是因为没有安装对应的插件,只须要根据提示的命令安装就能使用该命令了。插件
先来总结一下在引入了RabbitMQ的DLQ以后,对于消息异常处理更为完整一些的基本思路:日志
在这样的总体思路中,可能还涉及一些微调,这里举几个常见例子,帮助读者进一步了解一些特殊的场景和配置使用!
场景一:有些消息在业务上存在时效性,进入死信队列以后,过一段时间再处理已经没有意义,这个时候如何过滤这些消息呢?
只须要配置一个参数便可:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.dlq-ttl=10000
该参数能够控制DLQ队列中消息的存活时间,当超过配置时间以后,该消息会自动的从DLQ队列中移除。
场景二:可能进入DLQ队列的消息存在各类不一样的缘由(不一样异常形成的),此时若是在作补救措施的时候,还但愿根据这些异常作不一样的处理时候,咱们如何区分这些消息进入DLQ的缘由呢?
再来看看这个参数:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.republish-to-dlq=true
该参数默认是false,若是设置了死信队列的时候,会将消息原封不动的发送到死信队列(也就是上面例子中的实现),此时你们能够在RabbitMQ控制台中经过Get message(s)
功能来看看队列中的消息,应该以下图所示:
这是一条原始消息。
若是咱们该配置设置为true的时候,那么该消息在进入到死信队列的时候,会在headers中加入错误信息,以下图所示:
这样,不论咱们是经过移回原通道处理仍是新增订阅处理这些消息的时候就能够以此做为依据进行分类型处理了。
关于RabbitMQ的binder中还有不少关于DLQ的配置,这里不一一介绍了,上面几个是目前笔者使用过的几个,其余一些暂时认为采用默认配置已经够用,除非还有其余定制要求,或者是存量内容,须要去适配才会去配置。读者能够查看官方文档了解更多详情!
本文示例读者能够经过查看下面仓库的中的stream-exception-handler-3
项目:
若是您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!