Spring Cloud Stream在同一通道根据消息内容分发不一样的消费逻辑

  应用场景html

  有的时候,咱们对于同一通道中的消息处理,会经过判断头信息或者消息内容来作一些差别化处理,好比:可能在消息头信息中带入消息版本号,而后经过if判断来执行不一样的处理逻辑,其代码结构多是这样的:spring

  @StreamListener(value = TestTopic.INPUT)app

  public void receiveV1(String payload, @Header("version") String version) {优化

  if("1.0".equals(version)) {ui

  // Version 1.0spa

  }.net

  if("2.0".equals(version)) {日志

  // Version 2.0htm

  }接口

  }

  那么当消息处理逻辑复杂的时候,这段逻辑就会变得特别复杂。针对这个问题,在@StreamListener注解中提供了一个不错的属性condition,能够用来优化这样的处理结构。

  动手试试

  下面经过编写一个简单的例子来具体体会一下这个属性的用法:

  @EnableBinding(TestApplication.TestTopic.class)

  @SpringBootApplication

  public class TestApplication {

  public static void main(String[] args) {

  SpringApplication.run(TestApplication.class, args);

  }

  @RestController

  static class TestController {

  @Autowired

  private TestTopic testTopic;

  /**

  * 消息生产接口

  *

  * @param message

  * @return

  */

  @GetMapping("/sendMessage")

  public String messageWithMQ(@RequestParam String message) {

  testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "1.0").build());

  testTopic.output().send(MessageBuilder.withPayload(message).setHeader("version", "2.0").build());

  return "ok";

  }

  }

  /**

  * 消息消费逻辑

  */

  @Slf4j无锡妇科医院哪家好 http://wapyyk.39.net/wx/zonghe/fc96e.html/

  @Component

  static class TestListener {

  @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='1.0'")

  public void receiveV1(String payload, @Header("version") String version) {

  log.info("Received v1 : " + payload + ", " + version);

  }

  @StreamListener(value = TestTopic.INPUT, condition = "headers['version']=='2.0'")

  public void receiveV2(String payload, @Header("version") String version) {

  log.info("Received v2 : " + payload + ", " + version);

  }

  }

  interface TestTopic {

  String OUTPUT = "example-topic-output";

  String INPUT = "example-topic-input";

  @Output(OUTPUT)

  MessageChannel output();

  @Input(INPUT)

  SubscribableChannel input();

  }

  }

  内容很简单,既包含了消息的生产,也包含了消息消费。在/sendMessage接口的定义中,发送了两条消息,一条消息的头信息中包含version=1.0,另一条消息的头信息中包含version=2.0。在消息监听类TestListener中,对TestTopic.INPUT通道定义了两个@StreamListener,这两个监听逻辑有不一样的condition,这里的表达式表示会根据消息头信息中的version值来作不一样的处理逻辑分发。

  在启动应用以前,还要记得配置一下输入输出通道对应的物理目标(exchange或topic名),好比:

  spring.cloud.stream.bindings.example-topic-input.destination=test-topic

  spring.cloud.stream.bindings.example-topic-input.group=stream-content-route

  spring.cloud.stream.bindings.example-topic-output.destination=test-topic

  完成了上面配置以后,就能够启动应用,并尝试访问localhost:8080/sendMessage?message=hello接口来发送一个消息到MQ中了。此时能够看到相似下面的日志:

  2018-12-24 15:50:33.361 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v1 : hello, 1.0

  2018-12-24 15:50:33.363 INFO 17683 --- [content-route-1] c.d.stream.TestApplication$TestListener : Received v2 : hello, 2.0

  从日志中能够看到,两条带有不一样头信息的消息,分别经过不一样的监听处理逻辑输出了对应的日志打印。

相关文章
相关标签/搜索