Go微服务 - 第八部分 - 使用Viper和Spring Cloud Config进行集中配置

第八部分: Go微服务 - 使用Viper和Spring Cloud Config进行集中配置

在第八部分,咱们探索Go微服务中使用Spring Cloud Config进行集中配置。java

简介

考虑到微服务毕竟是用来分解应用为独立软件片断的,在微服务中集中处理一些东西感受有些不太搭配。然而咱们一般在后面的是进程之间的独立。微服务的其余操做应该集中处理。例如,日志应该在你的日志解决, 好比elk stack中终结, 监控应该归入专用的监控中。 在这部分,咱们将使用Spring Cloud Config和git处理外部化和集中配置。linux

集中处理组成咱们应用程序的各类微服务的配置其实是很天然的事情。 特别是在未知数量底层硬件节点上的容器环境运行的时候,管理配置文件构建到每一个微服务的映像中或放到不一样的安装卷中,很快就会变成真正的难题。有不少行之有效的项目能够帮咱们处理这些问题,例如etcd, consul和ZooKeeper。然而,应该注意的是,这些项目提供的不只仅是配置服务。既然本文聚焦的是集成Go微服务和Spring Cloud/Netflix OSS生态的支持服务, 咱们将基于Spring Cloud配置进行集中配置, Spring Cloud Config是一个提供精确配置的专用软件。git

Spring Cloud Config

Spring Cloud生态提供了集中配置的解决方案,也没有什么创意,就叫Spring Cloud Config。Spring Cloud Config服务器能够被视为服务和真正配置之间的代理, 提供了一些很是整洁的特性:github

  • 支持多种不一样的后端,例如git(默认), 用于etcd、consul和ZooKeeper的文件系统和插件。
  • 加密属性的透明解密。
  • 可插拔安全性。
  • 使用git钩子/REST API以及Spring Cloud Bus(例如RabbitMQ)的推送机制来将配置文件中的改变传播到服务,使得配置的实时更新成为可能。

个人同事Magnus最近的一篇文章对Spring Cloud Config进行特别深刻的探讨, 见参考链接。在本文中,咱们将集成咱们的accountservice服务和Spring Cloud Config服务,配置后端使用公开的位于github上的git仓库, 从仓库中咱们能够获取配置,解密/加密属性以及实现实时重载配置。golang

下面是咱们整个解决方案目标的简单概述:web

clipboard.png

概述

既然咱们以Swarm模式运行Docker, 咱们将继续以各类方式使用Docker的机制。在Swarm内部,咱们应该运行至少一个(能够更多)Spring Cloud Config服务器。当咱们的微服务中的一个启动的时候,它们要知道:算法

  • 配置服务器的逻辑服务名和端口号。也就是说,咱们把咱们的配置服务器也部署到Docker Swarm上做为服务,这里咱们称之为configserver。意味着这是微服务要请求配置的时候惟一须要知道的东西。
  • 它们的名字是什么, 例如"accountservice"。
  • 它运行在什么样的执行配置文件上,例如"dev", "test", "prod"。 若是你对spring.profiles.active概念比较熟悉的话,这用于Go语言同样很天然。
  • 若是咱们使用git做为后端,并想从特定的分支获取配置信息,咱们就须要提早知道(可选的)。

鉴于上面四个标准, 请求配置的简单GET可能看起来像下面的样子:spring

resp, err := http.Get("http://configserver:8888/accountservice/dev/P8")

也就是下面的协议:docker

protocol://url:port/applicationName/profile/branch

在Swarm中搭建一个Spring Cloud配置服务器

本文代码能够从github直接克隆下来: https://github.com/callistaen...shell

你也能够用其余方式来设置和部署配置服务器。而我在goblog目录下面准备了一个support目录,用于存放https://github.com/callistaen...,里边包含了咱们后面须要的第三方服务。

通常来讲,每一个必要的支持组件要么是简单的便于构建和部署组件的现成的Dockerfile, 要么是(java)源代码和配置(Spring Cloud应用一般是基于Spring Boot的), 这样咱们须要本身使用Gradle构建。(不须要担忧,咱们只须要安装JDK就能够了)。

(这些Spring Cloud应用程序大部分个人同事都已经提早准备好了。具体能够参考Java微服务)

RabbitMQ

什么状况? 咱们不是要安装Spring Cloud Config服务器吗? 好吧,这个依赖的软件具备一个消息中间人,可使用支持RabbitMQ的Spring Cloud Bus来传播配置改变。

有RabbitMQ是一个很好的事情,无论怎么说,咱们文章后面还会用到它。因此将从RabbitMQ开始,并在咱们的Swarm中做为服务来运行。

我已经在/goblog/support/rabbitmq目录下面准备了一个Dockerfile,可使用我在Docker Swarm服务中提早准备好的映像。

# use rabbitmq official
FROM rabbitmq

# enable management plugin
RUN rabbitmq-plugins enable --offline rabbitmq_management

# enable mqtt plugin
RUN rabbitmq-plugins enable --offline rabbitmq_mqtt

# expose management port
EXPOSE 15672
EXPOSE 5672

而后咱们能够建立一个脚本文件, 在须要更新的时候帮咱们自动作这些事情。

#!/bin/bash

# RabbitMQ
docker service rm rabbitmq
docker build -t someprefix/rabbitmq support/rabbitmq/
docker service create --name=rabbitmq --replicas=1 --network=my_network -p 1883:1883 -p 5672:5672 -p 15672:15672 someprefix/rabbitmq

(注意,你可能须要给这个脚本语言添加可执行权限。)

运行它,等待Docker下载必要的映像,并将它部署到Swarm中。 当它完成的时候,你就能够打开RabbitMQ管理UI,而且能使用guest/guest来登陆进去。

Spring Cloud Config服务器

在/support/config-server中你会发现一个提早配置好的Spring Boot应用程序,它用于运行配置服务器。咱们会使用一个git仓库来保存和访问咱们的yaml文件存储的配置。

---
# For deployment in Docker containers
spring:
  profiles: docker
  cloud:
    config:
      server:
        git:
          uri: https://github.com/eriklupander/go-microservice-config.git

# Home-baked keystore for encryption. Of course, a real environment wouldn't expose passwords in a blog...
encrypt:
  key-store:
    location: file:/server.jks
    password: letmein
    alias: goblogkey
    secret: changeme

# Since we're running in Docker Swarm mode, disable Eureka Service Discovery
eureka:
  client:
    enabled: false

# Spring Cloud Config requires rabbitmq, use the service name.
spring.rabbitmq.host: rabbitmq
spring.rabbitmq.port: 5672

上面是配置服务器的配置文件。咱们能够看到一些东西:

  • 咱们告诉config-server到咱们指定的URL来获取配置。
  • 一个密钥库,用于加密(自签名)和解密的密钥存储库。
  • 既然咱们是运行在Docker Swarm模式下的,所以eureka的服务发现功能是禁用的。
  • 配置服务器指望找到一个RabbitMQ, 它的host名为rabbitmq, 端口为5672, host恰好是刚才咱们给咱们的RabbitMQ服务起的Docker Swarm服务名。

下面是配置服务器的Dockerfile内容, 至关简单:

FROM davidcaste/alpine-java-unlimited-jce

EXPOSE 8888

ADD ./build/libs/*.jar app.jar
ADD ./server.jks /

ENTRYPOINT ["java","-Dspring.profiles.active=docker","-Djava.security.egd=file:/dev/./urandom","-jar","/app.jar"]

不要介意java.security.egd的东西,这是这个文章系列中咱们不须要关心的问题的解决办法。

这里有几点须要注意:

  • 咱们使用的镜像是基于Alpine Linux的,没有限制Java的加密扩展安装的。 这是一个必要要求,若是咱们想要Spring Cloud Config的加密/解密功能。
  • 容器镜像的根目录中咱们加入了在提早准备好的keystore。

编译keystore

后面咱们要使用加密属性,咱们须要为配置服务器带一个自签名证书。(这里咱们须要使用keytool工具。)

在/goblog/support/config-server目录下面执行下面的命令:

keytool -genkeypair -alias goblogkey -keyalg RSA -dname "CN=Go Blog,OU=Unit,O=Organization,L=City,S=State,C=SE" -keypass changeme -keystore server.jks -storepass letmein -validity 730

keytool是一个密钥和证书管理工具。它具备不少选项:

  • -certreq: 生成证书请求。
  • -changealias: 更改条目的别名。
  • -delete: 删除条目。
  • -exportcert: 导出证书。
  • -genkeypair: 生成密钥对。
  • -genseckey: 生成密钥。
  • -gencert: 根据证书请求生成证书。
  • -importcert: 导入证书或证书链。
  • -importpass: 导入口令。
  • -importkeystore: 从其余密钥库导入一个或全部条目。
  • -keypasswd: 更改条目的密钥口令。
  • -list: 列出密钥库中的条目。
  • -printcert: 打印证书内容。
  • -printcertreq: 打印证书请求的内容。
  • -printcrl: 打印 CRL 文件的内容。
  • -storepasswd: 更改密钥库的存储口令。

执行完上面命令后在当前目录下面生成一个server.jks keystore签名证书。你能够随意修改任何属性/密码, 主要记住相应的更改application.yml就能够了。

...
encrypt:
  key-store:
    location: file:/server.jks
    password: letmein
    alias: goblogkey
    secret: changeme
...

构建部署

是时候构建部署服务器了。 咱们先建立一个shell脚原本节约咱们时间,由于咱们可能会须要重复作不少次。 记住 - 你须要Java运行时环境来构建它。 在/goblog目录,咱们建立一个springcloud.sh的脚本文件。 咱们把全部真正须要构建的东西都放这里(构建可能须要很长时间):

#!/bin/bash

cd support/config-server
./gradlew build
cd ../..
docker build -t someprefix/configserver support/config-server/
docker service rm configserver
docker service create --replicas 1 --name configserver -p 8888:8888 --network my_network --update-delay 10s --with-registry-auth  --update-parallelism 1 someprefix/configserver

而后运行脚本,须要修改脚本的可执行权限。
等待几分钟时间,而后检查它是否在docker服务中启动运行了:

> docker service ls

ID                  NAME                MODE                REPLICAS            IMAGE
39d26cc3zeor        rabbitmq            replicated          1/1                 someprefix/rabbitmq
eu00ii1zoe76        viz                 replicated          1/1                 manomarks/visualizer:latest
q36gw6ee6wry        accountservice      replicated          1/1                 someprefix/accountservice
t105u5bw2cld        quotes-service      replicated          1/1                 eriklupander/quotes-service:latest
urrfsu262e9i        dvizz               replicated          1/1                 eriklupander/dvizz:latest
w0jo03yx79mu        configserver        replicated          1/1                 someprefix/configserver

而后能够经过curl来加载accountservice的JSON配置。

> curl http://$ManagerIP:8888/accountservice/dev/master
{"name":"accountservice","profiles":["dev"],"label":"master","version":"b8cfe2779e9604804e625135b96b4724ea378736",
    "propertySources":[
    {"name":"https://github.com/eriklupander/go-microservice-config.git/accountservice-dev.yml",
    "source":
        {"server_port":6767,"server_name":"Accountservice DEV"}
    }]
}

(这里输出为了简洁,咱们格式化了的)。实际配置保存在source属性中,在那里包含有全部.yml文件的属性值,它们以key-value对的形式出现。加载并解析source属性到Go语言可用的配置中, 是本文的中间件来完成的。

yaml配置文件

在咱们深刻到Go代码以前,咱们先看看https://github.com/eriklupand...:

accountservice-dev.yml
accountservice-test.yml

这两个文件目前里边的内容都很是少。

server_port: 6767
server_name: Accountservice TEST
the_password: (we'll get back to this one)

这里咱们只配置了咱们但愿绑定服务的HTTP端口号。真实的服务可能在里边设置不少东西。

使用解密/加密

Spring Cloud Config其中一个灵活的地方就是在配置文件中支持内置支持透明的解密被加密值。例如,能够看看accountservice-test.yml文件,那里咱们有the_password属性:

server_port: 6767
server_name: Accountservice TEST
the_password: '{cipher}AQB1BMFCu5UsCcTWUwEQt293nPq0ElEFHHp5B2SZY8m4kUzzqxOFsMXHaH7SThNNjOUDGxRVkpPZEkdgo6aJFSPRzVF04SXOVZ6Rjg6hml1SAkLy/k1R/E0wp0RrgySbgh9nNEbhzqJz8OgaDvRdHO5VxzZGx8uj5KN+x6nrQobbIv6xTyVj9CSqJ/Btf/u1T8/OJ54vHwi5h1gSvdox67teta0vdpin2aSKKZ6w5LyQocRJbONUuHyP5roCONw0pklP+2zhrMCy0mXhCJSnjoHvqazmPRUkyGcjcY3LHjd39S2eoyDmyz944TKheI6rWtCfozLcIr/wAZwOTD5sIuA9q8a9nG2GppclGK7X649aYQynL+RUy1q7T7FbW/TzSBg='

使用字符串{cipher}做为解密前缀,咱们的Spring Cloud配置服务器将在传递结果给服务器以前,知道如何自动为咱们解密值。在全部配置都正确的运行实例中,curl请求REST API来获取这个配置将返回:

...
      "source": {
        "server_port": 6767,
        "server_name": "Accountservice TEST",
        "the_password": "password"
....

至关灵活吧, 对吧?the_password属性能够在公网服务器和Spring Cloud服务器(它可能在不安全环境或内部服务器外部可见的任何环境都不可用。)中用保存明文加密的字符串(若是你相信加密算法和签名密钥的完整性)透明解密这个属性为真正的password。

固然,你须要使用相同的key做为Spring Cloud Config的解密key来解密,有些事情能够经过配置服务器的HTTP API来完成。

curl http://$ManagerIP:8888/encrypt -d 'password'
AQClKEMzqsGiVpKx+Vx6vz+7ww00n... (rest omitted for brevity)

Viper

咱们的基于Go的配置框架选择的是Viper。 Viper具备很好的API能够用, 而且很方便扩展, 而且不会妨碍咱们正常的应用代码。虽然Viper不肯生的支持从Spring Cloud配置服务器加载配置, 可是咱们能够写一小片代码能够帮咱们作到这点。 Viper也能够处理不少种文件类型做为配置源 - 例如json, yaml, 普通属性文件。 Viper能够为咱们从OS读取环境变量, 至关整洁。 一旦初始化并产生后,咱们的配置老是可使用各类的viper.Get函数获取来使用,确实很方便。

还记得在本文开头的图片吗? 好吧,若是不记得了, 咱们再重复一遍:

clipboard.png

咱们将让微服务启动的时候发起一个HTTP请求, 获取JSON响应的source部分,并将它们放到Viper中,这样咱们就能够在那里获取咱们的web服务器的端口号了。 让咱们开始吧。

加载配置

正如使用curl的已展现示例,咱们能够对配置服务器进行简单HTTP请求,那里咱们只须要知道名字和咱们的profile便可。 咱们将添加一些解析flag的功能到咱们的accountservice main.go, 所以在启动的时候,咱们能够指定一个环境profile,也能够指定到配置服务器的可选的URI。

var appName = "accountservice"

// Init function, runs before main()
func init() {
    // Read command line flags
    profile := flag.String("profile", "test", "Environment profile, something similar to spring profiles")
    configServerUrl := flag.String("configServerUrl", "http://configserver:8888", "Address to config server")
    configBranch := flag.String("configBranch", "master", "git branch to fetch configuration from")
    flag.Parse()
    
    // Pass the flag values into viper.
    viper.Set("profile", *profile)
    viper.Set("configServerUrl", *configServerUrl)
    viper.Set("configBranch", *configBranch)
}

func main() {
    fmt.Printf("Starting %v\n", appName)

    // NEW - load the config
    config.LoadConfigurationFromBranch(
        viper.GetString("configServerUrl"),
        appName,
        viper.GetString("profile"),
        viper.GetString("configBranch"))
    initializeBoltClient()
    service.StartWebServer(viper.GetString("server_port"))    // NEW, use port from loaded config 
}

init函数比较简单,就是从命令行参数解析flag参数值,而后设置到viper中。 在main函数中,调用config.LoadConfigurationFromBranch, 从远程git仓库加载配置。这里config.LoadConfigurationFromBranch是在goblog/common/config/loader.go中定义的:

package config

import (
    "encoding/json"
    "fmt"
    "io/ioutil"
    "net/http"

    "github.com/Sirupsen/logrus"
    "github.com/spf13/viper"
)

// LoadConfigurationFromBranch loads config from for example http://configserver:8888/accountservice/test/P8
func LoadConfigurationFromBranch(configServerURL string, appName string, profile string, branch string) {
    url := fmt.Sprintf("%s/%s/%s/%s", configServerURL, appName, profile, branch)
    logrus.Printf("Loading config from %s\n", url)
    body, err := fetchConfiguration(url)
    if err != nil {
        logrus.Errorf("Couldn't load configuration, cannot start. Terminating. Error: %v", err.Error())
        panic("Couldn't load configuration, cannot start. Terminating. Error: " + err.Error())
    }
    parseConfiguration(body)
}

func fetchConfiguration(url string) ([]byte, error) {
    defer func() {
        if r := recover(); r != nil {
            fmt.Println("Recovered in f", r)
        }
    }()
    logrus.Printf("Getting config from %v\n", url)
    resp, err := http.Get(url)
    if err != nil || resp.StatusCode != 200 {
        logrus.Errorf("Couldn't load configuration, cannot start. Terminating. Error: %v", err.Error())
        panic("Couldn't load configuration, cannot start. Terminating. Error: " + err.Error())
    }
    body, err := ioutil.ReadAll(resp.Body)
    if err != nil {
        panic("Error reading configuration: " + err.Error())
    }
    return body, err
}

func parseConfiguration(body []byte) {
    var cloudConfig springCloudConfig
    err := json.Unmarshal(body, &cloudConfig)
    if err != nil {
        panic("Cannot parse configuration, message: " + err.Error())
    }

    for key, value := range cloudConfig.PropertySources[0].Source {
        viper.Set(key, value)
        logrus.Printf("Loading config property %v => %v\n", key, value)
    }
    if viper.IsSet("server_name") {
        logrus.Printf("Successfully loaded configuration for service %s\n", viper.GetString("server_name"))
    }
}

type springCloudConfig struct {
    Name            string           `json:"name"`
    Profiles        []string         `json:"profiles"`
    Label           string           `json:"label"`
    Version         string           `json:"version"`
    PropertySources []propertySource `json:"propertySources"`
}

type propertySource struct {
    Name   string                 `json:"name"`
    Source map[string]interface{} `json:"source"`
}

本代码引入了三个包logrus, viper和amqp。由于我没有使用deps之类的包管理工具,所以咱们在安装logrus和viper包的时候,这两个包也有依赖的第三方包,咱们手工进行一些go get:

mkdir -p $GOPATH/src/golang.org/x
cd !$

git clone https://github.com/golang/text.git
git clone https://github.com/golang/sys.git


logrus go get问题
git clone https://github.com/golang/crypto.git

loadConfigurationFromBranch函数根据提供的参数获取配置并解析配置到viper中。

基本上来讲就是咱们发起一个带有appName, profile, git branch参数的HTTP GET请求到配置服务器, 而后解码响应JSON到在同一文件中声明的springCloudConfig结构体中。最后咱们简单迭代cloudConfig.PropertySources[0]的全部key-value对, 并将它们分别放入viper, 这样咱们能够随处均可以使用viper.GetString(key)或其余的Viper提供的其余Get方法来获取它们。

注意,若是咱们链接配置服务器或解析响应发生错误的话,就会panic()整个微服务, 这样就会kill掉它。Docker Swarm将检测这个并尝试在数秒以内部署一个新的实例。 拥有这样行为的典型缘由在于集群冷启动的时候,基于Go的微服务要比基于Sping Boot的配置服务器启动要快得多。让Swarm尝试几回,事情会本身解决掉的。

咱们吧实际工做分割到一个公共函数和一些包级别的函数单元,主要是便于单元测试。 单元测试检查,以便咱们能将JSON转换为实际的viper属性,看起来想GoConvey样式的测试:

func TestParseConfiguration(t *testing.T) {
    Convey("Given a JSON configuration response body", t, func() {
        var body = `{"name":"accountservice-dev","profiles":["dev"],"label":null,"version":null,"propertySources":[{"name":"file:/config-repo/accountservice-dev.yml","source":{"server_port":6767"}}]}`

        Convey("When parsed", func() {
            parseConfiguration([]byte(body))

            Convey("Then Viper should have been populated with values from Source", func() {
                So(viper.GetString("server_port"), ShouldEqual, "6767")
            })
        })
    })
}

而后在goblog/accountservice目录运行测试: go test ./...

更新Dockerfile

鉴于咱们是从外部源加载配置,咱们的服务须要一个查找的线索。 这能够在容器和服务启动的时候,经过使用flag做为命令行参数来执行。

FROM iron/base
EXPOSE 6767

ADD accountservice-linux-amd64 /
ADD healthchecker-linux-amd64 /

HEALTHCHECK --interval=3s --timeout=3s CMD ["./healthchecker-linux-amd64", "-port=6767"] || exit 1
ENTRYPOINT ["./accountservice-linux-amd64", "-configServerUrl=http://configserver:8888", "-profile=test", "-configBranch=P8"]

ENTRYPOINT如今提供了一些值,使得它能够到达配置,这样能够加载配置。

放入Swarm

你可能已经注意到咱们再也不使用6767端口号做为端口号的硬编码了, 也就是:

service.StartWebServer(viper.GetString("server_port"))

使用copyall.sh脚本从新构建并部署更新后的accountservice到Docker Swarm中。

全部事情都完成的时候,服务依然如本博客系列那样运行,例外的是它其实是从外部和集中化配置服务器拿的端口号,而不是硬编码到编译二进制文件的端口号。

咱们能够看看咱们的accountservice的日志:

docker logs -f [containerid]
Starting accountservice
Loading config from http://configserver:8888/accountservice/test/P8
Loading config property the_password => password
Loading config property server_port => 6767
Loading config property server_name => Accountservice TEST
Successfully loaded configuration for service Accountservice TEST

这里咱们又get新技能了,使用docker logs能够查看具体容器的日志:

Usage:    docker logs [OPTIONS] CONTAINER

Fetch the logs of a container

Options:
      --details        Show extra details provided to logs
  -f, --follow         Follow log output
      --since string   Show logs since timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)
      --tail string    Number of lines to show from the end of the logs (default "all")
  -t, --timestamps     Show timestamps
      --until string   Show logs before a timestamp (e.g. 2013-01-02T13:23:37) or relative (e.g. 42m for 42 minutes)

docker logs支持查询某个时间点先后的日志。

实际上打印配置值是错误的作法,这里咱们只是出于学习目的做出的输出。这里咱们使用logrus来打印日志。

实时配置更新

1. 哦,咱们用于某种目的的外部服务器的URL是否改变了呢?
2. 该死,怎么没有人告诉我!

假设咱们不少人都遇到下面状况, 咱们须要重建整个应用或至少重启来更新一些无效或改变的配置值。Spring Cloud具备刷新域的概念,其中bean能够实时更新,使用配置修改经过git commit hook传播。

下图提供了一个如何推送到git仓库,传播到咱们Go微服务的概览:

clipboard.png

在本文中,咱们使用的是github仓库,它彻底不知道如何执行post-commit hook操做到个人笔记本的Spring Cloud Server, 所以咱们将模拟一个提交挂钩使用Spring Cloud服务器的内置/监控端点来推送。

curl -H "X-Github-Event: push" -H "Content-Type: application/json" -X POST -d '{"commits": [{"modified": ["accountservice.yml"]}],"name":"some name..."}' -ki http://$ManagerIP:8888/monitor

Spring Cloud服务器将知道使用这个POST作什么,并在RabbitMQ(由Spring Cloud Bus抽象出来的)的交换上发送一个RefreshRemoteApplicationEvent。若是在成功引导了Spring Cloud Config以后,看看RabbitMQ的管理界面,应该建立了exchange。

clipboard.png

exchange和传统的消息控制例如publisher, consumer, queue的区别是什么?

Publisher -> Exchange -> (Routing) -> Queue -> Consumer

也就是消息被发布到exchange, 而后基于路由规则和可能注册了消费者的捆绑将消息副本分布到queue。

所以为了消费RefreshRemoteApplicationEvent消息(我更喜欢调用它们的refresh tokens), 全部咱们须要作的是确保咱们的Go服务在springCloudBus exchange上监听这样的消息, 若是咱们的目标应用执行了配置重载。 下面咱们来实现它。

Go语言中使用AMQP协议来消费消息

RabbitMQ中间人能够经过使用AMQP协议来访问。咱们将使用一个叫作streadway/amqp的Go版本的AMQP客户端。 大部分AMQP/RabbitMQ管道代码都应该使用一些可复用工具,可能咱们稍后会重构它。 基于这个例子的管道代码是来自streadway/amqp仓库的:

// This example declares a durable Exchange, an ephemeral (auto-delete) Queue,
// binds the Queue to the Exchange with a binding key, and consumes every
// message published to that Exchange with that routing key.
//
package main

import (
    "flag"
    "fmt"
    "github.com/streadway/amqp"
    "log"
    "time"
)

var (
    uri          = flag.String("uri", "amqp://guest:guest@localhost:5672/", "AMQP URI")
    exchange     = flag.String("exchange", "test-exchange", "Durable, non-auto-deleted AMQP exchange name")
    exchangeType = flag.String("exchange-type", "direct", "Exchange type - direct|fanout|topic|x-custom")
    queue        = flag.String("queue", "test-queue", "Ephemeral AMQP queue name")
    bindingKey   = flag.String("key", "test-key", "AMQP binding key")
    consumerTag  = flag.String("consumer-tag", "simple-consumer", "AMQP consumer tag (should not be blank)")
    lifetime     = flag.Duration("lifetime", 5*time.Second, "lifetime of process before shutdown (0s=infinite)")
)

func init() {
    flag.Parse()
}

func main() {
    c, err := NewConsumer(*uri, *exchange, *exchangeType, *queue, *bindingKey, *consumerTag)
    if err != nil {
        log.Fatalf("%s", err)
    }

    if *lifetime > 0 {
        log.Printf("running for %s", *lifetime)
        time.Sleep(*lifetime)
    } else {
        log.Printf("running forever")
        select {}
    }

    log.Printf("shutting down")

    if err := c.Shutdown(); err != nil {
        log.Fatalf("error during shutdown: %s", err)
    }
}

type Consumer struct {
    conn    *amqp.Connection
    channel *amqp.Channel
    tag     string
    done    chan error
}

func NewConsumer(amqpURI, exchange, exchangeType, queueName, key, ctag string) (*Consumer, error) {
    c := &Consumer{
        conn:    nil,
        channel: nil,
        tag:     ctag,
        done:    make(chan error),
    }

    var err error

    log.Printf("dialing %q", amqpURI)
    c.conn, err = amqp.Dial(amqpURI)
    if err != nil {
        return nil, fmt.Errorf("Dial: %s", err)
    }

    go func() {
        fmt.Printf("closing: %s", <-c.conn.NotifyClose(make(chan *amqp.Error)))
    }()

    log.Printf("got Connection, getting Channel")
    c.channel, err = c.conn.Channel()
    if err != nil {
        return nil, fmt.Errorf("Channel: %s", err)
    }

    log.Printf("got Channel, declaring Exchange (%q)", exchange)
    if err = c.channel.ExchangeDeclare(
        exchange,     // name of the exchange
        exchangeType, // type
        true,         // durable
        false,        // delete when complete
        false,        // internal
        false,        // noWait
        nil,          // arguments
    ); err != nil {
        return nil, fmt.Errorf("Exchange Declare: %s", err)
    }

    log.Printf("declared Exchange, declaring Queue %q", queueName)
    queue, err := c.channel.QueueDeclare(
        queueName, // name of the queue
        true,      // durable
        false,     // delete when unused
        false,     // exclusive
        false,     // noWait
        nil,       // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("Queue Declare: %s", err)
    }

    log.Printf("declared Queue (%q %d messages, %d consumers), binding to Exchange (key %q)",
        queue.Name, queue.Messages, queue.Consumers, key)

    if err = c.channel.QueueBind(
        queue.Name, // name of the queue
        key,        // bindingKey
        exchange,   // sourceExchange
        false,      // noWait
        nil,        // arguments
    ); err != nil {
        return nil, fmt.Errorf("Queue Bind: %s", err)
    }

    log.Printf("Queue bound to Exchange, starting Consume (consumer tag %q)", c.tag)
    deliveries, err := c.channel.Consume(
        queue.Name, // name
        c.tag,      // consumerTag,
        false,      // noAck
        false,      // exclusive
        false,      // noLocal
        false,      // noWait
        nil,        // arguments
    )
    if err != nil {
        return nil, fmt.Errorf("Queue Consume: %s", err)
    }

    go handle(deliveries, c.done)

    return c, nil
}

func (c *Consumer) Shutdown() error {
    // will close() the deliveries channel
    if err := c.channel.Cancel(c.tag, true); err != nil {
        return fmt.Errorf("Consumer cancel failed: %s", err)
    }

    if err := c.conn.Close(); err != nil {
        return fmt.Errorf("AMQP connection close error: %s", err)
    }

    defer log.Printf("AMQP shutdown OK")

    // wait for handle() to exit
    return <-c.done
}

func handle(deliveries <-chan amqp.Delivery, done chan error) {
    for d := range deliveries {
        log.Printf(
            "got %dB delivery: [%v] %q",
            len(d.Body),
            d.DeliveryTag,
            d.Body,
        )
        d.Ack(false)
    }
    log.Printf("handle: deliveries channel closed")
    done <- nil
}

载goblog/accountservice/main.go main函数中添加新行, 为咱们启动一个AMQP消费者:

func main() {
    fmt.Printf("Starting %v\n", appName)

    config.LoadConfigurationFromBranch(
            viper.GetString("configServerUrl"),
            appName,
            viper.GetString("profile"),
            viper.GetString("configBranch"))
    initializeBoltClient()
    
    // NEW
    go config.StartListener(appName, viper.GetString("amqp_server_url"), viper.GetString("config_event_bus"))   
    service.StartWebServer(viper.GetString("server_port"))
}

注意上面的StartListener的两个参数服务器url和事件bus两个属性,它们是在下面的文件中定义的:

server_port: 6767
server_name: Accountservice TEST
the_password: '{cipher}AQB1BMFC....'
amqp_server_url: amqp://guest:guest@rabbitmq:5672/
config_event_bus: springCloudBus
func StartListener(appName string, amqpServer string, exchangeName string) {
    err := NewConsumer(amqpServer, exchangeName, "topic", "config-event-queue", exchangeName, appName)
    if err != nil {
        log.Fatalf("%s", err)
    }

    log.Printf("running forever")
    select {}   // Yet another way to stop a Goroutine from finishing...
}

NewConsumer是样板代码的实际位置,这里先忽略过它,直接看看实际处理进来请求的代码:

func handleRefreshEvent(body []byte, consumerTag string) {
    updateToken := &UpdateToken{}
    err := json.Unmarshal(body, updateToken)
    if err != nil {
        log.Printf("Problem parsing UpdateToken: %v", err.Error())
    } else {
        if strings.Contains(updateToken.DestinationService, consumerTag) {
            log.Println("Reloading Viper config from Spring Cloud Config server")

            // Consumertag is same as application name.
            LoadConfigurationFromBranch(
                viper.GetString("configServerUrl"),
                consumerTag,
                viper.GetString("profile"),
                viper.GetString("configBranch"))
        }
    }
}

// {"type":"RefreshRemoteApplicationEvent","timestamp":1494514362123,"originService":"config-server:docker:8888","destinationService":"xxxaccoun:**","id":"53e61c71-cbae-4b6d-84bb-d0dcc0aeb4dc"}
type UpdateToken struct {
    Type string `json:"type"`
    Timestamp int `json:"timestamp"`
    OriginService string `json:"originService"`
    DestinationService string `json:"destinationService"`
    Id string `json:"id"`
}

这个代码尝试解析到达的消息为UpdateToken结构体,而且若是destinationService匹配咱们的consumerTag(也就是 appName accountservice), 咱们就调用一样的最初服务启动时调用的LoadConfigurationFromBranch函数。

请注意在实际场景中,NewConsumer函数和通常的消息处理代码将须要更多的错误处理、确保只处理恰当的消息等等工做。

单元测试

让咱们为handleRefreshEvent()函数写一个单元测试。 建立一个新的测试文件:

var SERVICE_NAME = "accountservice"

func TestHandleRefreshEvent(t *testing.T) {
    // Configure initial viper values
    viper.Set("configServerUrl", "http://configserver:8888")
    viper.Set("profile", "test")
    viper.Set("configBranch", "master")

    // Mock the expected outgoing request for new config
    defer gock.Off()
    gock.New("http://configserver:8888").
        Get("/accountservice/test/master").
        Reply(200).
        BodyString(`{"name":"accountservice-test","profiles":["test"],"label":null,"version":null,"propertySources":[{"name":"file:/config-repo/accountservice-test.yml","source":{"server_port":6767,"server_name":"Accountservice RELOADED"}}]}`)

Convey("Given a refresh event received, targeting our application", t, func() {
        var body = `{"type":"RefreshRemoteApplicationEvent","timestamp":1494514362123,"originService":"config-server:docker:8888","destinationService":"accountservice:**","id":"53e61c71-cbae-4b6d-84bb-d0dcc0aeb4dc"}
`
        Convey("When handled", func() {
            handleRefreshEvent([]byte(body), SERVICE_NAME)

            Convey("Then Viper should have been re-populated with values from Source", func() {
                So(viper.GetString("server_name"), ShouldEqual, "Accountservice RELOADED")
            })
        })
    })
}

我但愿BDD样式的GoConvey传达(双关语!)测试如何工做。 注意咱们如何使用gock来拦截对外的请求新配置的HTTP请求,以及咱们预先产生的带有一些初始值的viper。

运行它

是时候测试了。 从新使用copyall.sh脚本部署服务。

检查accountservice的日志:

> docker logs -f [containerid]
Starting accountservice
... [truncated for brevity] ...
Successfully loaded configuration for service Accountservice TEST    <-- LOOK HERE!!!!
... [truncated for brevity] ...
2017/05/12 12:06:36 dialing amqp://guest:guest@rabbitmq:5672/
2017/05/12 12:06:36 got Connection, getting Channel
2017/05/12 12:06:36 got Channel, declaring Exchange (springCloudBus)
2017/05/12 12:06:36 declared Exchange, declaring Queue (config-event-queue)
2017/05/12 12:06:36 declared Queue (0 messages, 0 consumers), binding to Exchange (key 'springCloudBus')
2017/05/12 12:06:36 Queue bound to Exchange, starting Consume (consumer tag 'accountservice')
2017/05/12 12:06:36 running forever

如今对accountservice-test.yml和service name进行修改,并使用前面展现的使用monitor API POST来伪造一个提交hook:

我修改了accountservice-test.yml文件和它的service name属性,从accountservice TEST到Temporary test string, 而后推送改变。

接着,咱们使用curl来让咱们的Spring Cloud Config服务器知道这些更新:

> curl -H "X-Github-Event: push" -H "Content-Type: application/json" -X POST -d '{"commits": [{"modified": ["accountservice.yml"]}],"name":"what is this?"}' -ki http://192.168.99.100:8888/monitor

若是全部都正常工做,就会触发一个refresh token从Config服务器,咱们的accountservice就会捡起它. 再次检查下log:

> docker logs -f [containerid]
2017/05/12 12:13:22 got 195B consumer: [accountservice] delivery: [1] routingkey: [springCloudBus] {"type":"RefreshRemoteApplicationEvent","timestamp":1494591202057,"originService":"config-server:docker:8888","destinationService":"accountservice:**","id":"1f421f58-cdd6-44c8-b5c4-fbf1e2839baa"}
2017/05/12 12:13:22 Reloading Viper config from Spring Cloud Config server
Loading config from http://configserver:8888/accountservice/test/P8
Loading config property server_port => 6767
Loading config property server_name => Temporary test string!
Loading config property amqp_server_url => amqp://guest:guest@rabbitmq:5672/
Loading config property config_event_bus => springCloudBus
Loading config property the_password => password
Successfully loaded configuration for service Temporary test string!      <-- LOOK HERE!!!!

正如你所见的,最后一行打印了"Successfully loaded configuration for service Temporary test string!", 源代码以下:

if viper.IsSet("server_name") {
    fmt.Printf("Successfully loaded configuration for service %s\n", viper.GetString("server_name"))
}

也就是说,咱们已经动态修改了的以前存储在Viper中的属性值, 而没告诉咱们的服务!这是真正的酷!

重要提示: 虽然动态更新属性是很是酷的,可是它自己不会更新这些东西,好比咱们运行服务器的端口,池中已存在链接对象, 或RabbitMQ中间人的活动链接。 这些类型的已运行东西须要花费一些时间来使用新的配置来重启, 这些内容超出了本文的范围。

Footprint及性能

在启动时添加配置加载不该该影响运行时性能, 事实上它确实不影响。每秒1千个请求和以前具备一样的吞吐,CPU和内存使用。相信个人话或者你本身试试。咱们将在第一次启动后快速查看内存使用状况:

CONTAINER                                    CPU %               MEM USAGE / LIMIT     MEM %               NET I/O             BLOCK I/O           PIDS
accountservice.1.pi7wt0wmh2quwm8kcw4e82ay4   0.02%               4.102MiB / 1.955GiB   0.20%               18.8kB / 16.5kB     0B / 1.92MB         6
configserver.1.3joav3m6we6oimg28879gii79     0.13%               568.7MiB / 1.955GiB   28.41%              171kB / 130kB       72.9MB / 225kB      50
rabbitmq.1.kfmtsqp5fnw576btraq19qel9         0.19%               125.5MiB / 1.955GiB   6.27%               6.2MB / 5.18MB      31MB / 414kB        75
quotes-service.1.q81deqxl50n3xmj0gw29mp7jy   0.05%               340.1MiB / 1.955GiB   16.99%              2.97kB / 0B         48.1MB / 0B         30

甚至和AMQP、Viper做为配置框架的集成,咱们最初运行信息大概4MB左右。咱们的Spring Boot实现的配置服务器使用了超过500MB的内存,而RabbitMQ(我认为是用Erlang写的)使用125MB。

我能够确定的是,咱们可使用一些标准的JVM -xmx参数可让配置服务器的尺寸降低到256MB初始化堆尺寸,可是它绝对是须要大量RAM的。然而,在生产环境中我但愿运行2个配置服务器,而非几十个或几百个。 当谈及支持服务,从Spring Cloud生态,内存使用并非什么大事,由于咱们一般不会有这种服务的多余一个或几个实例。

备忘

// 查找须要加入的swarm的token, 须要在Leader中查询。
docker swarm join-token -q worker

// 以worker节点的形式加入Swarm
docker swarm join --token tokenstring worker ip:port
docker swarm join --token tokenstring manager ip:port

// ssh到具体的机器
docker-machine ssh docker-name # swarm-manager-1

总结

在本文中咱们部署了一个Spring Cloud配置服务器,和它的RabbitMQ依赖到咱们的Swarm中。而后咱们写了一些Go代码,使用一些简单的HTTP, JSON和Viper框架从配置服务器启动时加载配置并填充到Viper中,方便咱们整个微服务代码方便使用。

在下一节中,咱们会继续探索AMQP和RabbitMQ, 深刻更多细节,看看咱们本身如何发送一些消息。

参考链接

相关文章
相关标签/搜索