SpringCloud学习笔记(9、SpringCloud Stream)

目录:

  • 什么是SpringCloud Stream
  • 如何使用SpringCloud Stream
  • 消息分流

什么是SpringCloud Stream:

SpringCloud Stream是一个用于构建消息驱动的微服务应用框架。它经过注入,输入、输出通道来与外界通讯;所以它很容易实现消息的中转,而且在更换消息中间件的时候不须要该代码,仅须要修改配置便可。支持的消息中间件如RabbitMQ、Kafka等等。spring

如何使用SpringCloud Stream(以RabbitMQ为例):

一、增长maven依赖app

 1 <dependency>
 2     <groupId>org.springframework.cloud</groupId>
 3     <artifactId>spring-cloud-stream</artifactId>
 4 </dependency>
 5 <dependency>
 6     <groupId>org.springframework.cloud</groupId>
 7     <artifactId>spring-cloud-stream-binder-rabbit</artifactId>
 8 </dependency>
 9 <dependency>
10     <groupId>org.springframework.cloud</groupId>
11     <artifactId>spring-cloud-stream-test-support</artifactId>
12     <scope>test</scope>
13 </dependency>

二、增长properties配置框架

 1 spring.application.name=stream
 2 server.port=7070
 3 
 4 # rabbitmq
 5 spring.rabbitmq.host=localhost
 6 spring.rabbitmq.port=5672
 7 spring.rabbitmq.username=guest
 8 spring.rabbitmq.password=guest
 9 
10 # stream
11 spring.cloud.stream.bindings.input.destination=customer
12 spring.cloud.stream.bindings.output.destination=customer

三、启动类加上本工程的消息代理类型maven

@EnableBinding({Processor.class})

@EnableBinding分为三种类型微服务

)org.springframework.cloud.stream.messaging.Processor:接收和发送消息ui

)org.springframework.cloud.stream.messaging.Source:仅支持发送消息spa

)org.springframework.cloud.stream.messaging.Sink:仅支持接收消息代理

四、加上Controller及Servicecode

 1 @RestController
 2 @AllArgsConstructor
 3 public class ProcessorController {
 4 
 5     private final ProcessorService processorService;
 6 
 7     @GetMapping("/testProcessor/{message}")
 8     public boolean testProcessor(@PathVariable("message") String message) {
 9         return processorService.send(message);
10     }
11 }
 1 @Service
 2 @AllArgsConstructor
 3 public class ProcessorService {
 4 
 5     private final Processor processor;
 6 
 7     public boolean send(String message) {
 8         return processor.output().send(MessageBuilder.withPayload(message).build());
 9     }
10 
11     public boolean subscribe(MessageHandler handler) {
12         return processor.input().subscribe(handler);
13     }
14 }

五、在任意bean下写上接收逻辑或另起一个工程(另外一个工程的mq须要配成一个哦)server

1 @StreamListener(Sink.INPUT)
2 public void receive(String message) {
3     System.err.println("receive message: " + message);
4 }

而后咱们启动项目,访问http://localhost:7070/testProcessor/hello,此时就会在控制台看到receive message: hello的字样。

消息分流(kafka特性):

1 @GetMapping("/testMessageShunt/{type}")
2 public boolean testMessageShunt(@PathVariable("type") String type) {
3     String header = "a".equalsIgnoreCase(type) ? "msg1" : "msg2";
4     return processorService.send(type, header);
5 }
 1 /**
 2  * RabbitMQ不支持消息分流
 3  */
 4 @StreamListener(value = Sink.INPUT, condition = "headers['contentType']=='mgs1'")
 5 public void receiveMessage1(@Payload Message<String> message) {
 6     System.err.println("receive message1: " + message.getPayload());
 7 }
 8 
 9 /**
10  * RabbitMQ不支持消息分流
11  */
12 @StreamListener(value = Sink.INPUT, condition = "headers['contentType']=='mgs2'")
13 public void receiveMessage2(@Payload Message<String> message) {
14     System.err.println("receive message2: " + message.getPayload());
15 }
相关文章
相关标签/搜索