SpringCloud实战8-Bus消息总线

好了如今咱们接着上一篇的随笔,继续来说。上一篇咱们讲到,咱们若是要去更新全部微服务的配置,在不重启的状况下去更新配置,只能依靠spring cloud config了,可是,是咱们要一个服务一个服务的发送post请求,git

咱们能受的了吗?这比以前的没配置中心好多了,那么咱们如何继续避免挨个挨个的向服务发送Post请求来告知服务,你的配置信息改变了,须要及时修改内存中的配置信息。web

这时候咱们就不要忘记消息队列的发布订阅模型。让全部为服务来订阅这个事件,当这个事件发生改变了,就能够通知全部微服务去更新它们的内存中的配置信息。这时Bus消息总线就能解决,你只须要在springcloud Config Server端发出refresh,就能够触发全部微服务更新了。spring

以下架构图所示:apache

 

Spring Cloud Bus除了支持RabbitMQ的自动化配置以外,还支持如今被普遍应用的Kafka。在本文中,咱们将搭建一个Kafka的本地环境,并经过它来尝试使用Spring Cloud Bus对Kafka的支持,实现消息总线的功能。bootstrap

Kafka使用Scala实现,被用做LinkedIn的活动流和运营数据处理的管道,如今也被诸多互联网企业普遍地用做为数据流管道和消息系统。windows

Kafak架构图以下:服务器

 

Kafka是基于消息发布/订阅模式实现的消息系统,其主要设计目标以下:架构

  1.消息持久化:以时间复杂度为O(1)的方式提供消息持久化能力,即便对TB级以上数据也能保证常数时间复杂度的访问性能。并发

  2.高吞吐:在廉价的商用机器上也能支持单机每秒100K条以上的吞吐量app

  3.分布式:支持消息分区以及分布式消费,并保证分区内的消息顺序

  4.跨平台:支持不一样技术平台的客户端(如:Java、PHP、Python等)

  5.实时性:支持实时数据处理和离线数据处理

  6.伸缩性:支持水平扩展

Kafka中涉及的一些基本概念:

  1.Broker:Kafka集群包含一个或多个服务器,这些服务器被称为Broker。

  2.Topic:逻辑上同Rabbit的Queue队列类似,每条发布到Kafka集群的消息都必须有一个Topic。(物理上不一样Topic的消息分开存储,逻辑上一个Topic的消息虽然保存于一个或多个Broker上,但用户只需指定消息的Topic便可生产或消费数据而没必要关心数据存于何处)

  3.Partition:Partition是物理概念上的分区,为了提供系统吞吐率,在物理上每一个Topic会分红一个或多个Partition,每一个Partition对应一个文件夹(存储对应分区的消息内容和索引文件)。

  4.Producer:消息生产者,负责生产消息并发送到Kafka Broker。

  5.Consumer:消息消费者,向Kafka Broker读取消息并处理的客户端。

  6.Consumer Group:每一个Consumer属于一个特定的组(可为每一个Consumer指定属于一个组,若不指定则属于默认组),组能够用来实现一条消息被组内多个成员消费等功能。

 

能够从kafka的架构图看到Kafka是须要Zookeeper支持的,你须要在你的Kafka配置里面指定Zookeeper在哪里,它是经过Zookeeper作一些可靠性的保证,作broker的主从,咱们还要知道Kafka的消息是以topic形式做为组织的,Producers发送topic形式的消息,
Consumer是按照组来分的,因此,一组Consumers都会都要一样的topic形式的消息。在服务端,它还作了一些分片,那么一个Topic可能分布在不一样的分片上面,方便咱们拓展部署多个机器,Kafka是天生分布式的。
这里为了演示,咱们只须要用它的默认配置,在windows上作个小Demo便可。

咱们这里主要针对Spring Cloud Bus对Kafka的支持,实现消息总线的功能,具体的Kafka,RabbitMQ消息队列但愿本身去找资料来学习一下。
 
有了一些概念的支持后,咱们进行一些Demo。以下:
首先新建一个springCloud-config-client1模块,方便咱们进行测试
所引入的依赖以下:
    <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
            <version>1.4.0.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka</artifactId>
            <version>1.3.5.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
            <version>1.3.2.RELEASE</version>
        </dependency>

 

接着要注意一下,client1的配置文件要改成bootstrap.yml,由于这种配置格式,是优先加载的,上一篇随笔有讲过,client1的配置以下:

server:
  port: 7006
spring:
  application:
    name: cloud-config
  cloud:
    config:
#启动什么环境下的配置,dev 表示开发环境,这跟你仓库的文件的后缀有关,好比,仓库配置文件命名格式是cloud-config-dev.properties,因此profile 就要写dev
      profile: dev
      discovery:
        enabled: true
#这个名字是Config Server端的服务名字,不能瞎写。
        service-id: config-server
#注册中心
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8888/eureka/,http://localhost:8889/eureka/
#是否须要权限拉去,默认是true,若是不false就不容许你去拉取配置中心Server更新的内容
management:
  security:
    enabled: false

接着启动类以下:

@SpringBootApplication
@EnableDiscoveryClient
public class Client1Application {

    public static void main(String[] args) {
        SpringApplication.run(Client1Application.class, args);
    }
}

 

接着将client中的TestController赋值一份到client1中,代码以下:

@RestController
//这里面的属性有可能会更新的,git中的配置中心变化的话就要刷新,没有这个注解内,配置就不能及时更新
@RefreshScope
public class TestController {

    @Value("${name}")
    private String name;
    @Value("${age}")
    private Integer age;

    @RequestMapping("/test")
    public String test(){
        return this.name+this.age;
    }
}

 

 

接着还要在先前的随笔中的模块中的Config Server加入以下配置:

#是否须要权限拉去,默认是true,若是不false就不容许你去拉取配置中心Server更新的内容
management:
  security:
    enabled: false

 

接着还要作一点就是,在config-client,config-client1,和config-Server都要引入kafka的依赖,以下:

      <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-kafka</artifactId>
            <version>1.3.2.RELEASE</version>
        </dependency>

咱们工程准备好了,暂时先放在这里,下面进行Kafka的安装下载,首先咱们去Kafka官网kafka.apache.org/downloads  下来官网推荐的版本,

 

 首先咱们进到下载好的Kafka目录中kafka_2.11-1.1.0\bin\windows 下编辑kafka-run-class.bat以下:

找到这条配置 以下:

set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp %CLASSPATH% %KAFKA_OPTS% %*

 

能够看到%CLASSPATH%没有双引号,

所以用双引号括起来,否则启动不起来的,报你JDK没安装好,修改后以下:

set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

接着,打开config文件夹中的server.properties配置以下:

能够看到是链接到本地的zookeeper就好了。

接着咱们进行先启动zookeeper,再启动Kafka,以下:

当看到上面的信息证实启动Zookeeper启动成功。、

接下来再开一个CMD启动Kafka,以下:

看到这些信息说明Kafka启动成功了

 

好了,接下来把前面的工程,两个注册中心,一个springcloud-config-server,两个springcloud-config-client,springcloud-config-client1启动起来,

能够看到springcloudBus是在0分片上,若是两个config-client启动都出现上面信息,证实启动成功了。

好了如今咱们进行访问一下config-server端,以下:

 

再访问两个client,以下:

 

 

好了,好戏开始了,如今咱们去git仓库上修改配置中心的文件,将年龄改成24,以下:

 

接下来,咱们咱们用refresh刷新配置服务端配置,通知两个client去更新内存中的配置信息。用postman发送localhost:7000/bus/refresh,以下:

能够看到没有返回什么信息,可是不要担忧,这是成功的通知全部client去更新了内存中的信息了。

接着咱们分别从新请求config-server,两个client,刷新页面,结果以下:

两个client以下:

能够看到全部client自动更新内存中的配置信息了。

 

到目前为止,上面都是刷新说有的配置的信息的,若是咱们想刷新某个特定服务的配置信息也是能够的。咱们能够指定刷新范围,以下:

指定刷新范围

  上面的例子中,咱们经过向服务实例请求Spring Cloud Bus的/bus/refresh接口,从而触发总线上其余服务实例的/refresh。可是有些特殊场景下(好比:灰度发布),咱们但愿能够刷新微服务中某个具体实例的配置。

  Spring Cloud Bus对这种场景也有很好的支持:/bus/refresh接口还提供了destination参数,用来定位具体要刷新的应用程序。好比,咱们能够请求/bus/refresh?destination=服务名字:9000,此时总线上的各应用实例会根据destination属性的值来判断是否为本身的实例名,

若符合才进行配置刷新,若不符合就忽略该消息。

  destination参数除了能够定位具体的实例以外,还能够用来定位具体的服务。定位服务的原理是经过使用Spring的PathMatecher(路径匹配)来实现,好比:/bus/refresh?destination=customers:**,该请求会触发customers服务的全部实例进行刷新。

相关文章
相关标签/搜索