《springcloud 五》springcloud stream

什么是消息驱动?

SpringCloud Stream消息驱动能够简化开发人员对消息中间件的使用复杂度,让系统开发人员更多尽力专一与核心业务逻辑的开发。SpringCloud Stream基于SpringBoot实现,自动配置化的功能能够帮助咱们快速上手学习,相似与咱们以前学习的orm框架,能够平滑的切换多种不一样的数据库。node

目前SpringCloud Stream 目前只支持 rabbitMQ和kafkaweb

 

消息驱动原理

绑定器

经过定义绑定器做为中间层,实现了应用程序与消息中间件细节之间的隔离。经过向应用程序暴露统一的Channel经过,是的应用程序不须要再考虑各类不一样的消息中间件的实现。当须要升级消息中间件,或者是更换其余消息中间件产品时,咱们须要作的就是更换对应的Binder绑定器而不须要修改任何应用逻辑 。spring

在该模型图上有以下几个核心概念:数据库

  • Source: 当须要发送消息时,咱们就须要经过Source,Source将会把咱们所要发送的消息(POJO对象)进行序列化(默认转换成JSON格式字符串),而后将这些数据发送到Channel中;
  • Sink: 当咱们须要监听消息时就须要经过Sink来,Sink负责从消息通道中获取消息,并将消息反序列化成消息对象(POJO对象),而后交给具体的消息监听处理进行业务处理;
  • Channel: 消息通道是Stream的抽象之一。一般咱们向消息中间件发送消息或者监听消息时须要指定主题(Topic)/消息队列名称,但这样一旦咱们须要变动主题名称的时候须要修改消息发送或者消息监听的代码,可是经过Channel抽象,咱们的业务代码只须要对Channel就能够了,具体这个Channel对应的是那个主题,就能够在配置文件中来指定,这样当主题变动的时候咱们就不用对代码作任何修改,从而实现了与具体消息中间件的解耦;
  • Binder: Stream中另一个抽象层。经过不一样的Binder能够实现与不一样消息中间件的整合,好比上面的示例咱们所使用的就是针对Kafka的Binder,经过Binder提供统一的消息收发接口,从而使得咱们能够根据实际须要部署不一样的消息中间件,或者根据实际生产中所部署的消息中间件来调整咱们的配置。

消息驱动环境搭建

生产者环境

Maven依赖信息

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>

application.yml信息

server: port: 9000 spring: application: name: spingcloud-stream-producer # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest

建立管道app

 

// 建立管道接口
public interface SendMessageInterface { // 建立一个输出管道,用于发送消息
    @Output("my_msg") SubscribableChannel sendMsg(); }

 

发送消息

@RestController public class SendMsgController { @Autowired private SendMessageInterface sendMessageInterface; @RequestMapping("/sendMsg") public String sendMsg() { String msg = UUID.randomUUID().toString(); System.out.println("生产者发送内容msg:" + msg); Message build = MessageBuilder.withPayload(msg.getBytes()).build(); sendMessageInterface.sendMsg().send(build); return "success"; } }

启动服务

@SpringBootApplication @EnableBinding(SendMessageInterface.class) // 开启绑定
public class AppProducer { public static void main(String[] args) { SpringApplication.run(AppProducer.class, args); } }

消费者环境

Maven

<parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>
        <!-- SpringBoot整合Web组件 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>
    </dependencies>

application.yml

server: port: 9000 spring: application: name: spingcloud-stream-consumer # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest

管道中绑定消息

public interface RedMsgInterface { // 从管道中获取消息
    @Input("my_msg") SubscribableChannel redMsg(); }

消费者获取消息

@Component public class Consumer { @StreamListener("my_msg") public void listener(String msg) { System.out.println("消费者获取生产消息:" + msg); } }

启动消费者

@SpringBootApplication @EnableBinding(RedMsgInterface.class) public class AppConsumer { public static void main(String[] args) { SpringApplication.run(AppConsumer.class, args); } }

消费组

在现实的业务场景中,每个微服务应用为了实现高可用和负载均衡,都会集群部署,按照上面咱们启动了两个应用的实例,消息被重复消费了两次。为解决这个问题,Spring Cloud Stream 中提供了消费组,经过配置 spring.cloud.stream.bindings.myInput.group 属性为应用指定一个组名,下面修改下配置文件,负载均衡

server: port: 8001 spring: application: name: spring-cloud-stream # rabbitmq: # host: 192.168.174.128 # port: 5672 # username: guest # password: guest cloud: stream: bindings: mymsg: ###指定 管道名称 #指定该应用实例属于 stream 消费组 group: stream

修改消费者

@Component public class Consumer { @Value("${server.port}") private String serverPort; @StreamListener("my_msg") public void listener(String msg) { System.out.println("消费者获取生产消息:" + msg + ",端口号:" + serverPort); } }

 

更改环境为kafka

Maven依赖

<dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>2.0.1.RELEASE</version>
        </dependency>

生产者配置

server: port: 9000 spring: cloud: stream: # 设置成使用kafka kafka: binder: # Kafka的服务端列表,默认localhost brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092 # Kafka服务端链接的ZooKeeper节点列表,默认localhost zkNodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181 minPartitionCount: 1 autoCreateTopics: true autoAddPartitions: true

消费者配置

server: port: 8000 spring: application: name: springcloud_kafka_consumer cloud: instance-count: 1 instance-index: 0 stream: kafka: binder: brokers: 192.168.212.174:9092,192.168.212.175:9092,192.168.212.176:9092 zk-nodes: 192.168.212.174:2181,192.168.212.175:2181,192.168.212.176:2181 auto-add-partitions: true auto-create-topics: true min-partition-count: 1 bindings: input: destination: my_msg group: s1 consumer: autoCommitOffset: false concurrency: 1 partitioned: false
相关文章
相关标签/搜索