前言: 本文做者张天,节选自笔者与其合著的《Spring Cloud微服务架构进阶》,即将在八月出版问世。本文将其中Spring Cloud Stream应用与自定义Rocketmq Binder的内容抽取出来,主要介绍Spring Cloud Stream的相关概念,并概述相关的编程模型。前端
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它能够基于Spring Boot 来建立独立的,可用于生产的Spring 应用程序。他经过使用Spring Integration来链接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。Spring Cloud Stream目前仅支持RabbitMQ、Kafka。java
消息队列中间件是分布式系统中最为重要的组件之一,主要解决应用耦合,异步消息,流量削锋等问题,是大型分布式系统不可缺乏的中间件。消息队列技术是分布式应用间交换信息的一种技术,消息可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。经过消息队列,应用程序能够相对独立地执行,它们不须要知道彼此的位置,只须要处理从消息队列发送来的消息和向消息队列发送消息。c++
消息队列的主要特色是异步处理和解耦。其主要的使用场景就是将比较耗时并且不须要同步返回结果的操做做为消息放入消息队列。同时因为使用了消息队列,只要保证消息格式不变,消息的发送方和接受者并不须要彼此联系,也不须要受对方的影响,即解耦。web
消息队列的使用场景有:编程
在软件的正常功能开发过程当中,开发人员并不须要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在能够异步处理的耗时操做,若是存在的话即可以引入消息队列来解决。不然盲目的使用消息队列可能会增长维护和开发的成本却没法获得可观的性能提高,那就得不偿失了。json
目前业界有四款经常使用的消息队列,它们分别是RabbitMQ、RocketMQ、ActiveMQ和Kafka。咱们这里主要介绍前两种。安全
RabbitMQ在2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最流行的消息中间件之一。 RabbitMQ的主要特性有:服务器
RabbitMQ的优势有:微信
RabbitMQ的缺点有:架构
RocketMQ出自阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并作出了本身的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被普遍应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。
RocketMQ的主要特性有:
RocketMQ的缺点有:
如图是Stream源码的流程图。Stream首先会动态注册相关BeanDefinition,而且处理@StreamListener注解;而后在Bean实例初始化以后,会调用BindingService进行服务绑定;BindingService在绑定服务时会首先获取特定的Binder绑定器,而后绑定Producer和Consumer;最后Stream的相关实例就会进行发送和接受消息的处理。
Spring Cloud Stream提供了一系列的预先定义的注解来声明输入型和输出型channel,业务系统基于这些channel与消息中间件进行通讯,而不是直接与消息中间件进行通讯。
经过给业务应用的配置类添加@EnableBinding
注解来将一个Spring应用转变成Spring Cloud Stream应用。@EnableBinding
注解自己拥有@Configuration
元注解来进行相关配置而且会触发Spring Cloud Stream框架的初始化机制。
@Configuration
@EnableIntegration
public @interface EnableBinding {
...
Class<?>[] value() default {};
}
复制代码
@EnableBinding
注解可使用声明输入型和输出行channel的接口类做为其value属性值。@EnableBinding
注解只能使用在业务系统的Configuration类上,能够提供尽量多的接口类做为该注解的value属性值,好比说@EnableBinding(value={Order.class, Payment.class})
,Order和Payment都是声明了channel的接口类。 在Spring Cloud Stream应用中,接口类能够经过被@Input
和@Output
注解修饰的函数来声明的输入型和输出型channels。
public interface OnlineStore{
@Input
SubscribableChannel orders(); #声明输入型channel,表示接收订单
@Output
MessageChannel stock(); #声明输出型channel,表示向供应商进货
}
复制代码
使用这个接口类看成@EnableBinding
的value属性值能够触发Stream框架的初始化机制,建立两个channel,名字分别为orders和stock,orders是输入型channel,而stock是输出型channel。
@EnableBinding(OnlineStore.class)
public class ShopConfiguration {
...
}
复制代码
使用@Input
和@Output
注解,编程人员能够给每一个信道一个自定义的名称,使用这个自定义信道,能够与消息对立中相应的Channel进行交互。
public interface OnlineStore{
@Input("inboundOrders")
SubscribableChannel orders();
}
复制代码
在上边代码示例中,自定义信道的名称为inboundOrders,Stream框架会建立出名为inboundOrders的信道。
Spring Cloud Stream提供了预先设置的三种接口来定义输入型channel和输出型channel,它们是Source、Sink和Processor。Source用来声明输出型channel,它的信道名称为output。Sink用来声明输入型channel,它的信道名称为input。Processor则用来声明输出输入型的channel。
# Source
public interface Source {
String OUTPUT = "output";
@Output(Source.OUTPUT)
MessageChannel output();
}
# Sink
public interface Sink {
String INPUT = "input";
@Input(Sink.INPUT)
SubscribableChannel input();
}
# Processor
public interface Processor extends Source, Sink {
}
复制代码
使用Spring Integration注解或者Spring Cloud Stream的@StreamListener注解能够进行消息的发送和消费。@StreamListener
注解基于Spring Messaging注解(好比说@MessageMapping,@JmsListener,@RabbitListener),除此以外,该注解添加了内容(content)类型管理和类型强制等特性。
做为Spring Integration的补充,Spring Cloud Stream提供了它本身的@StreamListener注解,该注解构建在Spring Messaging注解的基础上,好比说@MessageMapping、@JmsListener和@RabbitListener
。@StreamListener
注解提供了更加简便处理输入消息的模型。
Spring Cloud Stream提供了可扩展的消息转换(MessageConverter)机制来处理数据转换,并将转换后的数据分配给对应的被@StreamListener
修饰的方法。下面这个例子展现了一个处理外部订单消息的应用。
@EnableBinding(Sink.class)
public class OrderHandler {
@Autowired
OrderService orderService;
@StreamListener(Sink.INPUT)
public void handle(Order order) {
orderService.handle(order);
}
}
复制代码
假设,输入的Message对象有一个string类型的Payload和一个值为application/json的contentType。在使用@StreamListener
时,MessageConverter
会使用消息的contentType来解析String类型的Payload并赋值给Order对象。 就像其余的Spring Messaging方法同样,被@StreamListener
注解的方法的参数可使用@Payload
和@Headers
进行注解。对于返回数据的方法,必须使用@SendTo
注解来指定该返回数据发送到哪一个输出型channel。
@EnableBinding(Processor.class)
public class TransformProcessor {
@Autowired
VotingService votingService;
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public VoteResult handle(Vote vote) {
return votingService.record(vote);
}
}
复制代码
Spring Cloud Stream支持将消息分配到多个@StreamListener
修饰的方法。为了能使用该分配机制,一个方法必须首先知足下列条件:
使用注解的condition属性中的SpEL表达式能够设置@StreamListener
接收消息的条件判断。全部匹配了该condition的方法都会在同一个线程中被调用,可是方法调用相对顺序不能保证。
下面就是一个@StreamListener
分配消息的例子。在这个例子中,全部头部属性type对应的值为food的消息都会被分配给receiveFoodOrder方法,全部头部属性type对应的值为compute的消息都会被分配给receiveComputeOrder方法。
@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='food'")
public void receiveFoodOrder(@Payload FoodOrder foodOrder) {
// handle the message
}
@StreamListener(target = Sink.INPUT, condition = "headers['type']=='compute'")
public void receiveComputeOrder(@Payload ComputeOrder computeOrder) {
// handle the message
}
}
复制代码
本文主要介绍了Spring Cloud Stream中涉及到的相关概念,重点介绍了Spring Cloud Stream的编程模型,为后面文章实战应用和自定义奠基一些基础。Spring Cloud Stream封装了多种消息中间件的操做接口,目前只有kafka和rabbitmq,下一篇将会介绍如何自已实现一个Rocketmq的绑定器。