Spring Cloud Stream是一个用于构建消息驱动的微服务应用的框架,其提供的一系列抽象屏蔽了不一样类型消息中间件使用上的差别,同时也大大简化了Spring在整合消息中间件时的使用复杂度。html
Spring Cloud Stream 提供了Binder(负责与消息中间件进行交互)java
# 其余参数默认配置 spring.rabbitmq.host=你的host
// 该注解表示绑定Sink消息通道 @EnableBinding(Sink.class) public class MsgReceiver { private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class); // 自带 消费者 @StreamListener(Sink.INPUT) public void receive(Object payload){ logger.info("received: " + payload); } }
public interface MyChannel { String INPUT = "test-input"; String OUTPUT = "test-output"; // 收 @Input(INPUT) SubscribableChannel input(); // 发 @Output(OUTPUT) MessageChannel output(); }
// 绑定自定义消息通道 @EnableBinding(MyChannel.class) public class MsgReceiver1 { private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class); // 收 @StreamListener(MyChannel.INPUT) public void receive(Object payload){ logger.info("received1: " + payload + ":" + new Date()); } }
package com.sundown.stream.controller; import com.sundown.stream.bean.ChatMessage; import com.sundown.stream.msg.MyChannel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.integration.support.MessageBuilder; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.GenericMessage; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Random; @RestController public class HelloController { @Autowired MyChannel myChannel; @GetMapping("/hello") public void hello(){ String message = "welcome spring cloud stream"; myChannel.output().send(MessageBuilder.withPayload(message).build()); } }
spring.cloud.stream.bindings.test-input.destination=test-topic spring.cloud.stream.bindings.test-output.destination=test-topic
java -jar stream-0.0.1-SNAPSHOT.jar
和java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081
运行访问http://localhost:8080/hello
spring.cloud.stream.bindings.test-input.destination=test-topic spring.cloud.stream.bindings.test-output.destination=test-topic spring.cloud.stream.bindings.test-input.group=gg spring.cloud.stream.bindings.test-output.group=gg
spring.cloud.stream.bindings.test-input.destination=test-topic spring.cloud.stream.bindings.test-output.destination=test-topic spring.cloud.stream.bindings.test-input.group=gg spring.cloud.stream.bindings.test-output.group=gg # 开启消费分区(消费者上配置) spring.cloud.stream.bindings.test-input.consumer.partitioned=true # 消费者实例个数(消费者上配置) spring.cloud.stream.instance-count=2 # 当前实例下标(消费者上配置) spring.cloud.stream.instance-index=0
@RestController public class HelloController { @Autowired MyChannel myChannel; @GetMapping("/hello") public void hello(){ String message = "welcome spring cloud stream"; // 先写死 int whichPart = 1; System.out.println("发送消息:" + message + ",发往分区:" + whichPart); myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build()); } }
java -jar stream-0.0.1-SNAPSHOT.jar --spring.cloud.stream.instance-index=0
java -jar stream-0.0.1-SNAPSHOT.jar --server.port=8081 --spring.cloud.stream.instance-index=0
(别忘了先关闭启动类 否则打包会报错)@GetMapping("/hello") public void hello(){ String message = "welcome spring cloud stream"; int whichPart = new Random().nextInt(2); System.out.println("发送消息:" + message + ",发往分区:" + whichPart); myChannel.output().send(MessageBuilder.withPayload(message).setHeader("whichPart", whichPart).build()); }
虽然定时任务能够用cron表达式 可是对于一些特殊的定时任务 可使用stream+rabbitmq更合适 好比几分钟后执行
rabbitmq插件安装web
spring.rabbitmq.host=xxx spring.cloud.stream.bindings.test-input.destination=topic spring.cloud.stream.bindings.test-output.destination=topic spring.cloud.stream.rabbit.bindings.test-input.consumer.delayed-exchange=true spring.cloud.stream.rabbit.bindings.test-output.producer.delayed-exchange=true #spring.cloud.stream.bindings.test-input.destination=test-topic #spring.cloud.stream.bindings.test-output.destination=test-topic # #spring.cloud.stream.bindings.test-input.group=gg #spring.cloud.stream.bindings.test-output.group=gg # ## 开启消费分区(消费者上配置) #spring.cloud.stream.bindings.test-input.consumer.partitioned=true ## 消费者实例个数(消费者上配置) #spring.cloud.stream.instance-count=2 ## 当前实例下标(消费者上配置) #spring.cloud.stream.instance-index=0 # ## 生产者配置 #spring.cloud.stream.bindings.test-output.producer.partition-key-expression=headers['whichPart'] ## 消费节点数量 #spring.cloud.stream.bindings.test-output.producer.partition-count=2
// 绑定自定义消息通道 @EnableBinding(MyChannel.class) public class MsgReceiver1 { private static final Logger logger = LoggerFactory.getLogger(MsgReceiver1.class); // 收 @StreamListener(MyChannel.INPUT) public void receive(Object payload){ // 添加日期 一会好对比 logger.info("received1: " + payload + ":" + new Date()); } }
@RestController public class HelloController { private static final Logger logger = LoggerFactory.getLogger(HelloController.class); @Autowired MyChannel myChannel; @GetMapping("/delay") public void delay(){ String message = "welcome spring cloud stream"; logger.info("send msg:" + new Date()); // x-delay --> 延迟3s myChannel.output().send(MessageBuilder.withPayload(message).setHeader("x-delay", 3000).build()); } }
stream自带的与自定义(添加destination=xxx)之间的相似和区别
解决重复消费 分组(group)
消息分组单个实例访问(开启消费分区 实例个数 实例下标 生产者配置 消费节点数)
定时器 rabbitmq相关的插件安装运行 后端代码实现(配置delayed-exchange和destination以及controller 发送时添加setHeader("x-delay", 3000) 3s延时)spring