Spring Cloud Stream是一个用来为微服务应用构建消息驱动能力的框架。它能够基于Spring Boot来建立独立的、可用于生产的Spring应用程序。它经过使用Spring Integration来链接消息代理中间件以实现消息事件驱动的微服务应用。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,而且引入了发布-订阅、消费组以及消息分区这三个核心概念。简单的说,Spring Cloud Stream本质上就是整合了Spring Boot和Spring Integration,实现了一套轻量级的消息驱动的微服务框架。经过使用Spring Cloud Stream,能够有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员能够有更多的精力关注于核心业务逻辑的处理。因为Spring Cloud Stream基于Spring Boot实现,因此它秉承了Spring Boot的优势,实现了自动化配置的功能帮忙咱们能够快速的上手使用,可是目前为止Spring Cloud Stream只支持下面两个著名的消息中间件的自动化配置:html
RabbitMQ
Kafka
下面咱们经过构建一个简单的示例来对Spring Cloud Stream有一个初步认识。该示例主要目标是构建一个基于Spring Boot的微服务应用,这个微服务应用将经过使用消息中间件RabbitMQ来接收消息并将消息打印到日志中。因此,在进行下面步骤以前请先确认已经在本地安装了RabbitMQ,具体安装步骤请参考此文。git
建立一个基础的Spring Boot工程,命名为:stream-hello
github
编辑pom.xml
中的依赖关系,引入Spring Cloud Stream对RabbitMQ的支持,具体以下:spring
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>Dalston.SR4</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement>
SinkReceiver
,具体以下: @EnableBinding(Sink.class) public class SinkReceiver { private static Logger logger = LoggerFactory.getLogger(SinkReceiver.class); @StreamListener(Sink.INPUT) public void receive(Object payload) { logger.info("Received: " + payload); } }
@SpringBootApplication public class SinkApplication { public static void main(String[] args) { SpringApplication.run(SinkApplication.class, args); } }
到这里,咱们快速入门示例的编码任务就已经完成了。下面咱们分别启动RabbitMQ以及该Spring Boot应用,而后作下面的试验,看看它们是如何运做的。框架
... INFO 16272 --- [main] o.s.c.s.b.r.RabbitMessageChannelBinder : declaring queue for inbound: input.anonymous.Y8VsFILmSC27eS5StsXp6A, bound to: input INFO 16272 --- [main] o.s.a.r.c.CachingConnectionFactory : Created new connection: SimpleConnection@3c78e551 [delegate=amqp://guest@127.0.0.1:5672/] INFO 16272 --- [main] o.s.integration.channel.DirectChannel : Channel 'input.anonymous.Y8VsFILmSC27eS5StsXp6A.bridge' has 1 subscriber(s). INFO 16272 --- [main] o.s.i.a.i.AmqpInboundChannelAdapter : started inbound.input.anonymous.Y8VsFILmSC27eS5StsXp6A ...
从上面的日志内容中,咱们能够得到如下信息:spring-boot
guest
用户建立了一个指向127.0.0.1:5672
位置的RabbitMQ链接,在RabbitMQ的控制台中咱们也能够发现它。input.anonymous.Y8VsFILmSC27eS5StsXp6A
的队列,并经过RabbitMessageChannelBinder
将本身绑定为它的消费者。这些信息咱们也能在RabbitMQ的控制台中发现它们。下面咱们能够在RabbitMQ的控制台中进入input.anonymous.Y8VsFILmSC27eS5StsXp6A
队列的管理页面,经过Publish Message
功能来发送一条消息到该队列中。微服务
此时,咱们能够在当前启动的Spring Boot应用程序的控制台中看到下面的内容:单元测试
INFO 16272 --- [C27eS5StsXp6A-1] com.didispace.HelloApplication : Received: [B@7cba610e
咱们能够发如今应用控制台中输出的内容就是SinkReceiver
中receive
方法定义的,而输出的具体内容则是来自消息队列中获取的对象。这里因为咱们没有对消息进行序列化,因此输出的只是该对象的引用,在后面的小节中咱们会详细介绍接收消息后的处理。测试
在顺利完成上面快速入门的示例后,咱们简单解释一下上面的步骤是如何将咱们的Spring Boot应用链接上RabbitMQ来消费消息以实现消息驱动业务逻辑的。ui
首先,咱们对Spring Boot应用作的就是引入spring-cloud-starter-stream-rabbit
依赖,该依赖包是Spring Cloud Stream对RabbitMQ支持的封装,其中包含了对RabbitMQ的自动化配置等内容。从下面它定义的依赖关系中,咱们还能够知道它等价于spring-cloud-stream-binder-rabbit
依赖。
<dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-stream-binder-rabbit</artifactId> </dependency> </dependencies>
接着,咱们再来看看这里用到的几个Spring Cloud Stream的核心注解,它们都被定义在SinkReceiver
中:
@EnableBinding
,该注解用来指定一个或多个定义了@Input
或@Output
注解的接口,以此实现对消息通道(Channel)的绑定。在上面的例子中,咱们经过@EnableBinding(Sink.class)
绑定了Sink
接口,该接口是Spring Cloud Stream中默认实现的对输入消息通道绑定的定义,它的源码以下: public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
它经过@Input
注解绑定了一个名为input
的通道。除了Sink
以外,Spring Cloud Stream还默认实现了绑定output
通道的Source
接口,还有结合了Sink
和Source
的Processor
接口,实际使用时咱们也能够本身经过@Input
和@Output
注解来定义绑定消息通道的接口。当咱们须要为@EnableBinding
指定多个接口来绑定消息通道的时候,能够这样定义:@EnableBinding(value = {Sink.class, Source.class})
。
@StreamListener
:该注解主要定义在方法上,做用是将被修饰的方法注册为消息中间件上数据流的事件监听器,注解中的属性值对应了监听的消息通道名。在上面的例子中,咱们经过@StreamListener(Sink.INPUT)
注解将receive
方法注册为对input
消息通道的监听处理器,因此当咱们在RabbitMQ的控制页面中发布消息的时候,receive
方法会作出对应的响应动做。上面咱们经过RabbitMQ的控制台完成了发送消息来验证了消息消费程序的功能,虽然这种方法比较low,可是经过上面的步骤,相信你们对RabbitMQ和Spring Cloud Stream的消息消费已经有了一些基础的认识。下面咱们经过编写生产消息的单元测试用例来完善咱们的入门内容。
@RunWith(SpringRunner.class) @EnableBinding(value = {SinkApplicationTests.SinkSender.class}) public class SinkApplicationTests { @Autowired private SinkSender sinkSender; @Test public void sinkSenderTester() { sinkSender.output().send(MessageBuilder.withPayload("produce a message :http://blog.didispace.com").build()); } public interface SinkSender { String OUTPUT = "input"; @Output(SinkSender.OUTPUT) MessageChannel output(); } }
INFO 50947 --- [L2W-c2AcChb2Q-1] com.didispace.stream.SinkReceiver : Received: produce a message :http://blog.didispace.com
在上面的单元测试中,咱们经过@Output(SinkSender.OUTPUT)
定义了一个输出经过,而该输出通道的名称为input
,与前文中的Sink中定义的消费通道同名,因此这里的单元测试与前文的消费者程序组成了一对生产者与消费者。到这里,本文的内容就次结束,若是您可以独立的完成上面的例子,那么对于Spring Cloud Stream的基础使用算是入门了。可是,Spring Cloud Stream的使用远不止于此,在近期的博文中,我讲继续更新这部份内容,帮助他们来理解和用好Spring Cloud Stream来构建消息驱动的微服务!源码来源
本文完整实例: