Spring Cloud Stream应用与自定义RocketMQ Binder:编程模型

前言: 本文做者张天,节选自笔者与其合著的《Spring Cloud微服务架构进阶》,即将在八月出版问世。本文将其中Spring Cloud Stream应用与自定义Rocketmq Binder的内容抽取出来,主要介绍Spring Cloud Stream的相关概念,并概述相关的编程模型。前端

概述

Spring Cloud Stream 简介

Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它能够基于Spring Boot 来建立独立的,可用于生产的Spring 应用程序。他经过使用Spring Integration来链接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。Spring Cloud Stream目前仅支持RabbitMQ、Kafka。java

消息队列简介

消息队列中间件是分布式系统中最为重要的组件之一,主要解决应用耦合,异步消息,流量削锋等问题,是大型分布式系统不可缺乏的中间件。消息队列技术是分布式应用间交换信息的一种技术,消息可驻留在内存或磁盘上,队列存储消息直到它们被应用程序读走。经过消息队列,应用程序能够相对独立地执行,它们不须要知道彼此的位置,只须要处理从消息队列发送来的消息和向消息队列发送消息。c++

消息队列的主要特色是异步处理和解耦。其主要的使用场景就是将比较耗时并且不须要同步返回结果的操做做为消息放入消息队列。同时因为使用了消息队列,只要保证消息格式不变,消息的发送方和接受者并不须要彼此联系,也不须要受对方的影响,即解耦。web

消息队列的使用场景有:编程

  • 跨系统的异步通讯,须要异步交互的场景均可以使用消息队列。
  • 消息驱动的架构(EDA),系统分解为消息队列,消息队列制造者和消息队列消费者,一个是处理流程能够根据需求拆分红多个阶段,每一个阶段之间经过队列链接起来。
  • 流量削锋,它是消息队列中的经常使用场景之一,通常在秒杀或团抢活动中使用普遍。秒杀活动,通常会由于流量过大,致使流量暴增,应用挂掉,为解决这个问题,通常须要在应用前端加入消息队列,来缓和流量的暴增。

在软件的正常功能开发过程当中,开发人员并不须要去刻意的寻找消息队列的使用场景,而是当出现性能瓶颈时,去查看业务逻辑是否存在能够异步处理的耗时操做,若是存在的话即可以引入消息队列来解决。不然盲目的使用消息队列可能会增长维护和开发的成本却没法获得可观的性能提高,那就得不偿失了。json

常见的消息队列

目前业界有四款经常使用的消息队列,它们分别是RabbitMQ、RocketMQ、ActiveMQ和Kafka。咱们这里主要介绍前两种。安全

RabbitMQ

RabbitMQ在2007年发布,是一个在AMQP(高级消息队列协议)基础上完成的,可复用的企业消息系统,是当前最流行的消息中间件之一。 RabbitMQ的主要特性有:服务器

  • 可靠性: RabbitMQ提供了多种技术可让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证明和高可用性机制;
  • 灵活的路由:消息在到达队列前是经过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。若是你有更复杂的路由需求,能够将这些交换机组合起来使用,你甚至能够实现本身的交换机类型,而且当作RabbitMQ的插件来使用;
  • 消息集群:在相同局域网中的多个RabbitMQ服务器能够聚合在一块儿,做为一个独立的逻辑代理来使用;
  • 队列高可用:队列能够在集群中的机器上进行镜像,以确保在硬件问题下还保证消息安全;
  • 多种协议的支持:RabbitMQ支持多种消息队列协议;
  • 多语言支持:RabbitMQ的服务器端用Erlang语言编写,其客户端支持基本全部编程语言;
  • 管理界面: RabbitMQ有一个易用的用户界面,使得用户能够监控和管理消息Broker的许多方面;
  • 跟踪机制:若是消息异常,RabbitMQ提供消息跟踪机制,使用者能够跟踪发现异常;
  • 插件机制:提供了许多插件,来从多方面进行扩展,也能够编写本身的插件;

RabbitMQ的优势有:微信

  • 因为erlang语言的特性,mq 性能较好,高并发;
  • 健壮、稳定、易用、跨平台、支持多种语言、文档齐全;
  • 有消息确认机制和持久化机制,可靠性高;
  • 高度可定制的路由;
  • 管理界面较丰富,在互联网公司也有较大规模的应用;
  • 社区活跃度高;

RabbitMQ的缺点有:架构

  • 尽管结合erlang语言自己的并发优点,性能较好,可是不利于作二次开发和维护;
  • 实现了代理架构,意味着消息在发送到客户端以前能够在中央节点上排队。此特性使得RabbitMQ易于使用和部署,可是使得其运行速度较慢,由于中央节点增长了延迟,消息封装后也比较大;
  • 须要学习比较复杂的接口和协议,学习和维护成本较高;
RocketMQ

RocketMQ出自阿里公司的开源产品,用 Java 语言实现,在设计时参考了 Kafka,并作出了本身的一些改进,消息可靠性上比 Kafka 更好。RocketMQ在阿里集团被普遍应用在订单,交易,充值,流计算,消息推送,日志流式处理,binglog分发等场景。

RocketMQ的主要特性有:

  • 是一个队列模型的消息中间件,具备高性能、高可靠、高实时、分布式特色;
  • Producer、Consumer、队列均可以分布式;
  • Producer向一些队列轮流发送消息,队列集合称为Topic,Consumer若是作广播消费,则一个consumer实例消费这个Topic对应的全部队列,若是作集群消费,则多个Consumer实例平均消费这个topic对应的队列集合;
  • 可以保证严格的消息顺序;
  • 提供丰富的消息拉取模式;
  • 高效的订阅者水平扩展能力;
  • 实时的消息订阅机制;
  • 亿级消息堆积能力;
  • 较少的依赖;

RocketMQ的优势有:

  • 单机支持 1 万以上持久化队列;
  • RocketMQ 的全部消息都是持久化的,先写入系统 PAGECACHE,而后刷盘,能够保证内存与磁盘都有一份数据;
  • 模型简单,接口易用(JMS 的接口不少场合并不太实用);
  • 性能很是好,能够大量堆积消息在broker中;
  • 支持多种消费,包括集群消费、广播消费等。
  • 各个环节分布式扩展设计,主从HA;

RocketMQ的缺点有:

  • 支持的客户端语言很少,目前是java及c++,其中c++不成熟;
  • RocketMQ社区关注度及成熟度也不及前二者;
  • 没有web管理界面,提供了一个CLI(命令行界面)管理工具带来查询、管理和诊断各类问题;
  • 没有在消息队列的核心部分实现JMS等接口;

原理简介

如图是Stream源码的流程图。Stream首先会动态注册相关BeanDefinition,而且处理@StreamListener注解;而后在Bean实例初始化以后,会调用BindingService进行服务绑定;BindingService在绑定服务时会首先获取特定的Binder绑定器,而后绑定Producer和Consumer;最后Stream的相关实例就会进行发送和接受消息的处理。

编程模型

Spring Cloud Stream提供了一系列的预先定义的注解来声明输入型和输出型channel,业务系统基于这些channel与消息中间件进行通讯,而不是直接与消息中间件进行通讯。

声明和绑定Channels

经过给业务应用的配置类添加@EnableBinding注解来将一个Spring应用转变成Spring Cloud Stream应用。@EnableBinding注解自己拥有@Configuration元注解来进行相关配置而且会触发Spring Cloud Stream框架的初始化机制。

@Configuration
@EnableIntegration
public @interface EnableBinding {
    ...
    Class<?>[] value() default {};
}
复制代码

@EnableBinding注解可使用声明输入型和输出行channel的接口类做为其value属性值。@EnableBinding注解只能使用在业务系统的Configuration类上,能够提供尽量多的接口类做为该注解的value属性值,好比说@EnableBinding(value={Order.class, Payment.class}),Order和Payment都是声明了channel的接口类。 在Spring Cloud Stream应用中,接口类能够经过被@Input@Output注解修饰的函数来声明的输入型和输出型channels。

public interface OnlineStore{
    @Input
    SubscribableChannel orders();  #声明输入型channel,表示接收订单
    @Output
    MessageChannel stock();         #声明输出型channel,表示向供应商进货
}
复制代码

使用这个接口类看成@EnableBinding的value属性值能够触发Stream框架的初始化机制,建立两个channel,名字分别为orders和stock,orders是输入型channel,而stock是输出型channel。

@EnableBinding(OnlineStore.class)
public class ShopConfiguration {
   ...
}
复制代码

自定义信道

使用@Input@Output注解,编程人员能够给每一个信道一个自定义的名称,使用这个自定义信道,能够与消息对立中相应的Channel进行交互。

public interface OnlineStore{
    @Input("inboundOrders")
    SubscribableChannel orders();
}
复制代码

在上边代码示例中,自定义信道的名称为inboundOrders,Stream框架会建立出名为inboundOrders的信道。

Spring Cloud Stream提供了预先设置的三种接口来定义输入型channel和输出型channel,它们是Source、Sink和Processor。Source用来声明输出型channel,它的信道名称为output。Sink用来声明输入型channel,它的信道名称为input。Processor则用来声明输出输入型的channel。

# Source
public interface Source {
  String OUTPUT = "output";
  @Output(Source.OUTPUT)
  MessageChannel output();
}
# Sink
public interface Sink {
  String INPUT = "input";
  @Input(Sink.INPUT)
  SubscribableChannel input();
}
# Processor
public interface Processor extends Source, Sink {
}
复制代码

产生和消费消息

使用Spring Integration注解或者Spring Cloud Stream的@StreamListener注解能够进行消息的发送和消费。@StreamListener注解基于Spring Messaging注解(好比说@MessageMapping,@JmsListener,@RabbitListener),除此以外,该注解添加了内容(content)类型管理和类型强制等特性。

做为Spring Integration的补充,Spring Cloud Stream提供了它本身的@StreamListener注解,该注解构建在Spring Messaging注解的基础上,好比说@MessageMapping、@JmsListener和@RabbitListener@StreamListener注解提供了更加简便处理输入消息的模型。

Spring Cloud Stream提供了可扩展的消息转换(MessageConverter)机制来处理数据转换,并将转换后的数据分配给对应的被@StreamListener修饰的方法。下面这个例子展现了一个处理外部订单消息的应用。

@EnableBinding(Sink.class)
public class OrderHandler {
  @Autowired
  OrderService orderService;
  @StreamListener(Sink.INPUT)
  public void handle(Order order) {
    orderService.handle(order);
  }
}
复制代码

假设,输入的Message对象有一个string类型的Payload和一个值为application/json的contentType。在使用@StreamListener时,MessageConverter会使用消息的contentType来解析String类型的Payload并赋值给Order对象。 就像其余的Spring Messaging方法同样,被@StreamListener注解的方法的参数可使用@Payload@Headers进行注解。对于返回数据的方法,必须使用@SendTo注解来指定该返回数据发送到哪一个输出型channel。

@EnableBinding(Processor.class)
public class TransformProcessor {
  @Autowired
  VotingService votingService;
  @StreamListener(Processor.INPUT)
  @SendTo(Processor.OUTPUT)
  public VoteResult handle(Vote vote) {
    return votingService.record(vote);
  }
}
复制代码

Spring Cloud Stream支持将消息分配到多个@StreamListener修饰的方法。为了能使用该分配机制,一个方法必须首先知足下列条件:

  • 方法不能有返回值。
  • 方法必须是单独一类消息的处理函数。

使用注解的condition属性中的SpEL表达式能够设置@StreamListener接收消息的条件判断。全部匹配了该condition的方法都会在同一个线程中被调用,可是方法调用相对顺序不能保证。

下面就是一个@StreamListener分配消息的例子。在这个例子中,全部头部属性type对应的值为food的消息都会被分配给receiveFoodOrder方法,全部头部属性type对应的值为compute的消息都会被分配给receiveComputeOrder方法。

@EnableBinding(Sink.class)
@EnableAutoConfiguration
public static class TestPojoWithAnnotatedArguments {
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='food'")
    public void receiveFoodOrder(@Payload FoodOrder foodOrder) {
       // handle the message
    }
    @StreamListener(target = Sink.INPUT, condition = "headers['type']=='compute'")
    public void receiveComputeOrder(@Payload ComputeOrder computeOrder) {
       // handle the message
    }
}
复制代码

小结

本文主要介绍了Spring Cloud Stream中涉及到的相关概念,重点介绍了Spring Cloud Stream的编程模型,为后面文章实战应用和自定义奠基一些基础。Spring Cloud Stream封装了多种消息中间件的操做接口,目前只有kafka和rabbitmq,下一篇将会介绍如何自已实现一个Rocketmq的绑定器。

订阅最新文章,欢迎关注个人公众号

微信公众号
相关文章
相关标签/搜索