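Spring Cloud Stream is a framework for building message-driven microservices. It builds on Spring Boot and uses Spring Integration to connect to message brokers (RabbitMQ, Kafka, and others), provides opinionated auto-configuration for them, and introduces three core concepts: publish-subscribe, consumer groups, and partitions.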
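The three concepts can be illustrated with a small plain-Java model. This is only a conceptual sketch and does not use any Spring Cloud Stream API: the class names (Destination, ConsumerGroup), the round-robin delivery inside a group, and the hash-based partitionFor helper are all assumptions made for illustration. Real distribution depends on the broker and the binder.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StreamConceptsSketch {

    /** Hypothetical consumer group: each message goes to exactly one member. */
    static class ConsumerGroup {
        final List<List<String>> members = new ArrayList<>(); // one inbox per member
        private int next = 0;

        ConsumerGroup(int memberCount) {
            for (int i = 0; i < memberCount; i++) {
                members.add(new ArrayList<>());
            }
        }

        void deliver(String message) {
            // Round-robin is an assumption; it only shows "one member per message".
            members.get(next).add(message);
            next = (next + 1) % members.size();
        }
    }

    /** Hypothetical destination: every subscribed group gets its own copy (publish-subscribe). */
    static class Destination {
        final Map<String, ConsumerGroup> groups = new LinkedHashMap<>();

        void subscribe(String groupName, int memberCount) {
            groups.put(groupName, new ConsumerGroup(memberCount));
        }

        void publish(String message) {
            for (ConsumerGroup group : groups.values()) {
                group.deliver(message);
            }
        }
    }

    /** Partitioning: the same key always maps to the same partition index. */
    static int partitionFor(String key, int partitionCount) {
        return Math.floorMod(key.hashCode(), partitionCount);
    }

    public static void main(String[] args) {
        Destination orders = new Destination();
        orders.subscribe("billing", 2); // two instances share the work
        orders.subscribe("audit", 1);   // a second group sees every message too
        orders.publish("m1");
        orders.publish("m2");
        System.out.println("billing inboxes: " + orders.groups.get("billing").members);
        System.out.println("audit inboxes:   " + orders.groups.get("audit").members);
        System.out.println("partition for order-42: " + partitionFor("order-42", 3));
    }
}
```

In this model every group receives both messages, while inside the billing group each message is handled by only one of the two instances.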
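An application interacts with a Spring Cloud Stream binder through input and output channels, and configuration determines how each channel is bound. The binder, in turn, is responsible for talking to the middleware.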
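The division of labour between channels and the binder can be sketched in plain Java. The Channel class, the simplified Binder interface, and InMemoryBinder below are hypothetical stand-ins written for this illustration. They are not the Spring Cloud Stream types, whose real signatures carry more parameters.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

public class BinderSketch {

    /** Application-facing channel: the application only sends to it or subscribes on it. */
    static class Channel {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(payload);
            }
        }
    }

    /** Hypothetical, simplified binder contract: connects channels to a named destination. */
    interface Binder {
        void bindProducer(String destination, Channel output);
        void bindConsumer(String destination, Channel input);
    }

    /** In-memory stand-in for a broker-specific binder such as the RabbitMQ or Kafka one. */
    static class InMemoryBinder implements Binder {
        private final Map<String, List<Channel>> consumers = new HashMap<>();

        @Override
        public void bindProducer(String destination, Channel output) {
            // Forward everything sent on the output channel to the destination's consumers.
            output.subscribe(payload -> {
                for (Channel input : consumers.getOrDefault(destination, Collections.emptyList())) {
                    input.send(payload);
                }
            });
        }

        @Override
        public void bindConsumer(String destination, Channel input) {
            consumers.computeIfAbsent(destination, key -> new ArrayList<>()).add(input);
        }
    }

    public static void main(String[] args) {
        Binder binder = new InMemoryBinder();
        Channel output = new Channel();
        Channel input = new Channel();
        input.subscribe(payload -> System.out.println("received: " + payload));
        binder.bindConsumer("myInput", input);
        binder.bindProducer("myInput", output);
        output.send("hello world"); // the application never touches the "broker" directly
    }
}
```

Because the application code depends only on the channels, replacing InMemoryBinder with another implementation would not change it, which is the same idea that lets a real project switch between brokers.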
Development tool: IntelliJ IDEA 2019.2.3
1. Server
1.1 Create the project
In IDEA, create a new Spring Boot project named "spring-server", choose Spring Boot version 2.1.10, and on the Dependencies page select Spring Cloud Discovery -> Eureka Server.
The complete pom.xml is as follows:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
1.2 Modify the configuration in application.yml
Change the port to 8761, and stop the server from registering itself with Eureka or fetching the registry from it.
server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
1.3 Modify the application class
Add the @EnableEurekaServer annotation.
package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }
}
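2. Message producer
2.1 Create the project
In IDEA, create a new Spring Boot project named "spring-producer", choose Spring Boot version 2.1.10, and on the Dependencies page select Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency, which transitively pulls in spring-cloud-stream and the RabbitMQ binder (spring-cloud-stream-binder-rabbit).
The complete pom.xml is as follows: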
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
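2.2 Modify the configuration in application.yml
The RabbitMQ binder brought in through pom.xml connects to port 5672 on localhost by default, so the spring.rabbitmq section below can be omitted.
server:
  port: 8081
spring:
  application:
    name: spring-producer
  # Optional: these values match the RabbitMQ defaults
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/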
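2.3 Write the sending service
The sendOrder method is annotated with @Output("myInput"), which declares an output channel named myInput. Calling the method returns that channel, and messages sent through it are delivered to the myInput destination.
If the annotation is used without the myInput argument, the method name is used as the channel name.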
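The naming rule (explicit annotation value first, method name as the fallback) can be shown with a small reflection example. The @Out annotation and the channelName helper below are hypothetical and written only for this illustration. They are not Spring Cloud Stream's @Output or its internal resolution code.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class ChannelNameSketch {

    /** Hypothetical stand-in for an output annotation; NOT the Spring Cloud Stream one. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Out {
        String value() default "";
    }

    /** Example interface: one method names its channel, the other relies on the default. */
    interface Channels {
        @Out("myInput")
        Object sendOrder();

        @Out
        Object sendInvoice();
    }

    /** Returns the annotation value if present, otherwise the method name. */
    static String channelName(Class<?> type, String methodName) {
        try {
            Method method = type.getMethod(methodName);
            Out out = method.getAnnotation(Out.class);
            if (out == null) {
                throw new IllegalArgumentException(methodName + " is not annotated with @Out");
            }
            return out.value().isEmpty() ? method.getName() : out.value();
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + methodName, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(channelName(Channels.class, "sendOrder"));   // explicit name
        System.out.println(channelName(Channels.class, "sendInvoice")); // falls back to method name
    }
}
```

Giving the channel an explicit name, as the tutorial does with myInput, keeps the producer and consumer aligned even if a method is later renamed.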
package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {

    @Output("myInput")
    SubscribableChannel sendOrder();
}
2.4 Modify the application class
Add the @EnableBinding annotation to turn on channel binding in the Spring container, passing SendService.class as its argument. When the container starts, it automatically binds the channels declared in the SendService interface.
package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }
}
2.5 Add a controller class
The controller obtains the output channel from SendService and uses it to send a message to the message broker.
package com.example.springproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendRequest() {
        // Build the message
        Message<byte[]> msg = MessageBuilder.withPayload("hello world".getBytes()).build();
        // Send the message through the bound output channel
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }
}
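One detail worth noting about the byte[] payload: getBytes() without an argument uses the JVM's default charset, which on Java 8 depends on the platform. For "hello world" this makes no difference, but for non-ASCII text the producer and consumer would need to agree on an encoding. A minimal sketch of an explicit UTF-8 round trip follows. The encode and decode helper names are hypothetical and not part of the tutorial's projects.

```java
import java.nio.charset.StandardCharsets;

public class PayloadEncodingSketch {

    /** Producer side: turn text into bytes with an explicit charset. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: decode with the same charset the producer used. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = encode("hello world");
        System.out.println(payload.length + " bytes");
        System.out.println(decode(payload));
    }
}
```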
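3. Message consumer
3.1 Create the project
In IDEA, create a new Spring Boot project named "spring-consumer", choose Spring Boot version 2.1.10, and on the Dependencies page select Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency. The complete pom.xml is as follows: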
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>

    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
3.2 Modify the configuration in application.yml
server:
  port: 8080
spring:
  application:
    name: spring-consumer
  # Optional: these values match the RabbitMQ defaults
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
3.3 Write the channel interface for receiving messages
package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {

    @Input("myInput")
    SubscribableChannel myInput();
}
3.4 Modify the application class
Bind the message channel here as well.
package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    // Subscribe to messages on the myInput channel
    @StreamListener("myInput")
    public void receive(byte[] msg) {
        System.out.println("Received message: " + new String(msg));
    }
}
3.5 Test
(1) Check in the system services that RabbitMQ is running (it starts automatically by default);
(2) Start spring-server (port 8761);
(3) Start spring-producer (port 8081);
(4) Start spring-consumer (port 8080);
(5) Open http://localhost:8081/send in a browser. The console of the spring-consumer project prints:
Received message: hello world
This shows that the consumer can receive messages from the message broker.
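4. Switching the binder
The example above uses RabbitMQ as the message broker. To use Kafka instead, swap the Maven dependency: in the pom.xml of both the producer and the consumer, replace spring-cloud-starter-stream-rabbit with spring-cloud-starter-stream-kafka.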