SpringCloud Stream消息驱动能够简化开发人员对消息中间件的使用复杂度,让系统开发人员将更多精力专注于核心业务逻辑的开发。SpringCloud Stream基于SpringBoot实现,自动配置化的功能能够帮助咱们快速上手学习,相似于咱们以前学习的ORM框架,能够平滑地切换多种不一样的数据库。
目前 SpringCloud Stream 只支持 RabbitMQ 和 Kafka。
经过定义绑定器(Binder)做为中间层,实现了应用程序与消息中间件细节之间的隔离。经过向应用程序暴露统一的Channel通道,使得应用程序不须要再考虑各类不一样的消息中间件的实现。当须要升级消息中间件,或者是更换其余消息中间件产品时,咱们须要作的就是更换对应的Binder绑定器,而不须要修改任何应用逻辑。
在该模型图上有以下几个核心概念:
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>
<dependencies>
    <!-- Spring Boot Web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream RabbitMQ binder -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
</dependencies>
server:
  port: 9000
spring:
  application:
    name: spingcloud-stream-producer
# RabbitMQ connection settings (uncomment when not using localhost defaults)
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest
建立管道
// 建立管道接口 public interface SendMessageInterface { // 建立一个输出管道,用于发送消息 @Output("my_msg") SubscribableChannel sendMsg(); }
/** REST entry point that publishes a random payload onto the "my_msg" channel. */
@RestController
public class SendMsgController {

    @Autowired
    private SendMessageInterface sendMessageInterface;

    /**
     * Generates a random UUID, publishes its bytes to the output channel,
     * and returns a fixed acknowledgement string.
     */
    @RequestMapping("/sendMsg")
    public String sendMsg() {
        final String payload = UUID.randomUUID().toString();
        System.out.println("生产者发送内容msg:" + payload);
        // Wrap the raw bytes in a Spring Messaging envelope and send it.
        sendMessageInterface.sendMsg()
                .send(MessageBuilder.withPayload(payload.getBytes()).build());
        return "success";
    }
}
@SpringBootApplication @EnableBinding(SendMessageInterface.class) // 开启绑定 public class AppProducer { public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); } }
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
</parent>
<dependencies>
    <!-- Spring Boot Web starter -->
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- Spring Cloud Stream RabbitMQ binder -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        <version>2.0.1.RELEASE</version>
    </dependency>
</dependencies>
server:
  # NOTE(fix): changed from 9000 — the producer already listens on 9000,
  # so the consumer needs a distinct port to run on the same host.
  port: 9001
spring:
  application:
    name: spingcloud-stream-consumer
# RabbitMQ connection settings (uncomment when not using localhost defaults)
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest
// Consumer-side channel definition: binds an input pipe to the broker.
public interface RedMsgInterface {

    /**
     * Declares an input channel named "my_msg" from which bound
     * middleware messages are received.
     */
    @Input("my_msg")
    SubscribableChannel redMsg();
}
/** Listens on the "my_msg" input channel and logs each received payload. */
@Component
public class Consumer {

    // Invoked once per message delivered on the bound channel.
    @StreamListener("my_msg")
    public void listener(String msg) {
        System.out.println("消费者获取生产消息:" + msg);
    }
}
// Consumer bootstrap: starts Spring Boot and binds the input channel
// declared in RedMsgInterface to the configured middleware.
@SpringBootApplication
@EnableBinding(RedMsgInterface.class)
public class AppConsumer {
    public static void main(String[] args) {
        SpringApplication.run(AppConsumer.class, args);
    }
}
在现实的业务场景中,每个微服务应用为了实现高可用和负载均衡,都会集群部署,按照上面咱们启动了两个应用的实例,消息被重复消费了两次。为解决这个问题,Spring Cloud Stream 中提供了消费组:经过配置 spring.cloud.stream.bindings.&lt;通道名&gt;.group 属性为应用指定一个组名(示例中的通道名为 my_msg),同一组内只会有一个实例消费到消息。下面修改下配置文件。
server:
  port: 8001
spring:
  application:
    name: spring-cloud-stream
# RabbitMQ connection settings (uncomment when not using localhost defaults)
#  rabbitmq:
#    host: 192.168.174.128
#    port: 5672
#    username: guest
#    password: guest
  cloud:
    stream:
      bindings:
        # NOTE(fix): the binding key must match the channel name declared via
        # @Input/@StreamListener — "my_msg", not "mymsg"; otherwise the group
        # setting targets a non-existent binding and never takes effect.
        my_msg:
          # Instances sharing a group name compete for messages, so each
          # message is consumed by exactly one instance of the group.
          group: stream
/**
 * Listens on the "my_msg" input channel and logs each payload together with
 * this instance's HTTP port, making it visible which cluster member consumed
 * the message.
 */
@Component
public class Consumer {

    // Injected from the active configuration; identifies this instance.
    @Value("${server.port}")
    private String serverPort;

    @StreamListener("my_msg")
    public void listener(String msg) {
        System.out.println("消费者获取生产消息:" + msg + ",端口号:" + serverPort);
    }
}
<!-- Spring Cloud Stream Kafka binder (replaces the RabbitMQ binder) -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-kafka</artifactId>
    <version>2.0.1.RELEASE</version>
</dependency>
server:
  port: 9000
spring:
  cloud:
    stream:
      # Kafka binder configuration
      kafka:
        binder:
          # Kafka broker list (default: localhost)
          brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
          # ZooKeeper nodes used by the Kafka binder (default: localhost)
          zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
          minPartitionCount: 1
          autoCreateTopics: true
          autoAddPartitions: true
server:
  port: 8000
spring:
  application:
    name: springcloud_kafka_consumer
  cloud:
    # Cluster sizing hints used for partition assignment
    instance-count: 1
    instance-index: 0
    stream:
      kafka:
        binder:
          brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092
          zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181
          auto-add-partitions: true
          auto-create-topics: true
          min-partition-count: 1
      bindings:
        input:
          # Kafka topic to read from
          destination: my_msg
          # Consumer group: only one member of "s1" receives each message
          group: s1
          consumer:
            # Offsets are not committed automatically
            autoCommitOffset: false
            concurrency: 1
            partitioned: false