在实际开发过程当中,服务与服务之间通讯常常会使用到消息中间件,消息中间件解决了应用解耦、异步处理、流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。java
不一样中间件内部实现方式是不同的,这些中间件的差别性致使咱们实际项目开发给咱们形成了必定的困扰,好比项目中间件为 Kafka,若是咱们要替换为 RabbitMQ,这无疑就是一个灾难性的工做,一大堆东西都要重作,由于它跟咱们系统的耦合性很是高。这时咱们可使用 Spring Cloud Stream 来整合咱们的消息中间件,下降系统和中间件的耦合性。spring
假设公司有几个不一样的系统,各系统在某些业务有联动关系,好比 A 系统完成了某些操做,须要触发 B 系统及 C 系统,可是各个系统之间产生了耦合。针对这种场景,用消息中间件就能够完成解耦,当 A 系统完成操做时将数据放进消息队列,B 和 C 系统去订阅消息就能够了,这样各系统只要约定好消息的格式就能够了。shell
传统模式:apache
中间件模式:编程
好比用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就能够异步完成。由于下单付款才是核心业务,发邮件和短信并不属于核心功能,且可能耗时较长,因此针对这种业务场景能够选择先放到消息队列中,由其余服务来异步处理。api
传统模式:服务器
中间件模式:架构
好比秒杀活动,一会儿进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,针对这种场景,在中间加一层消息队列,把请求先入队列,而后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。并发
传统模式:app
中间件模式:
对于小型项目来讲,咱们一般对日志的处理没有那么多的要求,可是当用户量,数据量达到必定的峰值以后,问题就会随之而来。好比:
等不少其余的问题,这样咱们就须要借助消息队列进行业务的上解耦,数据上更好的传输。
Kafka 最开始就是专门为了处理日志产生的。
消息队列,是分布式系统中重要的组件,其通用的使用场景能够简单地描述为:当不须要当即得到结果,可是并发量又须要进行控制的时候,差很少就是须要使用消息队列的时候。在项目中,将一些无需即时返回且耗时的操做提取出来,进行了异步处理,而这种异步处理的方式大大的节省了服务器的请求响应时间,从而提升了系统的吞吐量。
当遇到上面几种状况的时候,就要考虑用消息队列了。若是你碰巧使用的是 RabbitMQ 或者 Kafka ,并且一样也在使用 Spring Cloud,那你能够考虑下用 Spring Cloud Stream。
Spring Cloud Stream 是用于构建消息驱动微服务应用程序的框架。该框架提供了一个灵活的编程模型,该模型创建在已经熟悉 Spring 习惯用法的基础上,它提供了来自多家供应商的中间件的合理配置,包括 publish-subscribe,消息分组和消息分区处理的支持。
Spring Cloud Stream 解决了开发人员无感知的使用消息中间件的问题,由于 Stream 对消息中间件的进一步封装,能够作到代码层面对中间件的无感知,甚至于动态的切换中间件,使得微服务开发的高度解耦,服务能够关注更多本身的业务流程。
组成 | 说明 |
---|---|
Middleware | 中间件,支持 RabbitMQ 和 Kafka。 |
Binder | 目标绑定器,目标指的是 Kafka 仍是 RabbitMQ。绑定器就是封装了目标中间件的包。若是操做的是 Kafka 就使用 spring-cloud-stream-binder-kafka ,若是操做的是 RabbitMQ 就使用 spring-cloud-stream-binder-rabbit 。 |
@Input | 注解标识输入通道,接收(消息消费者)的消息将经过该通道进入应用程序。 |
@Output | 注解标识输出通道,发布(消息生产者)的消息将经过该通道离开应用程序。 |
@StreamListener | 监听队列,消费者的队列的消息接收。 |
@EnableBinding | 注解标识绑定,将信道 channel 和交换机 exchange 绑定在一块儿。 |
经过定义绑定器做为中间层,实现了应用程序与消息中间件细节之间的隔离。经过向应用程序暴露统一的 Channel 通道,使得应用程序不须要再考虑各类不一样的消息中间件的实现。当须要升级消息中间件,或者是更换其余消息中间件产品时,咱们须要作的就是更换对应的 Binder
绑定器而不须要修改任何应用逻辑。
该模型图中有以下几个核心概念:
Source
:当须要发送消息时,咱们就须要经过 Source.java
,它会把咱们所要发送的消息进行序列化(默认转换成 JSON 格式字符串),而后将这些数据发送到 Channel 中;Sink
:当咱们须要监听消息时就须要经过 Sink.java
,它负责从消息通道中获取消息,并将消息反序列化成消息对象,而后交给具体的消息监听处理;Channel
:一般咱们向消息中间件发送消息或者监听消息时须要指定主题(Topic)和消息队列名称,一旦咱们须要变动主题的时候就须要修改消息发送或消息监听的代码。经过 Channel
对象,咱们的业务代码只须要对应 Channel
就能够了,具体这个 Channel 对应的是哪一个主题,能够在配置文件中来指定,这样当主题变动的时候咱们就不用对代码作任何修改,从而实现了与具体消息中间件的解耦;Binder
:经过不一样的 Binder
能够实现与不一样的消息中间件整合,Binder
提供统一的消息收发接口,从而使得咱们能够根据实际须要部署不一样的消息中间件,或者根据实际生产中所部署的消息中间件来调整咱们的配置。
stream-demo
聚合工程。SpringBoot 2.2.4.RELEASE
、Spring Cloud Hoxton.SR1
。
RabbitMQ
:消息队列eureka-server
:注册中心eureka-server02
:注册中心
点击连接观看:Stream 入门案例视频(获取更多请关注公众号「哈喽沃德先生」)
在 stream-demo
项目下建立 stream-producer
子项目。
要使用 RabbitMQ 绑定器,能够经过使用如下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
或者使用 Spring Cloud Stream RabbitMQ Starter:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
完整依赖以下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>stream-producer</artifactId> <version>1.0-SNAPSHOT</version> <!-- 继承父依赖 --> <parent> <groupId>com.example</groupId> <artifactId>stream-demo</artifactId> <version>1.0-SNAPSHOT</version> </parent> <!-- 项目依赖 --> <dependencies> <!-- netflix eureka client 依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!-- spring cloud stream binder rabbit 绑定器依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> <!-- spring boot test 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> </project>
配置 RabbitMQ 消息队列和 Stream 消息发送与接收的通道。
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息发送通道 # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同 output: destination: stream.message # 绑定的交换机名称 # 配置 Eureka Server 注册中心 eureka: instance: prefer-ip-address: true # 是否使用 ip 地址注册 instance-id: ${spring.cloud.client.ip-address}:${server.port} # ip:port client: service-url: # 设置服务注册中心地址 defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
MessageProducer.java
package com.example.producer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(Source.class) public class MessageProducer { @Autowired private Source source; /** * 发送消息 * * @param message */ public void send(String message) { source.output().send(MessageBuilder.withPayload(message).build()); } }
StreamProducerApplication.java
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamProducerApplication { public static void main(String[] args) { SpringApplication.run(StreamProducerApplication.class); } }
在 stream-demo
项目下建立 stream-consumer
子项目。
要使用 RabbitMQ 绑定器,能够经过使用如下 Maven 坐标将其添加到 Spring Cloud Stream 应用程序中:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency>
或者使用 Spring Cloud Stream RabbitMQ Starter:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
完整依赖以下:
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.example</groupId> <artifactId>stream-consumer</artifactId> <version>1.0-SNAPSHOT</version> <!-- 继承父依赖 --> <parent> <groupId>com.example</groupId> <artifactId>stream-demo</artifactId> <version>1.0-SNAPSHOT</version> </parent> <!-- 项目依赖 --> <dependencies> <!-- netflix eureka client 依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <!-- spring cloud stream binder rabbit 绑定器依赖 --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> <!-- spring boot test 依赖 --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> <exclusions> <exclusion> <groupId>org.junit.vintage</groupId> <artifactId>junit-vintage-engine</artifactId> </exclusion> </exclusions> </dependency> </dependencies> </project>
配置 RabbitMQ 消息队列和 Stream 消息发送与接收的通道。
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 # 配置 Eureka Server 注册中心 eureka: instance: prefer-ip-address: true # 是否使用 ip 地址注册 instance-id: ${spring.cloud.client.ip-address}:${server.port} # ip:port client: service-url: # 设置服务注册中心地址 defaultZone: http://localhost:8761/eureka/,http://localhost:8762/eureka/
MessageConsumer.java
package com.example.consumer; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.messaging.Sink; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(Sink.class) public class MessageConsumer { /** * 接收消息 * * @param message */ @StreamListener(Sink.INPUT) public void receive(String message) { System.out.println("message = " + message); } }
StreamConsumerApplication.java
package com.example; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; @SpringBootApplication public class StreamConsumerApplication { public static void main(String[] args) { SpringApplication.run(StreamConsumerApplication.class); } }
MessageProducerTest.java
package com.example; import com.example.producer.MessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MessageProducer messageProducer; @Test public void testSend() { messageProducer.send("hello spring cloud stream"); } }
启动消息消费者,运行单元测试,消息消费者控制台打印结果以下:
message = hello spring cloud stream
RabbitMQ 界面以下:
参考源码 Source.java
和 Sink.java
建立自定义消息通道。
自定义消息发送通道 MySource.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义消息发送通道 */ public interface MySource { String MY_OUTPUT = "my_output"; @Output(MY_OUTPUT) MessageChannel myOutput(); }
自定义消息接收通道 MySink.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息接收通道 */ public interface MySink { String MY_INPUT = "my_input"; @Input(MY_INPUT) SubscribableChannel myInput(); }
消息生产者。
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息发送通道 # 与 org.springframework.cloud.stream.messaging.Source 中的 @Output("output") 注解的 value 相同 output: destination: stream.message # 绑定的交换机名称 my_output: destination: my.message # 绑定的交换机名称
消息消费者。
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址 cloud: stream: bindings: # 消息接收通道 # 与 org.springframework.cloud.stream.messaging.Sink 中的 @Input("input") 注解的 value 相同 input: destination: stream.message # 绑定的交换机名称 my_input: destination: my.message # 绑定的交换机名称
消息生产者 MyMessageProducer.java
。
package com.example.producer; import com.example.channel.MySource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MySource.class) public class MyMessageProducer { @Autowired private MySource mySource; /** * 发送消息 * * @param message */ public void send(String message) { mySource.myOutput().send(MessageBuilder.withPayload(message).build()); } }
消息消费者 MyMessageConsumer.java
。
package com.example.consumer; import com.example.channel.MySink; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MySink.class) public class MyMessageConsumer { /** * 接收消息 * * @param message */ @StreamListener(MySink.MY_INPUT) public void receive(String message) { System.out.println("message = " + message); } }
MessageProducerTest.java
package com.example; import com.example.producer.MyMessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MyMessageProducer myMessageProducer; @Test public void testMySend() { myMessageProducer.send("hello spring cloud stream"); } }
启动消息消费者,运行单元测试,消息消费者控制台打印结果以下:
message = hello spring cloud stream
RabbitMQ 界面以下:
Spring Cloud 微服务开发之因此简单,除了官方作了许多完全的封装以外还有一个优势就是约定大于配置。开发人员仅需规定应用中不符约定的部分,在没有规定配置的地方采用默认配置,以力求最简配置为核心思想。
简单理解就是:Spring 遵循了推荐默认配置的思想,当存在特殊需求时候,自定义配置便可不然无需配置。
在 Spring Cloud Stream 中,@Output("output")
和 @Input("input")
注解的 value
默认即为绑定的交换机名称。因此自定义消息通道的案例咱们就能够重构为如下方式。
参考源码 Source.java
和 Sink.java
建立自定义消息通道。
自定义消息发送通道 MySource02.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; /** * 自定义消息发送通道 */ public interface MySource02 { String MY_OUTPUT = "default.message"; @Output(MY_OUTPUT) MessageChannel myOutput(); }
自定义消息接收通道 MySink02.java
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息接收通道 */ public interface MySink02 { String MY_INPUT = "default.message"; @Input(MY_INPUT) SubscribableChannel myInput(); }
消息生产者。
server: port: 8001 # 端口 spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
消息消费者。
server: port: 8002 # 端口 spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
消息生产者 MyMessageProducer02.java
。
package com.example.producer; import com.example.channel.MySource02; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MySource02.class) public class MyMessageProducer02 { @Autowired private MySource02 mySource02; /** * 发送消息 * * @param message */ public void send(String message) { mySource02.myOutput().send(MessageBuilder.withPayload(message).build()); } }
消息消费者 MyMessageConsumer02.java
。
package com.example.consumer; import com.example.channel.MySink02; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MySink02.class) public class MyMessageConsumer02 { /** * 接收消息 * * @param message */ @StreamListener(MySink02.MY_INPUT) public void receive(String message) { System.out.println("message = " + message); } }
MessageProducerTest.java
package com.example; import com.example.producer.MyMessageProducer02; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private MyMessageProducer02 myMessageProducer02; @Test public void testMySend02() { myMessageProducer02.send("约定大于配置"); } }
启动消息消费者,运行单元测试,消息消费者控制台打印结果以下:
message = 约定大于配置
RabbitMQ 界面以下:
一个消息驱动微服务应用能够既是消息生产者又是消息消费者。接下来模拟一个短信邮件发送的消息处理过程:
source.message
交换机;source.message
交换机接收原始消息,通过处理分别发送至 sms.message
和 email.message
交换机;sms.message
和 email.message
交换机接收处理后的消息并发送短信和邮件。
发送原始消息,接收处理后的消息并发送短信和邮件的消息驱动微服务应用。
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息通道 */ public interface MyProcessor { String SOURCE_MESSAGE = "source.message"; String SMS_MESSAGE = "sms.message"; String EMAIL_MESSAGE = "email.message"; @Output(SOURCE_MESSAGE) MessageChannel sourceOutput(); @Input(SMS_MESSAGE) SubscribableChannel smsInput(); @Input(EMAIL_MESSAGE) SubscribableChannel emailInput(); }
接收原始消息,通过处理分别发送短信和邮箱的消息驱动微服务应用。
package com.example.channel; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; /** * 自定义消息通道 */ public interface MyProcessor { String SOURCE_MESSAGE = "source.message"; String SMS_MESSAGE = "sms.message"; String EMAIL_MESSAGE = "email.message"; @Input(SOURCE_MESSAGE) MessageChannel sourceOutput(); @Output(SMS_MESSAGE) SubscribableChannel smsOutput(); @Output(EMAIL_MESSAGE) SubscribableChannel emailOutput(); }
约定大于配置,配置文件只修改端口和应用名称便可,其余配置一致。
spring: application: name: stream-producer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
spring: application: name: stream-consumer # 应用名称 rabbitmq: host: 192.168.10.101 # 服务器 IP port: 5672 # 服务器端口 username: guest # 用户名 password: guest # 密码 virtual-host: / # 虚拟主机地址
发送原始消息 10086|10086@email.com
至 source.message
交换机。
package com.example.producer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MyProcessor.class) public class SourceMessageProducer { private Logger logger = LoggerFactory.getLogger(SourceMessageProducer.class); @Autowired private MyProcessor myProcessor; /** * 发送原始消息 * * @param sourceMessage */ public void send(String sourceMessage) { logger.info("原始消息发送成功,原始消息为:{}", sourceMessage); myProcessor.sourceOutput().send(MessageBuilder.withPayload(sourceMessage).build()); } }
接收处理后的消息并发送短信和邮件。
package com.example.consumer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MyProcessor.class) public class SmsAndEmailMessageConsumer { private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageConsumer.class); /** * 接收消息 电话号码 * * @param phoneNum */ @StreamListener(MyProcessor.SMS_MESSAGE) public void receiveSms(String phoneNum) { logger.info("电话号码为:{},调用短信发送服务,发送短信...", phoneNum); } /** * 接收消息 邮箱地址 * * @param emailAddress */ @StreamListener(MyProcessor.EMAIL_MESSAGE) public void receiveEmail(String emailAddress) { logger.info("邮箱地址为:{},调用邮件发送服务,发送邮件...", emailAddress); } }
接收原始消息 10086|10086@email.com
处理后并发送至 sms.message
和 email.message
交换机。
package com.example.consumer; import com.example.channel.MyProcessor; import com.example.producer.SmsAndEmailMessageProducer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * 消息消费者 */ @Component @EnableBinding(MyProcessor.class) public class SourceMessageConsumer { private Logger logger = LoggerFactory.getLogger(SourceMessageConsumer.class); @Autowired private SmsAndEmailMessageProducer smsAndEmailMessageProducer; /** * 接收原始消息,处理后并发送 * * @param sourceMessage */ @StreamListener(MyProcessor.SOURCE_MESSAGE) public void receive(String sourceMessage) { logger.info("原始消息接收成功,原始消息为:{}", sourceMessage); // 发送消息 电话号码 smsAndEmailMessageProducer.sendSms(sourceMessage.split("\\|")[0]); // 发送消息 邮箱地址 smsAndEmailMessageProducer.sendEmail(sourceMessage.split("\\|")[1]); } }
发送电话号码 10086
和邮箱地址 10086@email.com
至 sms.message
和 email.message
交换机。
package com.example.producer; import com.example.channel.MyProcessor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * 消息生产者 */ @Component @EnableBinding(MyProcessor.class) public class SmsAndEmailMessageProducer { private Logger logger = LoggerFactory.getLogger(SmsAndEmailMessageProducer.class); @Autowired private MyProcessor myProcessor; /** * 发送消息 电话号码 * * @param smsMessage */ public void sendSms(String smsMessage) { logger.info("电话号码消息发送成功,消息为:{}", smsMessage); myProcessor.smsOutput().send(MessageBuilder.withPayload(smsMessage).build()); } /** * 发送消息 邮箱地址 * * @param emailMessage */ public void sendEmail(String emailMessage) { logger.info("邮箱地址消息发送成功,消息为:{}", emailMessage); myProcessor.emailOutput().send(MessageBuilder.withPayload(emailMessage).build()); } }
MessageProducerTest.java
package com.example; import com.example.producer.SourceMessageProducer; import org.junit.jupiter.api.Test; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; @SpringBootTest(classes = {StreamProducerApplication.class}) public class MessageProducerTest { @Autowired private SourceMessageProducer sourceMessageProducer; @Test public void testSendSource() { sourceMessageProducer.send("10086|10086@email.com"); } }
消息驱动微服务 A 控制台打印结果以下:
电话号码为:10086,调用短信发送服务,发送短信... 邮箱地址为:10086@email.com,调用邮件发送服务,发送邮件...
消息驱动微服务 B 控制台打印结果以下:
原始消息接收成功,原始消息为:10086|10086@email.com 电话号码消息发送成功,消息为:10086 邮箱地址消息发送成功,消息为:10086@email.com
RabbitMQ 界面以下:
下一篇咱们讲解 Stream 如何实现消息分组和消息分区,记得关注噢~
本文采用 知识共享「署名-非商业性使用-禁止演绎 4.0 国际」许可协议
。
你们能够经过 分类
查看更多关于 Spring Cloud
的文章。
🤗 您的点赞
和转发
是对我最大的支持。
📢 扫码关注 哈喽沃德先生
「文档 + 视频」每篇文章都配有专门视频讲解,学习更轻松噢 ~