以前咱们已经经过《Spring Cloud Stream消费失败后的处理策略(一):自动重试》一文介绍了Spring Cloud Stream默认的消息重试功能。本文将介绍RabbitMQ的binder提供的另一种重试功能:从新入队。java
准备一个会消费失败的例子,能够直接沿用前文的工程,也能够新建一个,而后建立以下代码的逻辑:git
@EnableBinding(TestApplication.TestTopic.class)
@SpringBootApplication
public class TestApplication {
public static void main(String[] args) {
SpringApplication.run(TestApplication.class, args);
}
@RestController
static class TestController {
@Autowired
private TestTopic testTopic;
/** * 消息生产接口 * * @param message * @return */
@GetMapping("/sendMessage")
public String messageWithMQ(@RequestParam String message) {
testTopic.output().send(MessageBuilder.withPayload(message).build());
return "ok";
}
}
/** * 消息消费逻辑 */
@Slf4j
@Component
static class TestListener {
private int count = 1;
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload + ", " + count);
throw new RuntimeException("Message consumer failed!");
}
}
interface TestTopic {
String OUTPUT = "example-topic-output";
String INPUT = "example-topic-input";
@Output(OUTPUT)
MessageChannel output();
@Input(INPUT)
SubscribableChannel input();
}
}
复制代码
内容很简单,既包含了消息的生产,也包含了消息消费。消息消费的时候主动抛出了一个异常来模拟消息的消费失败。github
在启动应用以前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名)、并设置一下分组,好比:spring
spring.cloud.stream.bindings.example-topic-input.destination=test-topic
spring.cloud.stream.bindings.example-topic-input.group=stream-exception-handler
spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts=1
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true
spring.cloud.stream.bindings.example-topic-output.destination=test-topic
复制代码
完成了上面配置以后,启动应用并访问localhost:8080/sendMessage?message=hello
接口来发送一个消息到MQ中了,此时能够看到程序不断的抛出了消息消费异常。这是因为这里咱们多加了一个配置:spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.requeue-rejected=true
。在该配置做用之下,消息消费失败以后,并不会将该消息抛弃,而是将消息从新放入队列,因此消息的消费逻辑会被重复执行,直到这条消息消费成功为止。app
在完成了上面的这个例子以后,可能读者会有下面两个常见问题:ui
问题一:以前介绍的Spring Cloud Stream默认提供的默认功能(spring.cloud.stream.bindings.example-topic-input.consumer.max-attempts)与本文所说的重入队列实现的重试有什么区别?spa
Spring Cloud Stream默认提供的默认功能只是对处理逻辑的重试,它们的处理逻辑是由同一条消息触发的。而本文所介绍的从新入队史经过从新将消息放入队列而触发的,因此其实是收到了屡次消息而实现的重试。code
问题二:如上面的例子那样,消费一直不成功,这些不成功的消息会被不断堆积起来,如何解决这个问题?blog
对于这个问题,咱们能够联合前文介绍的DLQ队列来完善消息的异常处理。教程
咱们只须要增长以下配置,自动绑定dlq队列:
spring.cloud.stream.rabbit.bindings.example-topic-input.consumer.auto-bind-dlq=true
复制代码
而后改造一下消息处理程序,能够根据业务状况,为进入dlq队列增长一个条件,好比下面的例子:
@StreamListener(TestTopic.INPUT)
public void receive(String payload) {
log.info("Received payload : " + payload + ", " + count);
if (count == 3) {
count = 1;
throw new AmqpRejectAndDontRequeueException("tried 3 times failed, send to dlq!");
} else {
count ++;
throw new RuntimeException("Message consumer failed!");
}
}
复制代码
设定了计数器count,当count为3的时候抛出AmqpRejectAndDontRequeueException
这个特定的异常。此时,当只有当抛出这个异常的时候,才会将消息放入DLQ队列,从而不会形成严重的堆积问题。
本文示例读者能够经过查看下面仓库的中的stream-exception-handler-4
项目:
若是您对这些感兴趣,欢迎star、follow、收藏、转发给予支持!