Spring Cloud Stream 是消息中间件组件,它集成了 kafka 和 rabbitmq 。Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架,是一个基于Spring Boot 建立的独立生产级的,使用Spring Integration提供链接到消息代理的Spring应用。html
Spring Cloud Stream与各模块之间的关系是:java
SCS 在 Spring Integration 的基础上进行了封装,提出了 Binder, Binding, @EnableBinding, @StreamListener 等概念;
SCS 与 Spring Boot Actuator 整合,提供了 /bindings, /channels endpoint;
SCS 与 Spring Boot Externalized Configuration 整合,提供了 BindingProperties, BinderProperties 等外部化配置类;
SCS 加强了消息发送失败的和消费失败状况下的处理逻辑等功能。
SCS 是 Spring Integration 的增强,同时与 Spring Boot 体系进行了融合,也是 Spring Cloud Bus 的基础。它屏蔽了底层消息中间件的实现细节,但愿以统一的一套 API 来进行消息的发送/消费,底层消息中间件的实现细节由各消息中间件的 Binder 完成。git
Binder 是提供与外部消息中间件集成的组件,为构造 Binding提供了 2 个方法,分别是 bindConsumer 和 bindProducer ,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了 RocketMQ Binder。github
了解SpringCloud流的时候,咱们会发现,SpringCloud还有个Data Flow(数据流)的项目,下面是它们的区别:spring
Spring Cloud Stream:数据流操做开发包,封装了与Redis,Rabbit、Kafka等发送接收消息。是一套用于建立消息驱动(message-driven)微服务的框架。经过向主程序添加@EnableBinding,能够当即链接到消息代理,经过向方法添加@StreamListener,您将收到流处理事件。数据库
Spring Cloud Data Flow:大数据操做工具,做为Spring XD的替代产品,它是一个混合计算模型,结合了流数据与批量数据的处理方式。是构建数据集成和实时数据处理流水线的工具包。编程
Spring Cloud Data Flow的其中一个章节是包含了Spring Cloud Stream,因此应该说Spring Cloud Data Flow的范围更广,是相似于一种解决方案的集合,而Spring Cloud Stream只是一套消息驱动的框架。springboot
Spring Cloud Stream是在Spring Integration的基础上发展起来的。它为开发人员提供了一致的开发经验,以构建能够包含企业集成模式以与外部系统(例如数据库,消息代理等)链接的应用程序。服务器
如图所示,Spring Cloud Stream由一个中间件中立的核组成。应用经过Spring Cloud Stream插入的input(至关于消费者consumer,它是从队列中接收消息的)和output(至关于生产者producer,它是从队列中发送消息的。)通道与外界交流。网络
结论:
一、Spring Cloud Stream以消息做为流的基本单位,因此它已经不是狭义上的IO流,而是广义上的数据流动,从生产者到消费者的数据流动。
二、Spring Cloud Stream 最大的方便之处,莫过于抽象了事件驱动的一些概念,对于消息中间件的进一步封装,能够作到代码层面对中间件的无感知,甚至于动态的切换中间件,切换topic。使得微服务开发的高度解耦,服务能够关注更多本身的业务流程。
一、异步处理
好比用户在电商网站下单,下单完成后会给用户推送短信或邮件,发短信和邮件的过程就能够异步完成。由于下单付款是核心业务,发邮件和短信并不属于核心功能,而且可能耗时较长,因此针对这种业务场景能够选择先放到消息队列中,有其余服务来异步处理。
二、应用解耦:
假设公司有几个不一样的系统,各系统在某些业务有联动关系,好比 A 系统完成了某些操做,须要触发 B 系统及 C 系统。若是 A 系统完成操做,主动调用 B 系统的接口或 C 系统的接口,能够完成功能,可是各个系统之间就产生了耦合。用消息中间件就能够完成解耦,当 A 系统完成操做将数据放进消息队列,B 和 C 系统去订阅消息就能够了。这样各系统只要约定好消息的格式就行了。
三、流量削峰
好比秒杀活动,一会儿进来好多请求,有的服务可能承受不住瞬时高并发而崩溃,因此针对这种瞬时高并发的场景,在中间加一层消息队列,把请求先入队列,而后再把队列中的请求平滑的推送给服务,或者让服务去队列拉取。
四、日志处理
kafka 最开始就是专门为了处理日志产生的。
当碰到上面的几种状况的时候,就要考虑用消息队列了。若是你碰巧使用的是 RabbitMQ 或者 kafka ,并且一样也是在使用 Spring Cloud ,那能够考虑下用 Spring Cloud Stream。
介绍下面的例子以前,假定你已经对 RabbitMQ 有必定的了解。
Destination Binders:目标绑定器,目标指的是 kafka 仍是 RabbitMQ,绑定器就是封装了目标中间件的包。若是操做的是 kafka 就使用 kafka binder ,若是操做的是 RabbitMQ 就使用 rabbitmq binder。
Destination Bindings:外部消息传递系统和应用程序之间的桥梁,提供消息的“生产者”和“消费者”(由目标绑定器建立)
Message:一种规范化的数据结构,生产者和消费者基于这个数据结构经过外部消息系统与目标绑定器和其余应用程序通讯。
首先来认识一下 Spring Cloud Stream 中的几个重要概念:
应用模型:应用程序经过 inputs 或者 outputs 来与 Spring Cloud Stream 中Binder 交互,经过咱们配置来绑定,而 Spring Cloud Stream 的 Binder 负责与中间件交互。因此,咱们只须要搞清楚如何与 Spring Cloud Stream 交互就能够方便使用消息驱动的方式。
抽象绑定器(The Binder Abstraction)
Spring Cloud Stream实现Kafkat和RabbitMQ的Binder实现,也包括了一个TestSupportBinder,用于测试。你也能够写根据API去写本身的Binder.
Spring Cloud Stream 一样使用了Spring boot的自动配置,而且抽象的Binder使Spring Cloud Stream的应用得到更好的灵活性,好比:咱们能够在application.yml或application.properties中指定参数进行配置使用Kafka或者RabbitMQ,而无需修改咱们的代码。
在前面咱们测试的项目中并无修改application.properties,自动配置得益于Spring Boot
经过 Binder ,能够方便地链接中间件,能够经过修改application.yml中的spring.cloud.stream.bindings.input.destination
来进行改变消息中间件(对应于Kafka的topic,RabbitMQ的exchanges)
在这二者间的切换甚至不须要修改一行代码。
发布-订阅(Persistent Publish-Subscribe Support)
以下图是经典的Spring Cloud Stream的 发布-订阅 模型,生产者 生产消息发布在shared topic(共享主题)上,而后 消费者 经过订阅这个topic来获取消息
其中topic对应于Spring Cloud Stream中的destinations(Kafka 的topic,RabbitMQ的 exchanges)
官方文档这块原理说的有点深,就没写,详见官方文档
消费组(Consumer Groups)
尽管发布-订阅 模型经过共享的topic链接应用变得很容易,可是经过建立特定应用的多个实例的来扩展服务的能力一样重要,可是若是这些实例都去消费这条数据,那么极可能会出现重复消费的问题,咱们只须要同一应用中只有一个实例消费该消息,这时咱们能够经过消费组来解决这种应用场景, 当一个应用程序不一样实例放置在一个具备竞争关系的消费组中,组里面的实例中只有一个可以消费消息
设置消费组的配置为spring.cloud.stream.bindings.<channelName>.group
,
下面举一个DD博客中的例子:
下图中,经过网络传递过来的消息经过主题,按照分组名进行传递到消费者组中
此时能够经过spring.cloud.stream.bindings.input.group=Group-A
或spring.cloud.stream.bindings.input.group=Group-B
进行指定消费组
全部订阅指定主题的组都会收到发布消息的一个备份,每一个组中只有一个成员会收到该消息;若是没有指定组,那么默认会为该应用分配一个匿名消费者组,与全部其它组处于 订阅-发布 关系中。ps:也就是说若是管道没有指定消费组,那么这个匿名消费组会与其它组一块儿消费消息,出现了重复消费的问题。
消费者类型(Consumer Types)
1)支持有两种消费者类型:
异步
)同步
)在Spring Cloud 2.0版本前只支持 Message-driven这种异步类型的消费者,消息一旦可用就会传递,而且有一个线程能够处理它;当你想控制消息的处理速度时,可能须要用到同步消费者类型。
2)持久化
通常来讲全部拥有订阅主题的消费组都是持久化的,除了匿名消费组。 Binder的实现确保了全部订阅关系的消费订阅是持久的,一个消费组中至少有一个订阅了主题,那么被订阅主题的消息就会进入这个组中,不管组内是否中止。
注意: 匿名订阅自己是非持久化的,可是有一些Binder的实现(好比RabbitMQ)则能够建立非持久化的组订阅
一般状况下,当有一个应用绑定到目的地的时候,最好指定消费消费组。扩展Spring Cloud Stream应用程序时,必须为每一个输入绑定指定一个使用者组。这样作能够防止应用程序的实例接收重复的消息(除非须要这种行为,这是不寻常的)。
分区支持(Partitioning Support)
在消费组中咱们能够保证消息不会被重复消费,可是在同组下有多个实例的时候,咱们没法肯定每次处理消息的是否是被同一消费者消费,分区的做用就是为了确保具备共同特征标识的数据由同一个消费者实例进行处理,固然前边的例子是狭义的,通讯代理(broken topic)也能够被理解为进行了一样的分区划分。Spring Cloud Stream 的分区概念是抽象的,能够为不支持分区Binder实现(例如RabbitMQ)也可使用分区。
注意:要使用分区处理,你必须同时对生产者和消费者进行配置。
为了理解编程模型,须要熟悉下列核心概念:
Destination Binders(目的地绑定器):
Destination Binders是Spring Cloud Stream与外部消息中间件提供了必要的配置和实现促进集成的扩展组件。集成了生产者和消费者的消息的路由、链接和委托、数据类型转换、用户代码调用等。
尽管Binders帮咱们处理了许多事情,咱们仍须要对他进行配置。以后会讲
Destination Bindings (目的地绑定) :
如前所述,Destination Bindings 提供链接外部消息中间件和应用提供的生产者和消费者中间的桥梁。
使用@EnableBinding 注解打在一个配置类上来定义一个Destination Binding,这个注解自己包含有@Configuration,会触发Spring Cloud Stream的基本配置。
接下来的例子展现彻底配置且正常运行的Spring Cloud Stream应用,由INPUT
接收消息转换成String 类型并打印在控制台上,而后转换出一个大写的信息返回到OUTPUT
中。
@SpringBootApplication @EnableBinding(Processor.class) public class MyApplication { public static void main(String[] args) { SpringApplication.run(MyApplication.class, args); } @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public String handle(String value) { System.out.println("Received: " + value); return value.toUpperCase(); } }
经过SendTo注解将方法内返回值转发到其余消息通道中,这里由于没有定义接收通道,提示消息已丢失,解决方法是新建一个接口,以下
public interface MyPipe{ //方法1 @Input(Processor.OUTPUT) //这里使用Processor.OUTPUT是由于要同一个管道,或者名称相同 SubscribableChannel input(); //还能够以下这样=====二选一便可========== //方法2 String INPUT = "output"; @Input(MyPipe.INPUT) SubscribableChannel input(); }
而后在在上边的方法下边加一个方法,并在@EnableBinding注解中改为@EnableBinding({Processor.class, MyPipe.class})
@StreamListener(MyPipe.INPUT) public void handleMyPipe(String value) { System.out.println("Received: " + value); }
Spring Cloud Stream已经为咱们提供了三个绑定消息通道的默认实现
他们的源码分别为:
public interface Sink { String INPUT = "input"; @Input("input") SubscribableChannel input(); } public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); } public interface Processor extends Source, Sink { }
Sink和Source中分别经过@Input和@Output注解定义了输入通道和输出通道,经过使用这两个接口中的成员变量来定义输入和输出通道的名称,Processor因为继承自这两个接口,因此同时拥有这两个通道。
注意:拥有多条管道的时候不能有输入输出管道名相同的,不然会出现发送消息被本身接收或报错的状况
咱们能够根据上述源码的方式来定义咱们本身的输入输出通道,定义输入通道须要返回SubscribaleChannel接口对象,这个接口继承自MessageChannel接口,它定义了维护消息通道订阅者的方法;定义输出通道则须要返回MessageChannel接口对象,它定义了向消息通道发送消息的方法。
依照上面的内容,咱们也能够建立本身的绑定通道 若是你实现了上边的MyPipe接口,那么直接使用这个接口就好
package com.cnblogs.hellxz; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.SubscribableChannel; public interface MyPipe { //方法1 // @Input(Source.OUTPUT) //Source.OUTPUT的值是output,咱们自定义也是同样的 // SubscribableChannel input(); //使用@Input注解标注的输入管道须要使用SubscribableChannel来订阅通道 //========二选一使用=========== //方法2 String INPUT = "output"; @Input(MyPipe.INPUT) SubscribableChannel input(); }
这里用Source.OUTPUT和第二种方法 是同样的,咱们只要将消息发送到名为output的管道中,那么监听output管道的输入流一端就能得到数据
@StreamListener(MyPipe.INPUT) public void receiveFromMyPipe(Object payload){ logger.info("Received: "+payload); }
在主类的头上的@EnableBinding改成@EnableBinding({Sink.class, MyPipe.class})
,加入了Mypipe接口的绑定
在test/java下建立com.cnblogs.hellxz
,并在包下新建一个测试类,以下
package com.cnblogs.hellxz; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.messaging.Source; import org.springframework.messaging.support.MessageBuilder; import org.springframework.test.context.junit4.SpringRunner; @RunWith(SpringRunner.class) @EnableBinding(value = {Source.class}) @SpringBootTest public class TestSendMessage { @Autowired private Source source; //注入接口和注入MessageChannel的区别在于发送时需不须要调用接口内的方法 @Test public void testSender() { source.output().send(MessageBuilder.withPayload("Message from MyPipe").build()); //假设注入了MessageChannel messageChannel; 由于绑定的是Source这个接口, //因此会使用其中的惟一产生MessageChannel的方法,那么下边的代码会是 //messageChannel.send(MessageBuilder.withPayload("Message from MyPipe").build()); } }
启动主类,清空输出,运行测试类,而后你就会获得在主类的控制台的消息以log形式输出Message from MyPipe
咱们是经过注入消息通道,并调用他的output方法声明的管道得到的MessageChannel实例,发送的消息
经过注入消息通道的方式虽然很直接,可是也容易犯错,当一个接口中有多个通道的时候,他们返回的实例都是MessageChannel,这样经过@Autowired注入的时候每每会出现有多个实例找到没法肯定须要注入实例的错误,咱们能够经过@Qualifier指定消息通道的名称,下面举例:
在主类包内建立一个拥有多个输出流的管道
/** * 多个输出管道 */ public interface MutiplePipe { @Output("output1") MessageChannel output1(); @Output("output2") MessageChannel output2(); }
建立一个测试类
@RunWith(SpringRunner.class) @EnableBinding(value = {MutiplePipe.class}) //开启绑定功能 @SpringBootTest //测试 public class TestMultipleOutput { @Autowired private MessageChannel messageChannel; @Test public void testSender() { //向管道发送消息 messageChannel.send(MessageBuilder.withPayload("produce by multiple pipe").build()); } }
启动测试类,会出现刚才说的不惟一的bean,没法注入
Caused by: org.springframework.beans.factory.NoUniqueBeanDefinitionException: No qualifying bean of type 'org.springframework.messaging.MessageChannel' available: expected single matching bean but found 6: output1,output2,input,output,nullChannel,errorChannel
咱们在@Autowired
旁边加上@Qualifier("output1")
,而后测试就能够正常启动了
经过上边的错误,咱们能够清楚的看到,每一个MessageChannel都是使用消息通道的名字作为bean的名称。
这里咱们没有使用监听这个管道,仅为了测试并发现问题
给消费者设置消费组和主题
spring.cloud.stream.bindings.<通道名>.group=<消费组名>
spring.cloud.stream.bindings.<通道名>.destination=<主题名>
给生产者指定通道的主题:spring.cloud.stream.bindings.<通道名>.destination=<主题名>
消费者开启分区,指定实例数量与实例索引
spring.cloud.stream.bindings.<通道名>.consumer.partitioned=true
spring.cloud.stream.instanceCount=1
(具体指定)spring.cloud.stream.instanceIndex=1
#设置当前实例的索引值生产者指定分区键
spring.cloud.stream.bindings.<通道名>.producer.partitionKeyExpress=<分区键>
spring.cloud.stream.bindings.<通道名>.producer.partitionCount=<分区数量>
SpringCloud Stream消息驱动能够简化开发人员对消息中间件的使用复杂度,让系统开发人员更多尽力专一与核心业务逻辑的开发。SpringCloud Stream基于SpringBoot实现,自动配置化的功能能够帮助咱们快速上手学习,相似与咱们以前学习的orm框架,能够平滑的切换多种不一样的数据库。
目前SpringCloud Stream 目前只支持 RabbitMQ和kafka。
stream这个项目让咱们没必要经过繁琐的自定义ampq来创建exchange,通道名称,以及队列名称和路由方式。只须要简单几步咱们就轻松使用stream完成推送到rabbitmq和kafafa,并完成监听工做。
绑定器
经过定义绑定器做为中间层,实现了应用程序与消息中间件细节之间的隔离。经过向应用程序暴露统一的Channel经过,是的应用程序不须要再考虑各类不一样的消息中间件的实现。当须要升级消息中间件,或者是更换其余消息中间件产品时,咱们须要作的就是更换对应的Binder绑定器而不须要修改任何应用逻辑 。
在该模型图上有以下几个核心概念:
消息驱动有通道,绑定MQ。
生产者消息传递到通道里面以后,通道是跟MQ作绑定,封装的。消息一旦到MQ以后,发送给消费者通道,而后消费者进行消费 。绑定部分是底层帮助实现的。
封装也只是实现了部分功能。MQ的功能不是百分百都实现了的。
Spring Cloud Stream是一个用于构建消息驱动的微服务应用程序的框架,是一个基于Spring Boot 建立的独立生产级的,使用Spring Integration提供链接到消息代理的Spring应用。介绍持久发布 - 订阅(persistent publish-subscribe)
的语义,消费组(consumer groups)
和分区(partitions)
的概念。
你能够添加@EnableBinding
注解在你的应用上,从而当即链接到消息代理,在方法上添加@StreamListener
以使其接收流处理事件,下面的例子展现了一个Sink应用接收外部信息
@SpringBootApplication @EnableBinding(Sink.class) public class VoteRecordingSinkApplication { public static void main(String[] args) { SpringApplication.run(VoteRecordingSinkApplication.class, args); } @StreamListener(Sink.INPUT) public void processVote(Vote vote) { votingService.recordVote(vote); } }
@EnableBinding
注解会带着一个或多个接口做为参数(举例中使用的是Sink的接口),一个接口每每声名了输入和输出的渠道,Spring Stream提供了Source
、Sink
、Processor
这三个接口,你也能够本身定义接口。
stream默认提供的消费者和生产者接口:
public interface Sink {
String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); } public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); }
@Input
注解区分了一个输入channel,经过它接收消息到应用中,使用@Output
注解 区分输出channel,消息经过它离开应用,使用这两个注解能够带一个channel的名字做为参数,若是未提供channel名称,则使用带注释的方法的名称。
你可使用Spring Cloud Stream 现成的接口,也可使用@Autowired
注入这个接口,下面在测试类中举例
@RunWith(SpringJUnit4ClassRunner.class) @SpringBootTest public class LoggingConsumerApplicationTests { @Autowired private Sink sink; @Test public void contextLoads() { assertNotNull(this.sink.input()); } }
首先,stream提供了默认的输入和输出经过。若是咱们不须要多个通道,能够经过@Enbalebing(Sink.Class)来绑定输入通道。对应的application里面的
# rabbitmq默认地址配置 rabbitmq: host: asdf.me port: 5672 username: guest password: guest cloud: stream: bindings: input: destination: push-exchange output: destination: push-exchange
这样会自动创建一个exchange为push-exchange名字的输出通道。同理@Enbalebing(Input.Class)是绑定输入通道的。下面建立一个生产者和消费者:
@EnableBinding(Source.class) public class Producer { @Autowired @Output(Source.OUTPUT) private MessageChannel channel; public void send() { channel.send(MessageBuilder.withPayload("producer" + UUID.randomUUID().toString()).build()); }
消费者:
@EnableBinding(Sink.class) public class Consumer { @StreamListener(Sink.INPUT) public void receive(Message<String> message) { System.out.println("接收到MQ消息:" + JSONObject.toJSONString(message)); } }
stream默认提供的消费者和生产者接口:
public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); } public interface Source { String OUTPUT = "output"; @Output("output") MessageChannel output(); }
能够看出,会去找到咱们在application.yaml里面定义的input,output下面的destination。分别做为输入和输出通道。咱们也能够本身定义接口来实现:
String WS_INPUT = "ws-consumer"; String EMAIL_INPUT = "email-consumer"; String SMS_INPUT = "sms-consumer"; @Input(MqMessageInputConfig.EMAIL_INPUT) SubscribableChannel emailChannel(); @Input(MqMessageInputConfig.WS_INPUT) SubscribableChannel wsChannel(); @Input(MqMessageInputConfig.SMS_INPUT) SubscribableChannel smChannel();
import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; public interface MqMessageOutputConfig { String MESSAGE_OUTPUT = "message-producter"; @Output(MqMessageOutputConfig.MESSAGE_OUTPUT) MessageChannel outPutChannel(); }
接收到MQ消息:{"headers":{"amqp_receivedDeliveryMode":"PERSISTENT","amqp_receivedRoutingKey":"my-test-channel","amqp_receivedExchange":"my-test-channel","amqp_deliveryTag":1,"amqp_consumerQueue":"my-test-channel.anonymous.vYA2O6ZSQE-S9MOnE0ZoJQ","amqp_redelivered":false,"id":"805e7fc3-a046-e07a-edf5-def58d9c8eab","amqp_consumerTag":"amq.ctag-QwsmRKg5f0DGSp-7wbpYxQ","contentType":"text/plain","timestamp":1523930106483},"payload":"22222222222a7d24456-5b11-4c25-9270-876e7bbc556a"}
#(井号):能够匹配多个单词(或者零个)
fanout:广播模式,发送到全部的队列
direct:直传。彻底匹配routingKey的队列能够收到消息。