Spring Cloud Stream 在 Spring Cloud 体系内用于构建高度可扩展的基于事件驱动的微服务,其目的是为了简化消息在 Spring Cloud 应用程序中的开发。Spring Cloud Stream (后面以 SCS 代替 Spring Cloud Stream) 自己内容不少,并且它还有不少外部的依赖,想要熟悉 SCS,必需要先了解 Spring Messaging 和 Spring Integration 这两个项目,接下来文章将从如下几点跟你们进行介绍:html
Spring Messaging 是 Spring Framework 中的一个模块,其做用就是统一消息的编程模型。git
Messaging
对应的模型就包括一个消息体 Payload 和消息头 Header:package org.springframework.messaging; public interface Message<T> { T getPayload(); MessageHeaders getHeaders(); }
MessageChannel
用于接收消息,调用 send
方法能够将消息发送至该消息通道中 :@FunctionalInterface public interface MessageChannel { long INDEFINITE_TIMEOUT = -1; default boolean send(Message<?> message) { return send(message, INDEFINITE_TIMEOUT); } boolean send(Message<?> message, long timeout); }
消息通道里的消息如何被消费呢?github
SubscribableChannel
实现,被 MessageHandler
消息处理器所订阅:public interface SubscribableChannel extends MessageChannel { boolean subscribe(MessageHandler handler); boolean unsubscribe(MessageHandler handler); }
MessageHandler
真正地消费/处理消息:@FunctionalInterface public interface MessageHandler { void handleMessage(Message<?> message) throws MessagingException; }
Spring Messaging 内部在消息模型的基础上衍生出了其它的一些功能,如:web
HandlerMethodArgumentResolver
配合 @Header
, @Payload
等注解使用;消息接收后的返回值处理器 HandlerMethodReturnValueHandler
配合 @SendTo
注解使用;MessageConverter
;AbstractMessageSendingTemplate
;ChannelInterceptor
;Spring Integration 提供了 Spring 编程模型的扩展用来支持企业集成模式(Enterprise Integration Patterns),是对 Spring Messaging 的扩展。它提出了很多新的概念,包括消息的路由 MessageRoute
、消息的分发 MessageDispatcher
、消息的过滤 Filter
、消息的转换 Transformer
、消息的聚合 Aggregator
、消息的分割 Splitter
等等。同时还提供了 MessageChannel
和MessageHandler
的实现,分别包括 DirectChannel
、ExecutorChannel
、PublishSubscribeChannel
和MessageFilter
、ServiceActivatingHandler
、MethodInvokingSplitter
等内容。spring
首先为你们介绍几种消息的处理方式:sql
接下来,咱们以一个最简单的例子来尝试一下 Spring Integration:编程
SubscribableChannel messageChannel = new DirectChannel(); // 1 messageChannel.subscribe(msg -> { // 2 System.out.println("receive: " + msg.getPayload()); }); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); // 3
messageChannel
;MessageHandler
去消费这个消息通道里的消息;MessageHandler
所消费,最后控制台打印出: receive: msg from alibaba
;DirectChannel
内部有个 UnicastingDispatcher
类型的消息分发器,会分发到对应的消息通道 MessageChannel
中,从名字也能够看出来,UnicastingDispatcher
是个单播的分发器,只能选择一个消息通道。那么如何选择呢? 内部提供了 LoadBalancingStrategy
负载均衡策略,默认只有轮询的实现,能够进行扩展。架构
咱们对上段代码作一点修改,使用多个 MessageHandler
去处理消息:并发
SubscribableChannel messageChannel = new DirectChannel(); messageChannel.subscribe(msg -> { System.out.println("receive1: " + msg.getPayload()); }); messageChannel.subscribe(msg -> { System.out.println("receive2: " + msg.getPayload()); }); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
因为 DirectChannel
内部的消息分发器是 UnicastingDispatcher
单播的方式,而且采用轮询的负载均衡策略,因此这里两次的消费分别对应这两个 MessageHandler
。控制台打印出:负载均衡
receive1: msg from alibaba receive2: msg from alibaba
既然存在单播的消息分发器 UnicastingDispatcher
,必然也会存在广播的消息分发器,那就是 BroadcastingDispatcher
,它被 PublishSubscribeChannel
这个消息通道所使用。广播消息分发器会把消息分发给全部的 MessageHandler
:
SubscribableChannel messageChannel = new PublishSubscribeChannel(); messageChannel.subscribe(msg -> { System.out.println("receive1: " + msg.getPayload()); }); messageChannel.subscribe(msg -> { System.out.println("receive2: " + msg.getPayload()); }); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build()); messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
发送两个消息,都被全部的 MessageHandler
所消费。控制台打印:
receive1: msg from alibaba receive2: msg from alibaba receive1: msg from alibaba receive2: msg from alibaba
SCS与各模块之间的关系是:
Binder
, Binding
, @EnableBinding
, @StreamListener
等概念;/bindings
, /channels
endpoint;BindingProperties
, BinderProperties
等外部化配置类;Binder
是提供与外部消息中间件集成的组件,为构造 Binding
提供了 2 个方法,分别是 bindConsumer
和 bindProducer
,它们分别用于构造生产者和消费者。目前官方的实现有 Rabbit Binder 和 Kafka Binder, Spring Cloud Alibaba 内部已经实现了 RocketMQ Binder。
从图中能够看出,Binding
是链接应用程序跟消息中间件的桥梁,用于消息的消费和生产。咱们来看一个最简单的使用 RocketMQ Binder 的例子,而后分析一下它的底层处理原理:
@SpringBootApplication @EnableBinding({ Source.class, Sink.class }) // 1 public class SendAndReceiveApplication { public static void main(String[] args) { SpringApplication.run(SendAndReceiveApplication.class, args); } @Bean // 2 public CustomRunner customRunner() { return new CustomRunner(); } public static class CustomRunner implements CommandLineRunner { @Autowired private Source source; @Override public void run(String... args) throws Exception { int count = 5; for (int index = 1; index <= count; index++) { source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3 } } } }
@Service public class StreamListenerReceiveService { @StreamListener(Sink.INPUT) // 4 public void receiveByStreamListener1(String receiveMsg) { System.out.println("receiveByStreamListener: " + receiveMsg); } }
这段代码很简单,没有涉及到 RocketMQ 相关的代码,消息的发送和接收都是基于 SCS 体系完成的。若是想切换成 RabbitMQ 或 kafka,只需修改配置文件便可,代码无需修改。
咱们分析这段代码的原理:
@EnableBinding
对应的两个接口属性 Source
和 Sink
是 SCS 内部提供的。SCS 内部会基于 Source
和 Sink
构造 BindableProxyFactory
,且对应的 output 和 input 方法返回的 MessageChannel 是 DirectChannel
。output 和 input 方法修饰的注解对应的 value 是配置文件中 binding 的 name。public interface Source { String OUTPUT = "output"; @Output(Source.OUTPUT) MessageChannel output(); } public interface Sink { String INPUT = "input"; @Input(Sink.INPUT) SubscribableChannel input(); }
配置文件里 bindings 的 name 为 output 和 input,对应 Source
和 Sink
接口的方法上的注解里的 value:
spring.cloud.stream.bindings.output.destination=test-topic spring.cloud.stream.bindings.output.content-type=text/plain spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group spring.cloud.stream.bindings.input.destination=test-topic spring.cloud.stream.bindings.input.content-type=text/plain spring.cloud.stream.bindings.input.group=test-group1
CommandLineRunner
,程序启动的时候会执行 CustomRunner
的 run
方法。Source
接口里的 output 方法获取 DirectChannel
,并发送消息到这个消息通道中。这里跟以前 Spring Integration 章节里的代码一致。DirectChannel
消息通道以后会被 AbstractMessageChannelBinder#SendingHandler
这个 MessageHandler
处理,而后它会委托给 AbstractMessageChannelBinder#createProducerMessageHandler
建立的 MessageHandler 处理(该方法由不一样的消息中间件实现);AbstractMessageChannelBinder#createProducerMessageHandler
方法返回的 MessageHandler 内部会把 Spring Message 转换成对应中间件的 Message 模型并发送到对应中间件的 broker;@StreamListener
进行消息的订阅。请注意,注解里的 Sink.input
对应的值是 "input",会根据配置文件里 binding 对应的 name 为 input 的值进行配置:AbstractMessageChannelBinder#createConsumerEndpoint
方法会使用 Consumer 订阅消息,订阅到消息后内部会把中间件对应的 Message 模型转换成 Spring Message;@StreamListener
对应的 StreamListenerMessageHandler
订阅了 name 为 input 的消息通道,进行了消息的消费;这个过程文字描述有点啰嗦,用一张图总结一下(黄色部分涉及到各消息中间件的 Binder 实现以及 MQ 基本的订阅发布功能):
SCS 章节的最后,咱们来看一段 SCS 关于消息的处理方式的一段代码:
@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'") public void receiveByHeader(Message msg) { System.out.println("receive by headers['index']=='1': " + msg); } @StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'") public void receivePerson(@Payload Person person) { System.out.println("receive Person: " + person); } @StreamListener(value = Sink.INPUT) public void receiveAllMsg(String msg) { System.out.println("receive allMsg by StreamListener. content: " + msg); } @StreamListener(value = Sink.INPUT) public void receiveHeaderAndMsg(@Header("index") String index, Message msg) { System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg); }
有没有发现这段代码跟 Spring MVC Controller 中接收请求的代码很像? 实际上他们的架构都是相似的,Spring MVC 对于 Controller 中参数和返回值的处理类分别是 org.springframework.web.method.support.HandlerMethodArgumentResolver
、 org.springframework.web.method.support.HandlerMethodReturnValueHandler
。
Spring Messaging 中对于参数和返回值的处理类以前也提到过,分别是 org.springframework.messaging.handler.invocation.HandlerMethodArgumentResolver
、org.springframework.messaging.handler.invocation.HandlerMethodReturnValueHandler
。
它们的类名如出一辙,甚至内部的方法名也同样。
上图是 SCS 体系相关类说明的总结,关于 SCS 以及 RocketMQ Binder 更多相关的示例,能够参考 RocketMQ Binder Demos,包含了消息的聚合、分割、过滤;消息异常处理;消息标签、sql过滤;同步、异步消费等等。
下一篇文章,咱们将分析消息总线(Spring Cloud Bus) 在 Spring Cloud 体系中的做用,并逐步展开,分析 Spring Cloud Alibaba 中的 RocketMQ Binder 是如何实现 Spring Cloud Stream 标准的。
原文连接 本文为云栖社区原创内容,未经容许不得转载。