springcloud学习(七)之Stream

前言

MQ消息中间件⼴泛应⽤在应⽤解耦合、异步消息处理、流量削峰等场景中。java

不一样的MQ消息中间件内部机制包括使⽤⽅式都会有所不一样,⽐如RabbitMQ中有Exchange(交换机/交换器)这⼀概念, kafka有Topic、 Partition分区这些概念, MQ消息中间件的差别性不利于咱们上层的开发应⽤,当咱们的系统但愿从原有的RabbitMQ切换到Kafka时,咱们会发现⽐较困难,不少要操做可能重来(由于应⽤程序和具体的某⼀款MQ消息中间件耦合在⼀起了)。git

Spring Cloud Stream进⾏了很好的上层抽象,可让咱们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差别,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,咱们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。spring

本质: 屏蔽掉了底层不一样MQ消息中间件之间的差别,统⼀了MQ的编程模型,下降了学习、开发、维护MQ的成本sql

Stream简介

Spring Cloud Stream 是⼀个构建消息驱动微服务的框架。应⽤程序经过inputs(至关于消息消费者consumer)或者outputs(至关于消息⽣产者producer)来与Spring Cloud Stream中的binder对象交互,⽽Binder对象是⽤来屏蔽底层MQ细节的,它负责与具体的消息中间件交互。数据库

说⽩了:对于咱们来讲,只须要知道如何使⽤Spring Cloud Stream与Binder对象交互便可编程

Stream的几个经常使用注解

注解 描述
@Input(在消费者⼯程中使⽤) 注解标识输⼊通道,经过该输⼊通道接收到的消息进⼊应⽤程序
@Output(在⽣产者⼯程中使⽤) 注解标识输出通道,发布的消息将经过该通道离开应⽤程序
@StreamListener(在消费者⼯程中使⽤,监听message的到来) 监听队列,⽤于消费者的队列的消息的接收(有消息监听.....)
@EnableBinding 把Channel和Exchange(对于RabbitMQ)绑定在⼀起

Stream开发实战

生产者端

在父工程下新建子模块lagou-cloud-stream-producer-9090,引入jar:json

<!--spring cloud stream 依赖(rabbit) -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

添加rabbit相关配置:app

spring:
  application:
    name: lagou-cloud-stream-producer
  cloud:
    stream:
      binders: # 绑定MQ服务信息(此处咱们是RabbitMQ)
        lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联
          type: rabbit # MQ类型,若是是Kafka的话,此处配置kafka
          environment: # MQ环境配置(⽤户名、密码等)
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # 关联整合通道和binder对象
        output: # output是咱们定义的通道名称,此处不能乱改
          destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称)
          content-type: text/plain # application/json # 消息类型设置,⽐如json
          binder: lagouRabbitBinder # 关联MQ服务

定义一个发送消息的接口及实现类:框架

public interface IMessageProduder {
    void sendMessage(String message);
}
//绑定的通道,output。springcloud stream内对输出通道的定义
@EnableBinding(Source.class)
public class MessageProducerImpl implements IMessageProduder {

    @Autowired
    private Source source;

    @Override
    public void sendMessage(String message) {
        //向mq发送消息(并不直接操做mq,而是操做springcloud stream
        //使用source指定的output通道向外发送消息
        source.output().send(MessageBuilder.withPayload(message).build());
    }
}

消费者端

父工程下新建消费者子模块lagou-cloud-stream-consumer-9091,引入jar坐标和服务端同样,这里再也不赘述。
application.yml里面配置与rabbit相关参数,惟一与服务者端不一样的是input和output参数:

其余都保持一致。
在消费端定义service类来接受消息:异步

@EnableBinding(Sink.class)
public class MessageConsumerService {

    @StreamListener(Sink.INPUT)
    public void receiveMessage(Message<String> message){
        System.out.println("----接受到的消息---->"+message);
    }
}

测试

咱们在服务提供者端写一个测试类来发送消息:

@SpringBootTest(classes = {StreamProducerApplication9090.class})
@RunWith(SpringJUnit4ClassRunner.class)
public class MessageProducerTest
{
    @Autowired
    private IMessageProduder iMessageProduder;

    @Test
    public void testSendMessage(){
        iMessageProduder.sendMessage("hello world----!!");
    }

}

咱们先启动服务消费者,而后运行服务提供者端的测试类,看服务消费者端的控制台输出了接收到的信息:

Stream之消息分组

咱们将服务消费者复制一份,新消费者的端口是9092,前一个消费者端口是9091。
这样咱们继续测试,会发现同一个服务提供者发送的消息,被两个消费者都接收到并进行处理了。
这明显是有问题的,好比电商网站的订单,确定只须要处理一次就行。

为了解决这个问题,rabbitmq有个消息分组的概念,只要两个消费者实例处在一个组里,那么这个组里只有一个消费者会处理这个消息。

咱们仅仅须要在服务消费者端设置 spring.cloud.stream.bindings.input.group 属性,多个消费者实例配置为同⼀个group名称(在同⼀个group中的多个消费者只有⼀个能够获取到消息并消费)。以下:

扩展:前面咱们都是先启动服务消费者,而后再启动服务提供者发送消息,也就是消息是临时性的,并无持久化存储。当咱们设置了分组以后,消息就会持久化存储。咱们先发送消息,而后再启动服务消费者客户端,也可以接收到消息。

案例源码

stream案例的源码地址

欢迎访问个人博客:https://www.liuyj.top

相关文章
相关标签/搜索