SpringCloud-Stream 消息驱动

1、概述

是什么?

Spring Cloud Stream 是一个构建消息微服务驱动的框架。能够屏蔽底层消息中间件的差别,下降版本切换成本,统一消息的编程模型,目前仅支持 RabbitMQ 和 Kafka。java

设计思想

标准 MQ 的设计思想

生产者 / 消费者之间靠消息媒介传递信息内容,Messagespring

消息必须走特定的通道,MessageChannel编程

消息通道里的消息如何被消费呢,谁负责收发处理?消息通道MessageChannel的子接口SubscribableChannel,由消息处理器MessageHandler所订阅json

Spring Cloud Stream 的设计思想

若是咱们的项目中用到了 RabbitMQ 和 Kafka 两种消息中间件,因为它们的架构不一样,对实际开发形成了必定困扰;或者用到了一种消息中间件,随着后面的业务需求须要向另外一种消息队列迁移,这无疑是灾难性的,会形成一大堆的改动,由于它们与系统耦合了,这时候 Spring Cloud Stream 就能够为咱们提供一种解耦的方式。api

Spring Cloud Stream 提供的解决方案是:经过定义绑定器 Binder 做为中间层,实现了应用程序与消息中间件细节之间的隔离。向应用程序暴露统一的 Channel 通道,使得应用程序不须要再考虑各类消息中间件的实现。架构

inputs 对应消费者,outputs 对应生产者app

Stream中的消息通讯方式遵循了发布-订阅模式,用 Topic 主题进行广播(在RabbitMQ就是Exchange,在Kafka中就是Topic)框架

工做流程

Binder:绑定器,很方便的链接中间件,屏蔽差别ide

Channel:通道,是队列 Queue 的一种抽象,在消息通信系统中就是实现存储与转发的媒介,经过 Channel 对队列进行配置微服务

Source 和 Sink:简单理解就是参照物是 Spring Cloud Stream 自己,从 Stream 发布消息就是输出,接收消息就是输入

编码 API 和经常使用注解

2、基本使用

生产者

配置:

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # 配置绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于与binding整合
          type: rabbit # 消息组件的类型
          environment:
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名字
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,若是文本就是“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

发送消息:

@EnableBinding(Source.class)
@Slf4j
public class MessageProviderImpl implements IMessageProvider {

    @Autowired
    private MessageChannel output;

    @Override
    public void send() {
        String serial = IdUtil.simpleUUID();
        output.send(MessageBuilder.withPayload(serial).build());
        log.info("流水号:" + serial);
    }
}

消费者

配置与生产者一致,只须要把 output 改成 input

接收消息:

@Controller
@EnableBinding(Sink.class)
@Slf4j
public class MessageReceiveController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void receiveMessage(Message<String> message){
      log.info("receive -> " + serverPort + " -> " +message.getPayload());
    }
}

3、解决消息重复消费的问题

场景

举个栗子,咱们对订单系统作了集群部署,消费者从 RabbitMQ 中获取订单信息,若是同一个订单被不一样的服务都获取到了,就会形成数据错误,为了不这种状况,咱们可使用 Stream 中的消息分组来解决。

原理

在 Stream 中,处于同一个组的多个消费者是竞争关系,就能够保证消息只被一个服务消费一次,而不一样组是能够重复消费的。如今默认分组就是不一样的,组流水号不同。

解决

将不想产生重复消费的服务分为同一个组便可

配置方式

spring:
  cloud:
    stream:
      bindings:
        input:
          group: groupA

4、持久化

若是咱们的消费者由于种种缘由宕机了,生产者此时发送了消息,没有配置 group 属性的消费者从新上线后没法接收到以前的消息,而配置了 group 的消费者仍会接收到消息,这就是持久化的效果

相关文章
相关标签/搜索