前言:咱们如今有一个用微服务架构模式开发的系统,系统里有一个商品服务和订单服务,且它们都是同步通讯的。java
目前咱们商品服务和订单服务之间的通讯方式是同步的,当业务扩大以后,若是还继续使用同步的方式进行服务之间的通讯,会使得服务之间的耦合增大。例如咱们登陆操做可能须要同步调用用户服务、积分服务、短信服务等等,而服务之间可能又依赖别的服务,那么这样一个登陆过程就会耗费很多的时间,以至用户的体验下降。web
那咱们在微服务架构下要如何对服务之间的通讯进行解耦呢?这就须要使用到消息中间件了,消息中间件能够帮助咱们将同步的通讯转化为异步通讯,服务之间只须要对消息队列进行消息的发布、订阅便可,从而解耦服务之间的通讯依赖。spring
目前较为主流的消息中间件:数据库
异步通讯特色:json
异步的常见形态:设计模式
MQ应用场景:bash
更多关于消息中间件的描述,能够参考我另外一篇文章:架构
在上文 Spring Cloud Config - 统一配置中心 中,已经演示过使用Docker安装RabbitMQ,因此这里就再也不浪费篇幅演示了。app
直接进入正题,咱们以订单服务和商品服务示例,首先在订单服务的项目中,加入mq的依赖:框架
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
在配置文件中增长RabbitMQ的相关配置项:
到订单服务的项目中,新建一个message包,在该包中建立一个MqReceiver类,咱们来看看RabbitMQ的基本操做。代码以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收消息,即消费者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 接收消息并打印 * * @param message message */ @RabbitListener(queues = "myQueue") public void process(String message) { // @RabbitListener注解用于监听RabbitMQ,queues指定监听哪一个队列 log.info(message); } }
由于RabbitMQ上尚未myQueue这个队列,因此咱们还获得RabbitMQ的管理界面上,建立这个队列,以下:
而后新建一个测试类,用于发送消息到队列中,代码以下:
package org.zero.springcloud.order.server; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @program: sell_order * @description: 发送消息,即消息发布者 * @author: 01 * @create: 2018-08-21 22:28 **/ @RunWith(SpringRunner.class) @SpringBootTest public class MqSenderTest { @Autowired private AmqpTemplate amqpTemplate; @Test public void send() { for (int i = 0; i < 100; i++) { amqpTemplate.convertAndSend("myQueue", "第" + i + "条消息"); } } }
运行该测试类,运行成功后到OrderApplication的控制台上,看看是否接收并打印了接收到的消息。正常状况应以下:
基本的消费者和发布者的代码咱们都已经编写过,而且也测试成功了。但有个小问题,咱们要监听一个不存在的队列时,须要手动去新建这个队列,感受每次都手动新建挺麻烦的。有没有办法当队列不存在时,自动建立该队列呢?答案是有的,依旧使用以前的那个注解,只不过此次的参数要换成queuesToDeclare
。示例代码以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收消息,即消费者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 接收并打印消息 * 能够当队列不存在时自动建立队列 * * @param message message */ @RabbitListener(queuesToDeclare = @Queue("myQueue")) public void process2(String message) { // @RabbitListener注解用于监听RabbitMQ,queuesToDeclare能够建立指定的队列 log.info(message); } }
以上咱们经过示例简单的介绍了消息的收发及队列的建立,本小节则介绍一下exchange 的自动绑定方式。当须要自动绑定 exchange 时,咱们也能够经过 bindings 参数完成。示例代码以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收消息,即消费者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 接收并打印消息 * 能够当队列不存在时自动建立队列,以及自动绑定指定的Exchange * @param message message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("myQueue"), exchange = @Exchange("myExchange") )) public void process3(String message) { // @RabbitListener注解用于监听RabbitMQ,bindings能够建立指定的队列及自动绑定Exchange log.info(message); } }
消息分组咱们也是能够经过 bindings 参数完成,例如如今有一个数码供应商服务和一个水果供应商服务,它们都监听着同一个订单服务的消息队列。但我但愿数码订单的消息被数码供应商服务消费,而水果订单的消息被水果供应商服务消费。因此咱们就须要用到消息分组。示例代码以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.Exchange; import org.springframework.amqp.rabbit.annotation.Queue; import org.springframework.amqp.rabbit.annotation.QueueBinding; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 接收消息,即消费者 * @author: 01 * @create: 2018-08-21 22:24 **/ @Slf4j @Component public class MqReceiver { /** * 数码供应商服务 - 接收消息 * * @param message message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("computerOrder"), exchange = @Exchange("myOrder"), key = "computer" // 指定路由的key )) public void processComputer(String message) { log.info("computer message : {}", message); } /** * 水果供应商服务 - 接收消息 * * @param message message */ @RabbitListener(bindings = @QueueBinding( value = @Queue("computerOrder"), exchange = @Exchange("myOrder"), key = "fruit" // 指定路由的key )) public void processFruit(String message) { log.info("fruit message : {}", message); } }
测试代码以下,经过指定key进行消息的分组,将消息发送到数码供应商服务:
package org.zero.springcloud.order.server; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; /** * @program: sell_order * @description: 发送消息,即消息发布者 * @author: 01 * @create: 2018-08-21 22:28 **/ @RunWith(SpringRunner.class) @SpringBootTest public class MqSenderTest { @Autowired private AmqpTemplate amqpTemplate; @Test public void sendOrder() { for (int i = 0; i < 100; i++) { // 第一个参数指定队列,第二个参数来指定路由的key,第三个参数指定消息 amqpTemplate.convertAndSend("myOrder", "computer", "第" + i + "条消息"); } } }
重启项目后,运行以上测试代码,控制台输出以下,能够看到只有数码供应商服务才可以接收到消息,而水果供应商服务是接收不到的。这就完成了消息分组:
Spring Cloud Stream 是一个用来为微服务应用构建消息驱动能力的框架。它能够基于Spring Boot 来建立独立的,可用于生产的Spring 应用程序。他经过使用Spring Integration来链接消息代理中间件以实现消息事件驱动。Spring Cloud Stream 为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。目前仅支持RabbitMQ、Kafka。
什么是Spring Integration ? Integration 集成
企业应用集成(EAI)是集成应用之间数据和服务的一种应用技术。四种集成风格:
Spring Integration做为一种企业级集成框架,听从现代经典书籍《企业集成模式》,为开发者提供了一种便捷的实现模式。Spring Integration构建在Spring控制反转设计模式之上,抽象了消息源和目标,利用消息传送和消息操做来集成应用环境下的各类组件。消息和集成关注点都被框架处理,因此业务组件能更好地与基础设施隔离,从而下降开发者所要面对的复杂的集成职责。
模型图:
如今咱们来看看Spring Cloud Stream的基本使用,到订单服务项目上,增长以下依赖:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
而后是在配置文件中,配置rabbitmq的相关信息,只不过咱们以前已经配置过了因此不用配置了。
咱们来看看如何使用Spring Cloud Stream发送和接收消息,首先建立一个接口,定义input和output方法。代码以下:
package org.zero.springcloud.order.server.message; import org.springframework.cloud.stream.annotation.Input; import org.springframework.cloud.stream.annotation.Output; import org.springframework.messaging.MessageChannel; import org.springframework.messaging.SubscribableChannel; public interface StreamClient { // 接收消息、入口 @Input("myMessageInput") SubscribableChannel input(); // 发送消息、 @Output("myMessageOutput") MessageChannel output(); }
建立一个消息接收者。代码以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; /** * @program: sell_order * @description: 消息接收者 * @author: 01 * @create: 2018-08-22 22:16 **/ @Slf4j @Component @EnableBinding(StreamClient.class) public class StreamReceiver { @StreamListener("myMessageOutput") public void process(String message) { log.info("message : {}", message); } }
消息发送者,这里做为一个Controller存在。代码以下:
package org.zero.springcloud.order.server.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.zero.springcloud.order.server.message.StreamClient; /** * @program: sell_order * @description: 消息发送者 * @author: 01 * @create: 2018-08-22 22:18 **/ @RestController public class SendMessageController { private final StreamClient streamClient; @Autowired public SendMessageController(StreamClient streamClient) { this.streamClient = streamClient; } @GetMapping("/send/msg") public void send() { for (int i = 0; i < 100; i++) { MessageBuilder<String> messageBuilder = MessageBuilder.withPayload("这是第" + i + "条消息"); streamClient.output().send(messageBuilder.build()); } } }
由于咱们的微服务可能会部署多个实例,如有多个实例须要对消息进行分组,不然全部的服务实例都会接收到相同的消息。在配置文件中,增长以下配置完成消息的分组:
spring: ... cloud: ... stream: bindings: myMessageOutput: group: order ...
重启项目,访问http://localhost:9080/send/msg
,控制台输出以下:
注:Spring Cloud Stream能够在项目启动的时候自动建立队列,在项目关闭的时候自动删除队列
在实际的开发中,咱们通常发送的消息一般会是一个java对象而不是字符串。因此咱们来看看如何发送对象,其实和发送字符串几乎是同样的。消息发送者代码以下:
package org.zero.springcloud.order.server.controller; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.zero.springcloud.order.server.dto.OrderDTO; import org.zero.springcloud.order.server.message.StreamClient; /** * @program: sell_order * @description: 消息发送者 * @author: 01 * @create: 2018-08-22 22:18 **/ @RestController public class SendMessageController { private final StreamClient streamClient; @Autowired public SendMessageController(StreamClient streamClient) { this.streamClient = streamClient; } /** * 发送OrderDTO对象 */ @GetMapping("/send/msg") public void send() { OrderDTO orderDTO = new OrderDTO(); orderDTO.setOrderId("123465"); MessageBuilder<OrderDTO> messageBuilder = MessageBuilder.withPayload(orderDTO); streamClient.output().send(messageBuilder.build()); } }
消息接收者也只须要在方法参数上声明这个对象的类型便可。代码以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.stereotype.Component; import org.zero.springcloud.order.server.dto.OrderDTO; /** * @program: sell_order * @description: 消息接收者 * @author: 01 * @create: 2018-08-22 22:16 **/ @Slf4j @Component @EnableBinding(StreamClient.class) public class StreamReceiver { /** * 接收OrderDTO对象 * @param message message */ @StreamListener("myMessageOutput") public void process(OrderDTO message) { log.info("message : {}", message); } }
另外须要提到的一点是,默认状况下,java对象在消息队列中是以base64编码存在的,咱们也都知道base64不可读。为了方便查看堆积在消息队列里的对象数据,咱们但愿java对象是以json格式的字符串呈现,这样就方便咱们人类阅读。至于这个问题,咱们只须要在配置文件中,增长一段content-type的配置便可。以下:
spring: ... cloud: ... stream: bindings: myMessageOutput: group: order content-type: application/json ...
重启项目,访问http://localhost:9080/send/msg
,控制台输出以下:
2018-08-22 23:32:33.704 INFO 12436 --- [nio-9080-exec-4] o.z.s.o.server.message.StreamReceiver : message : OrderDTO(orderId=123465, buyerName=null, buyerPhone=null, buyerAddress=null, buyerOpenid=null, orderAmount=null, orderStatus=null, payStatus=null, createTime=null, updateTime=null, orderDetailList=null)
当咱们接收到消息的时候,可能会须要返回一段特定的消息,表示消息已收到之类的。至于这个功能,咱们经过@SendTo
注解便可完成。代码以下:
package org.zero.springcloud.order.server.message; import lombok.extern.slf4j.Slf4j; import org.springframework.cloud.stream.annotation.EnableBinding; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.messaging.handler.annotation.SendTo; import org.springframework.stereotype.Component; import org.zero.springcloud.order.server.dto.OrderDTO; /** * @program: sell_order * @description: 消息接收者 * @author: 01 * @create: 2018-08-22 22:16 **/ @Slf4j @Component @EnableBinding(StreamClient.class) public class StreamReceiver { /** * 接收OrderDTO对象 * @param message message */ @StreamListener("myMessageOutput") @SendTo("myMessageInput") public String process(OrderDTO message) { log.info("message : {}", message); return "success"; } @StreamListener("myMessageInput") public void success(String message) { log.info("message : {}", message); } }
重启项目,访问http://localhost:9080/send/msg
,控制台输出以下:
Spring Cloud Stream 再一次简化了咱们在分布式环境下对消息中间件的操做,配置好消息中间件的链接地址及用户密码后,在开发的过程当中,咱们只须要关注input和output,对消息中间件的操做基本是无感知的。