做者 | 辽天
来源 | 阿里巴巴云原生公众号git
导读:本文将 rocktmq-spring-boot 的设计实现作一个简单的介绍,读者能够经过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,而后经过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。github
在 Spring 生态中玩转 RocketMQ 系列文章:web
本文配套可交互教程已登陆阿里云知行动手实验室,PC 端登陆http://www.javashuo.com/tag/start.aliyun.com 在浏览器中当即体验。spring
经过本文,您将了解到:apache
上世纪 90 年代末,随着 Java EE(Enterprise Edition) 的出现,特别是 Enterprise Java Beans 的使用须要复杂的描述符配置和死板复杂的代码实现,增长了广大开发者的学习曲线和开发成本,由此基于简单的 XML 配置和普通 Java 对象(Plain Old Java Objects)的 Spring 技术应运而生,依赖注入(Dependency Injection), 控制反转(Inversion of Control)和面向切面编程(AOP)的技术更加敏捷地解决了传统 Java 企业及版本的不足。编程
随着 Spring 的持续演进,基于注解(Annotation)的配置逐渐取代了 XML 文件配置,2014 年 4 月 1 日,Spring Boot 1.0.0 正式发布,它基于“约定大于配置”(Convention over configuration)这一理念来快速地开发、测试、运行和部署 Spring 应用,并能经过简单地与各类启动器(如 spring-boot-web-starter)结合,让应用直接以命令行的方式运行,不需再部署到独立容器中。这种简便直接快速构建和开发应用的过程,可使用约定的配置而且简化部署,受到愈来愈多的开发者的欢迎。设计模式
Apache RocketMQ 是业界知名的分布式消息和流处理中间件,简单地理解,它由 Broker 服务器和客户端两部分组成:浏览器
其中客户端一个是消息发布者客户端(Producer),它负责向 Broker 服务器发送消息;另一个是消息的消费者客户端(Consumer),多个消费者能够组成一个消费组,来订阅和拉取消费 Broker 服务器上存储的消息。bash
为了利用 Spring Boot 的快速开发和让用户可以更灵活地使用 RocketMQ 消息客户端,Apache RocketMQ 社区推出了 spring-boot-starter 实现。随着分布式事务消息功能在 RocketMQ 4.3.0 版本的发布,近期升级了相关的 spring-boot 代码,经过注解方式支持分布式事务的回查和事务消息的发送。服务器
本文将对当前的设计实现作一个简单的介绍,读者能够经过本文了解将 RocketMQ Client 端集成为 spring-boot-starter 框架的开发细节,而后经过一个简单的示例来一步一步的讲解如何使用这个 spring-boot-starter 工具包来配置,发送和消费 RocketMQ 消息。
顺便在这里讨论一下在 Spring 中关于消息的两个主要的框架,即 Spring Messaging 和 Spring Cloud Stream。它们都可以与 Spring Boot 整合并提供了一些参考的实现。和全部的实现框架同样,消息框架的目的是实现轻量级的消息驱动的微服务,能够有效地简化开发人员对消息中间件的使用复杂度,让系统开发人员能够有更多的精力关注于核心业务逻辑的处理。
Spring Messaging 是 Spring Framework 4 中添加的模块,是 Spring 与消息系统集成的一个扩展性的支持。它实现了从基于 JmsTemplate 的简单的使用 JMS 接口到异步接收消息的一整套完整的基础架构,Spring AMQP 提供了该协议所要求的相似的功能集。在与 Spring Boot 的集成后,它拥有了自动配置能力,可以在测试和运行时与相应的消息传递系统进行集成。
单纯对于客户端而言,Spring Messaging 提供了一套抽象的 API 或者说是约定的标准,对消息发送端和消息接收端的模式进行规定,不一样的消息中间件提供商能够在这个模式下提供本身的 Spring 实现:在消息发送端须要实现的是一个 XXXTemplate 形式的 Java Bean,结合 Spring Boot 的自动化配置选项提供多个不一样的发送消息方法;在消息的消费端是一个 XXXMessageListener 接口(实现方式一般会使用一个注解来声明一个消息驱动的 POJO),提供回调方法来监听和消费消息,这个接口一样可使用 Spring Boot 的自动化选项和一些定制化的属性。
若是有兴趣深刻的了解 Spring Messaging 及针对不一样的消息产品的使用,推荐阅读这个文件。参考 Spring Messaging 的既有实现,RocketMQ 的 spring-boot-starter 中遵循了相关的设计模式并结合 RocketMQ 自身的功能特色提供了相应的 API(如顺序、异步和事务半消息等)。
Spring Cloud Stream 结合了 Spring Integration 的注解和功能,它的应用模型以下:
该图片引自 spring cloud stream
Spring Cloud Stream 框架中提供一个独立的应用内核,它经过输入(@Input)和输出(@Output)通道与外部世界进行通讯,消息源端(Source)经过输入通道发送消息,消费目标端(Sink)经过监听输出通道来获取消费的消息。这些通道经过专用的 Binder 实现与外部代理链接。开发人员的代码只须要针对应用内核提供的固定的接口和注解方式进行编程,而不须要关心运行时具体的 Binder 绑定的消息中间件。在运行时,Spring Cloud Stream 可以自动探测并使用在 classpath 下找到的Binder。
这样开发人员能够轻松地在相同的代码中使用不一样类型的中间件:仅仅须要在构建时包含进不一样的 Binder。在更加复杂的使用场景中,也能够在应用中打包多个 Binder 并让它本身选择 Binder,甚至在运行时为不一样的通道使用不一样的 Binder。
Binder 抽象使得 Spring Cloud Stream 应用能够灵活的链接到中间件,加之 Spring Cloud Stream 使用利用了 Spring Boot 的灵活配置配置能力,这样的配置能够经过外部配置的属性和 Spring Boot 支持的任何形式来提供(包括应用启动参数、环境变量和 application.yml 或者 application.properties 文件),部署人员能够在运行时动态选择通道链接 destination(例如,Kafka 的 topic 或者 RabbitMQ 的 exchange)。
Binder SPI 的方式来让消息中间件产品使用可扩展的 API 来编写相应的 Binder,并集成到 Spring Cloud Steam 环境,目前 RocketMQ 尚未提供相关的 Binder,咱们计划在下一步将完善这一功能,也但愿社区里有这方面经验的同窗积极尝试,贡献 PR 或建议。
在开始的时候咱们已经知道,spring boot starter 构造的启动器对于使用者是很是方便的,使用者只要在 pom.xml引入starter 的依赖定义,相应的编译,运行和部署功能就所有自动引入。所以经常使用的开源组件都会为 Spring 的用户提供一个 spring-boot-starter 封装给开发者,让开发者很是方便集成和使用,这里咱们详细的介绍一下 RocketMQ(客户端)的 starter 实现过程。
对于一个 spring-boot-starter 实现须要包含以下几个部分:
<groupId>org.apache.rocketmq</groupId> <artifactId>spring-boot-starter-rocketmq</artifactId> <version>1.0.0-SNAPSHOT</version>
它分为两个部分:Spring 自身的依赖包和 RocketMQ 的依赖包。
定义应用属性配置文件类 RocketMQProperties,这个 Bean 定义一组默认的属性值。用户在使用最终的 starter 时,能够根据这个类定义的属性来修改取值,固然不是直接修改这个类的配置,而是 spring-boot 应用中对应的配置文件:src/main/resources/application.properties。
定义 src/resources/META-INF/spring.factories 文件中的自动加载类, 其目的是让 spring boot 更具文中中所指定的自动化配置类来自动初始化相关的 Bean、Component 或 Service,它的内容以下:
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.apache.rocketmq.spring.starter.RocketMQAutoConfiguration
在 RocketMQAutoConfiguration 类的具体实现中,定义开放给用户直接使用的 Bean 对象包括:
在发送端(producer)和消费端(consumer)客户端分别进行封装,在当前的实现版本提供了对 Spring Messaging 接口的兼容方式。
发送端的代码封装在 RocketMQTemplate POJO 中,下图是发送端的相关代码的调用关系图:
为了与 Spring Messaging 的发送模板兼容,在 RocketMQTemplate 集成了 AbstractMessageSendingTemplate 抽象类,来支持相关的消息转换和发送方法,这些方法最终会代理给 doSend() 方法、doSend() 以及 RocoketMQ 所特有的一些方法如异步,单向和顺序等方法直接添加到 RoketMQTempalte 中,这些方法直接代理调用到 RocketMQ 的 Producer API 来进行消息的发送。
对于事务消息的处理,在消息发送端进行了部分的扩展,参考上面的调用关系类图。
RocketMQTemplate 里加入了一个发送事务消息的方法 sendMessageInTransaction(),而且最终这个方法会代理到 RocketMQ 的 TransactionProducer 进行调用,在这个 Producer 上会注册其关联的 TransactionListener 实现类,以便在发送消息后可以对 TransactionListener 里的方法实现进行调用。
在消费端 Spring-Boot 应用启动后,会扫描全部包含 @RocketMQMessageListener 注解的类(这些类须要集成 RocketMQListener 接口,并实现 onMessage()方法),这个 Listener 会一对一的被放置到。
DefaultRocketMQListenerContainer 容器对象中,容器对象会根据消费的方式(并发或顺序),将 RocketMQListener 封装到具体的 RocketMQ 内部的并发或者顺序接口实现。在容器中建立 RocketMQ Consumer 对象,启动并监听定制的 Topic 消息,若是有消费消息,则回调到 Listener 的 onMessage() 方法。
上面的一章介绍了 RocketMQ 在 spring-boot-starter 方式的实现,这里经过一个最简单的消息发送和消费的例子来介绍如何使这个 rocketmq-spring-boot-starter。
要验证 RocketMQ 的 Spring-Boot 客户端,首先要确保 RocketMQ 服务正确的下载并启动。能够参考 RocketMQ 主站的快速开始来进行操做。确保启动 NameServer 和 Broker 已经正确启动。
在执行启动命令的目录下执行下面的命令行操做:
bash bin/mqadmin updateTopic -c DefaultCluster -t string-topic
目前的 spring-boot-starter 依赖尚未提交的 Maven 的中心库,用户使用前须要自行下载 git 源码,而后执行 mvn clean install 安装到本地仓库。
git clone https://github.com/apache/rocketmq-externals.git cd rocketmq-spring-boot-starter mvn clean install
用户若是使用它,须要在消息的发布和消费客户端的 maven 配置文件 pom.xml 中添加以下的依赖:
属性 spring-boot-starter-rocketmq-version 的取值为:1.0.0-SNAPSHOT, 这与上一步骤中执行安装到本地仓库的版本一致。
发送端的配置文件 application.properties:
发送端的 Java 代码:
消费端的配置文件 application.properties:
消费端的 Java 代码:
这里只是简单的介绍了使用 spring-boot 来编写最基本的消息发送和接收的代码,若是须要了解更多的调用方式,如: 异步发送,对象消息体,指定 tag 标签以及指定事务消息,请参看 github 的说明文档和详细的代码。咱们后续还会对这些高级功能进行陆续的介绍。
辽天,阿里巴巴技术专家,Apache RocketMQ 内核控,拥有多年分布式系统研发经验,对 Microservice、Messaging 和 Storage 等领域有深入理解, 目前专一 RocketMQ 内核优化以及 Messaging 生态建设。