Spring cloud stream【入门介绍】

案例代码:https://github.com/q279583842q/springcloud-e-bookhtml

  在实际开发过程当中,服务与服务之间通讯常常会使用到消息中间件,而以往使用了哪一个中间件好比RabbitMQ,那么该中间件和系统的耦合性就会很是高,若是咱们要替换为Kafka那么变更会比较大,这时咱们可使用SpringCloudStream来整合咱们的消息中间件,来下降系统和中间件的耦合性。java

1、什么是SpringCloudStream

  官方定义 Spring Cloud Stream 是一个构建<font color='red'>消息驱动</font>微服务的框架。   应用程序经过 inputs 或者 outputs 来与 Spring Cloud Stream 中binder 交互,经过咱们配置来 binding ,而 Spring Cloud Stream 的 binder 负责与消息中间件交互。因此,咱们只须要搞清楚如何与 Spring Cloud Stream 交互就能够方便使用消息驱动的方式。   经过使用Spring Integration来链接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQKafkagit

2、Stream 解决了什么问题?

  Stream解决了开发人员无感知的使用消息中间件的问题,由于Stream对消息中间件的进一步封装,能够作到代码层面对中间件的无感知,甚至于动态的切换中间件(rabbitmq切换为kafka),使得微服务开发的高度解耦,服务能够关注更多本身的业务流程github

官网结构图web

在这里插入图片描述

组成 说明
Middleware 中间件,目前只支持RabbitMQ和Kafka
Binder Binder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,经过Binder能够很方便的链接中间件,能够动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些均可以经过配置文件来实现
@Input 注解标识输入通道,经过该输入通道接收到的消息进入应用程序
@Output 注解标识输出通道,发布的消息将经过该通道离开应用程序
@StreamListener 监听队列,用于消费者的队列的<font color='red'>消息接收</font>
@EnableBinding 指信道channel和exchange绑定在一块儿

3、消息驱动入门案例

  咱们经过一个入门案例来演示下经过stream来整合RabbitMQ来实现消息的异步通讯的效果,因此首先要开启RabbitMQ服务,RabbitMQ不清楚的请参考此文:https://dpb-bobokaoya-sm.blog.csdn.net/article/details/90409404spring

1.建立消息发送者服务

1.1 建立项目

  建立一个SpringCloud项目apache

在这里插入图片描述

1.2 pom文件

  pom文件中重点是要添加<font color='red'>spring-cloud-starter-stream-rabbit</font>这个依赖app

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.13.RELEASE</version>
	</parent>
	<groupId>com.bobo</groupId>
	<artifactId>stream-sender</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Dalston.SR5</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

1.3 配置文件

  配置文件中除了必要的服务名称端口Eureka的信息外咱们还要添加<font color='red'>RabbitMQ</font>的注册信息框架

spring.application.name=stream-sender
server.port=9060
#设置服务注册中心地址,指向另外一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 连接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

1.4 建立消费发送者接口

  建立一个发送消息的接口。具体以下:方法名称自定义,返回类型必须是<font color='red'>SubscribableChannel</font>,在Output注解中指定交换器名称。异步

/**
 * 发送消息的接口
 * @author dengp
 *
 */
public interface ISendeService {

	/**
	 * 指定输出的交换器名称
	 * @return
	 */
	@Output("dpb-exchange")
	SubscribableChannel send();
}

1.5 启动类

  在启动类中经过@EnableBinding注解绑定咱们建立的接口类。

@SpringBootApplication
@EnableEurekaClient
// 绑定咱们刚刚建立的发送消息的接口类型
@EnableBinding(value={ISendeService.class})
public class StreamSenderStart {

	public static void main(String[] args) {
		SpringApplication.run(StreamSenderStart.class, args);
	}
}

2.建立消息消费者服务

2.1 建立项目

在这里插入图片描述

2.2 pom文件

  添加的依赖和发送消息的服务是一致的

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>
	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.5.13.RELEASE</version>
	</parent>
	<groupId>com.bobo</groupId>
	<artifactId>stream-receiver</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<dependencyManagement>
		<dependencies>
			<dependency>
				<groupId>org.springframework.cloud</groupId>
				<artifactId>spring-cloud-dependencies</artifactId>
				<version>Dalston.SR5</version>
				<type>pom</type>
				<scope>import</scope>
			</dependency>
		</dependencies>
	</dependencyManagement>
	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-eureka</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.cloud</groupId>
			<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
		</dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

2.3 配置文件

  注意修改服务名称和端口

spring.application.name=stream-receiver
server.port=9061
#设置服务注册中心地址,指向另外一个注册中心
eureka.client.serviceUrl.defaultZone=http://dpb:123456@eureka1:8761/eureka/,http://dpb:123456@eureka2:8761/eureka/

#rebbitmq 连接信息
spring.rabbitmq.host=192.168.88.150
spring.rabbitmq.port=5672
spring.rabbitmq.username=dpb
spring.rabbitmq.password=123
spring.rabbitmq.virtualHost=/

2.4 建立接收消息的接口

  此接口和发送消息的接口类似,注意使用的是@Input注解。

/**
 * 接收消息的接口
 * @author dengp
 *
 */
public interface IReceiverService {

	/**
	 * 指定接收的交换器名称
	 * @return
	 */
	@Input("dpb-exchange")
	SubscribableChannel receiver();
}

2.5 建立处理消息的处理类

  注意此类并非实现上面建立的接口。而是经过@EnableBinding来绑定咱们建立的接口,同时经过<font color='red'>@StreamListener</font>注解来监听dpb-exchange对应的消息服务

/**
 * 具体接收消息的处理类
 * @author dengp
 *
 */
@Service
@EnableBinding(IReceiverService.class)
public class ReceiverService {

	@StreamListener("dpb-exchange")
	public void onReceiver(byte[] msg){
		System.out.println("消费者:"+new String(msg));
	}
}

2.6 启动类

  一样要添加@EnableBinding注解

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(value={IReceiverService.class})
public class StreamReceiverStart {

	public static void main(String[] args) {
		SpringApplication.run(StreamReceiverStart.class, args);
	}
}

3.编写测试代码

  经过单元测试来测试服务。

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.test.context.junit4.SpringRunner;

import com.bobo.stream.StreamSenderStart;
import com.bobo.stream.sender.ISendeService;

@RunWith(SpringRunner.class)
@SpringBootTest(classes=StreamSenderStart.class)
public class StreamTest {
	
	@Autowired
	private ISendeService sendService;

	@Test
	public void testStream(){
		String msg = "hello stream ...";
		// 将须要发送的消息封装为Message对象
		Message message = MessageBuilder
								.withPayload(msg.getBytes())
								.build();
		sendService.send().send(message );
	}
}

启动消息消费者后,执行测试代码。结果以下:

在这里插入图片描述

消息接收者获取到了发送者发送的消息,同时咱们在RabbitMQ的web界面也能够看到相关的信息

在这里插入图片描述

总结

  咱们同stream实现了消息中间件的使用,咱们发现只有在两处地址和RabbitMQ有耦合,第一处是pom文件中的依赖,第二处是application.properties中的RabbitMQ的配置信息,而在具体的业务处理中并无出现任何RabbitMQ相关的代码,这时若是咱们要替换为Kafka的话咱们只须要将这两处换掉便可,即<font color='red'>实现了中间件和服务的高度解耦</font>。

原文出处:https://www.cnblogs.com/dengpengbo/p/11103943.html

相关文章
相关标签/搜索