MQ消息中间件⼴泛应⽤在应⽤解耦合、异步消息处理、流量削峰等场景中。java
不一样的MQ消息中间件内部机制包括使⽤⽅式都会有所不一样,⽐如RabbitMQ中有Exchange(交换机/交换器)这⼀概念, kafka有Topic、 Partition分区这些概念, MQ消息中间件的差别性不利于咱们上层的开发应⽤,当咱们的系统但愿从原有的RabbitMQ切换到Kafka时,咱们会发现⽐较困难,不少要操做可能重来(由于应⽤程序和具体的某⼀款MQ消息中间件耦合在⼀起了)。git
Spring Cloud Stream进⾏了很好的上层抽象,可让咱们与具体消息中间件解耦合,屏蔽掉了底层具体MQ消息中间件的细节差别,就像Hibernate屏蔽掉了具体数据库(Mysql/Oracle⼀样)。如此⼀来,咱们学习、开发、维护MQ都会变得轻松。⽬前Spring Cloud Stream⽀持RabbitMQ和Kafka。spring
本质: 屏蔽掉了底层不一样MQ消息中间件之间的差别,统⼀了MQ的编程模型,下降了学习、开发、维护MQ的成本sql
Spring Cloud Stream 是⼀个构建消息驱动微服务的框架。应⽤程序经过inputs(至关于消息消费者consumer)或者outputs(至关于消息⽣产者producer)来与Spring Cloud Stream中的binder对象交互,⽽Binder对象是⽤来屏蔽底层MQ细节的,它负责与具体的消息中间件交互。数据库
说⽩了:对于咱们来讲,只须要知道如何使⽤Spring Cloud Stream与Binder对象交互便可编程
注解 | 描述 |
---|---|
@Input(在消费者⼯程中使⽤) | 注解标识输⼊通道,经过该输⼊通道接收到的消息进⼊应⽤程序 |
@Output(在⽣产者⼯程中使⽤) | 注解标识输出通道,发布的消息将经过该通道离开应⽤程序 |
@StreamListener(在消费者⼯程中使⽤,监听message的到来) | 监听队列,⽤于消费者的队列的消息的接收(有消息监听.....) |
@EnableBinding | 把Channel和Exchange(对于RabbitMQ)绑定在⼀起 |
在父工程下新建子模块lagou-cloud-stream-producer-9090,引入jar:json
<!--spring cloud stream 依赖(rabbit) --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency>
添加rabbit相关配置:app
spring: application: name: lagou-cloud-stream-producer cloud: stream: binders: # 绑定MQ服务信息(此处咱们是RabbitMQ) lagouRabbitBinder: # 给Binder定义的名称,⽤于后⾯的关联 type: rabbit # MQ类型,若是是Kafka的话,此处配置kafka environment: # MQ环境配置(⽤户名、密码等) spring: rabbitmq: host: localhost port: 5672 username: guest password: guest bindings: # 关联整合通道和binder对象 output: # output是咱们定义的通道名称,此处不能乱改 destination: lagouExchange # 要使⽤的Exchange名称(消息队列主题名称) content-type: text/plain # application/json # 消息类型设置,⽐如json binder: lagouRabbitBinder # 关联MQ服务
定义一个发送消息的接口及实现类:框架
public interface IMessageProduder { void sendMessage(String message); }
//绑定的通道,output。springcloud stream内对输出通道的定义 @EnableBinding(Source.class) public class MessageProducerImpl implements IMessageProduder { @Autowired private Source source; @Override public void sendMessage(String message) { //向mq发送消息(并不直接操做mq,而是操做springcloud stream //使用source指定的output通道向外发送消息 source.output().send(MessageBuilder.withPayload(message).build()); } }
父工程下新建消费者子模块lagou-cloud-stream-consumer-9091,引入jar坐标和服务端同样,这里再也不赘述。
application.yml里面配置与rabbit相关参数,惟一与服务者端不一样的是input和output参数:
其余都保持一致。
在消费端定义service类来接受消息:异步
@EnableBinding(Sink.class) public class MessageConsumerService { @StreamListener(Sink.INPUT) public void receiveMessage(Message<String> message){ System.out.println("----接受到的消息---->"+message); } }
咱们在服务提供者端写一个测试类来发送消息:
@SpringBootTest(classes = {StreamProducerApplication9090.class}) @RunWith(SpringJUnit4ClassRunner.class) public class MessageProducerTest { @Autowired private IMessageProduder iMessageProduder; @Test public void testSendMessage(){ iMessageProduder.sendMessage("hello world----!!"); } }
咱们先启动服务消费者,而后运行服务提供者端的测试类,看服务消费者端的控制台输出了接收到的信息:
咱们将服务消费者复制一份,新消费者的端口是9092,前一个消费者端口是9091。
这样咱们继续测试,会发现同一个服务提供者发送的消息,被两个消费者都接收到并进行处理了。
这明显是有问题的,好比电商网站的订单,确定只须要处理一次就行。
为了解决这个问题,rabbitmq有个消息分组的概念,只要两个消费者实例处在一个组里,那么这个组里只有一个消费者会处理这个消息。
咱们仅仅须要在服务消费者端设置 spring.cloud.stream.bindings.input.group 属性,多个消费者实例配置为同⼀个group名称(在同⼀个group中的多个消费者只有⼀个能够获取到消息并消费)。以下:
扩展:前面咱们都是先启动服务消费者,而后再启动服务提供者发送消息,也就是消息是临时性的,并无持久化存储。当咱们设置了分组以后,消息就会持久化存储。咱们先发送消息,而后再启动服务消费者客户端,也可以接收到消息。
欢迎访问个人博客:https://www.liuyj.top