在微服务架构下,原单体服务被拆分为多个微服务独立部署,客户端就没法知晓服务的具体位置;并且服务数量太多,维护如此多的服务地址,运维人员也没法高效工做。git
所以,在微服务架构中引入了服务注册中心,用于接受和维护各个服务的地址信息。客户端或者网关能够经过注册中心查询目标服务地址,动态实现服务访问,而且在此实现服务负载均衡。github
对于服务注册与发现,go-kit默认提供了对consul、zookeeper、etcd、eureka经常使用注册中心的支持。docker
本文将基于consul,使用“客户端发现模式”进行实战演练,主要有如下要点:json
本文实例程序采用的思路为:算术服务注册至consul,其余部分保持不变;发现服务对外暴露http接口,接受请求后(接收请求内容存储在Body中,以json方式传递),按照go-kit的机制动态查询算术服务实例,调用算术服务的接口,而后将响应内容返回。以下图所示:bootstrap
docker/docker-compose.yml
,以下所示(暂时注释了Prometheus和Grafana的部分)。version: '2'
services:
consul:
image: progrium/consul:latest
ports:
- 8400:8400
- 8500:8500
- 8600:53/udp
hostname: consulserver
command: -server -bootstrap -ui-dir /ui
复制代码
sudo docker-compose -f docker/docker-compose.yml up
复制代码
http://localhost:8500
,出现如下界面即为启动成功。本示例基于arithmetic_monitor_demo
代码进行改写。首先,复制该目录并重命名为arithmetic_consul_demo
;新建两个目录,分别命名为register
、discover
;将原有go
代码文件移动至register
目录。结果以下图所示:api
另外,须要下载所依赖的第三方库uuid
和hashicorp/consul
浏览器
go get github.com/pborman/uuid
go get github.com/hashicorp/consul
复制代码
新建register/register.go
,添加Register
方法,实现向consul的注册逻辑。该方法接收5个参数,分别是注册中心consul的ip、端口,算术服务的本地ip和端口,日志记录工具。bash
建立注册对象须要使用hashicorp/consul
,查看代码可知其方法定义以下:微信
func NewRegistrar(client Client, r *stdconsul.AgentServiceRegistration, logger log.Logger) *Registrar
复制代码
因此Register
的实现过程主要有三步:建立consul客户端对象;建立consul对算术服务健康检查的参数配置信息;建立算术服务向consul注册的服务配置信息。代码以下:架构
func Register(consulHost, consulPort, svcHost, svcPort string, logger log.Logger) (registar sd.Registrar) {
// 建立Consul客户端链接
var client consul.Client
{
consulCfg := api.DefaultConfig()
consulCfg.Address = consulHost + ":" + consulPort
consulClient, err := api.NewClient(consulCfg)
if err != nil {
logger.Log("create consul client error:", err)
os.Exit(1)
}
client = consul.NewClient(consulClient)
}
// 设置Consul对服务健康检查的参数
check := api.AgentServiceCheck{
HTTP: "http://" + svcHost + ":" + svcPort + "/health",
Interval: "10s",
Timeout: "1s",
Notes: "Consul check service health status.",
}
port, _ := strconv.Atoi(svcPort)
//设置微服务想Consul的注册信息
reg := api.AgentServiceRegistration{
ID: "arithmetic" + uuid.New(),
Name: "arithmetic",
Address: svcHost,
Port: port,
Tags: []string{"arithmetic", "raysonxin"},
Check: &check,
}
// 执行注册
registar = consul.NewRegistrar(client, ®, logger)
return
}
复制代码
由Step-2
可知,consul将定时请求算术服务的/heath
用于检查服务的健康状态,因此咱们将从service
、endpoint
、transport
中增长对应的实现。
Service
中新增接口方法HealthCheck
,并依次在ArithmeticService
、loggingMiddleware
、metricMiddleware
中添加实现。// service接口
// Service Define a service interface
type Service interface {
//省略以前的其余方法
// HealthCheck check service health status
HealthCheck() bool
}
// ArithmeticService实现HealthCheck
// HealthCheck implement Service method
// 用于检查服务的健康状态,这里仅仅返回true。
func (s ArithmeticService) HealthCheck() bool {
return true
}
// loggingMiddleware实现HealthCheck
func (mw loggingMiddleware) HealthCheck() (result bool) {
defer func(begin time.Time) {
mw.logger.Log(
"function", "HealthChcek",
"result", result,
"took", time.Since(begin),
)
}(time.Now())
result = mw.Service.HealthCheck()
return
}
// metricMiddleware实现HealthCheck
func (mw metricMiddleware) HealthCheck() (result bool) {
defer func(begin time.Time) {
lvs := []string{"method", "HealthCheck"}
mw.requestCount.With(lvs...).Add(1)
mw.requestLatency.With(lvs...).Observe(time.Since(begin).Seconds())
}(time.Now())
result = mw.Service.HealthCheck()
return
}
复制代码
endpoints.go
中新增结构:ArithmeticEndpoints
。在以前的示例中,仅使用了一个endpoint,因此我直接使用告终构endpoint.Endpoint
。定义以下:// ArithmeticEndpoint define endpoint
type ArithmeticEndpoints struct {
ArithmeticEndpoint endpoint.Endpoint
HealthCheckEndpoint endpoint.Endpoint
}
复制代码
endpoint.Endpoint
封装方法。代码以下:// HealthRequest 健康检查请求结构
type HealthRequest struct{}
// HealthResponse 健康检查响应结构
type HealthResponse struct {
Status bool `json:"status"`
}
// MakeHealthCheckEndpoint 建立健康检查Endpoint
func MakeHealthCheckEndpoint(svc Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (response interface{}, err error) {
status := svc.HealthCheck()
return HealthResponse{status}, nil
}
}
复制代码
transports.go
中新增健康检查接口/health
。// MakeHttpHandler make http handler use mux
func MakeHttpHandler(ctx context.Context, endpoints ArithmeticEndpoints, logger log.Logger) http.Handler {
r := mux.NewRouter()
//省略原有/calculate/{type}/{a}/{b}代码
// create health check handler
r.Methods("GET").Path("/health").Handler(kithttp.NewServer(
endpoints.HealthCheckEndpoint,
decodeHealthCheckRequest,
encodeArithmeticResponse,
options...,
))
return r
}
复制代码
接下来在main.go
中增长健康检查和服务注册相关的调用代码,以便上述修改逻辑生效。
//建立健康检查的Endpoint,未增长限流
healthEndpoint := MakeHealthCheckEndpoint(svc)
//把算术运算Endpoint和健康检查Endpoint封装至ArithmeticEndpoints
endpts := ArithmeticEndpoints{
ArithmeticEndpoint: endpoint,
HealthCheckEndpoint: healthEndpoint,
}
//建立http.Handler
r := MakeHttpHandler(ctx, endpts, logger)
复制代码
// 定义环境变量
var (
consulHost = flag.String("consul.host", "", "consul ip address")
consulPort = flag.String("consul.port", "", "consul port")
serviceHost = flag.String("service.host", "", "service ip address")
servicePort = flag.String("service.port", "", "service port")
)
// parse
flag.Parse()
// ...
//建立注册对象
registar := Register(*consulHost, *consulPort, *serviceHost, *servicePort, logger)
go func() {
fmt.Println("Http Server start at port:" + *servicePort)
//启动前执行注册
registar.Register()
handler := r
errChan <- http.ListenAndServe(":"+*servicePort, handler)
}()
go func() {
c := make(chan os.Signal, 1)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errChan <- fmt.Errorf("%s", <-c)
}()
error := <-errChan
//服务退出,取消注册
registar.Deregister()
fmt.Println(error)
复制代码
打开终端,切换至项目目录。执行go build ./register
编译成功后,输入如下指令启动算术服务(注册服务):
./register -consul.host localhost -consul.port 8500 -service.host 192.168.192.145 -service.port 9000
复制代码
启动成功后,再次刷新consul-ui
界面,看到以下界面即说明算术服务成功注册至consul。
同时也能够在注册服务运行的终端看到consul定时调用/health
接口的日志输出信息:
discover
服务要完成的工做为:以REST接口/calculate
对外提供API服务,客户端使用HTTP POST方法发送json数据执行请求;在endpoint中查询已经在consul中注册的服务实例;而后选择合适的服务实例向其发起请求转发;完成请求后向原客户端请求响应。
查阅go-kit源码可知,kit/sd/Endpointer
提供了一套服务发现机制,其定义和建立接口以下所示:
// Endpointer listens to a service discovery system and yields a set of
// identical endpoints on demand. An error indicates a problem with connectivity
// to the service discovery system, or within the system itself; an Endpointer
// may yield no endpoints without error.
type Endpointer interface {
Endpoints() ([]endpoint.Endpoint, error)
}
// NewEndpointer creates an Endpointer that subscribes to updates from Instancer src
// and uses factory f to create Endpoints. If src notifies of an error, the Endpointer
// keeps returning previously created Endpoints assuming they are still good, unless
// this behavior is disabled via InvalidateOnError option.
func NewEndpointer(src Instancer, f Factory, logger log.Logger, options ...EndpointerOption) *DefaultEndpointer
复制代码
经过代码注释咱们能够知道: Endpointer经过监听服务发现系统的事件信息,而且经过factory按需建立服务终结点(Endpoint
)。
因此,咱们须要经过Endpointer
来实现服务发现功能。在微服务模式下,同一个服务可能存在多个实例,因此须要经过负载均衡机制完成实例选择,这里使用go-kit工具集中的kit/sd/lb
组件(该组件实现RoundRibbon,并具有Retry功能)。
在discover
目录中建立go文件factory.go
,实现sd.Factory
的逻辑,即把服务实例转换为endpoint,在该endpoint中实现对于目标服务的调用过程。这里直接针对算术运算服务进行封装,代码以下所示:
func arithmeticFactory(_ context.Context, method, path string) sd.Factory {
return func(instance string) (endpoint endpoint.Endpoint, closer io.Closer, err error) {
if !strings.HasPrefix(instance, "http") {
instance = "http://" + instance
}
tgt, err := url.Parse(instance)
if err != nil {
return nil, nil, err
}
tgt.Path = path
var (
enc kithttp.EncodeRequestFunc
dec kithttp.DecodeResponseFunc
)
enc, dec = encodeArithmeticRequest, decodeArithmeticReponse
return kithttp.NewClient(method, tgt, enc, dec).Endpoint(), nil, nil
}
}
func encodeArithmeticRequest(_ context.Context, req *http.Request, request interface{}) error {
arithReq := request.(ArithmeticRequest)
p := "/" + arithReq.RequestType + "/" + strconv.Itoa(arithReq.A) + "/" + strconv.Itoa(arithReq.B)
req.URL.Path += p
return nil
}
func decodeArithmeticReponse(_ context.Context, resp *http.Response) (interface{}, error) {
var response ArithmeticResponse
var s map[string]interface{}
if respCode := resp.StatusCode; respCode >= 400 {
if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
return nil, err
}
return nil, errors.New(s["error"].(string) + "\n")
}
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, err
}
return response, nil
}
复制代码
建立go文件discover/enpoints.go
。根据上述分析,在该endpoint实现对服务发现系统的监听,实现实例选择,最终返回可执行的endpoint.Endpoint
。下面根据代码注释说明实现过程:
// MakeDiscoverEndpoint 使用consul.Client建立服务发现Endpoint
// 为了方便这里默认了一些参数
func MakeDiscoverEndpoint(ctx context.Context, client consul.Client, logger log.Logger) endpoint.Endpoint {
serviceName := "arithmetic"
tags := []string{"arithmetic", "raysonxin"}
passingOnly := true
duration := 500 * time.Millisecond
//基于consul客户端、服务名称、服务标签等信息,
// 建立consul的链接实例,
// 可实时查询服务实例的状态信息
instancer := consul.NewInstancer(client, logger, serviceName, tags, passingOnly)
//针对calculate接口建立sd.Factory
factory := arithmeticFactory(ctx, "POST", "calculate")
//使用consul链接实例(发现服务系统)、factory建立sd.Factory
endpointer := sd.NewEndpointer(instancer, factory, logger)
//建立RoundRibbon负载均衡器
balancer := lb.NewRoundRobin(endpointer)
//为负载均衡器增长重试功能,同时该对象为endpoint.Endpoint
retry := lb.Retry(1, duration, balancer)
return retry
}
复制代码
建立go文件discover/transports.go
。经过mux/Router
使用POST方法为发现服务开放REST接口/calculate
,与算术服务同样,这里须要endpoint.Endpoint
、DecodeRequestFunc
、EncodeResponseFunc
。为了方便,我把算术服务中的请求与响应结构和编解码方法直接复制过来。代码以下所示:
func MakeHttpHandler(endpoint endpoint.Endpoint) http.Handler {
r := mux.NewRouter()
r.Methods("POST").Path("/calculate").Handler(kithttp.NewServer(
endpoint,
decodeDiscoverRequest,
encodeDiscoverResponse,
))
return r
}
// 省略实体结构和编解码方法
复制代码
接下来就是在main方法把以上逻辑串起来,而后启动发现服务了,这里监听端口为9001。比较简单,直接贴代码了:
func main() {
// 建立环境变量
var (
consulHost = flag.String("consul.host", "", "consul server ip address")
consulPort = flag.String("consul.port", "", "consul server port")
)
flag.Parse()
//建立日志组件
var logger log.Logger
{
logger = log.NewLogfmtLogger(os.Stderr)
logger = log.With(logger, "ts", log.DefaultTimestampUTC)
logger = log.With(logger, "caller", log.DefaultCaller)
}
//建立consul客户端对象
var client consul.Client
{
consulConfig := api.DefaultConfig()
consulConfig.Address = "http://" + *consulHost + ":" + *consulPort
consulClient, err := api.NewClient(consulConfig)
if err != nil {
logger.Log("err", err)
os.Exit(1)
}
client = consul.NewClient(consulClient)
}
ctx := context.Background()
//建立Endpoint
discoverEndpoint := MakeDiscoverEndpoint(ctx, client, logger)
//建立传输层
r := MakeHttpHandler(discoverEndpoint)
errc := make(chan error)
go func() {
c := make(chan os.Signal)
signal.Notify(c, syscall.SIGINT, syscall.SIGTERM)
errc <- fmt.Errorf("%s", <-c)
}()
//开始监听
go func() {
logger.Log("transport", "HTTP", "addr", "9001")
errc <- http.ListenAndServe(":9001", r)
}()
// 开始运行,等待结束
logger.Log("exit", <-errc)
}
复制代码
在终端中切换至discover
目录,执行go build
完成编译,而后使用如下命令(指定注册中心服务地址)启动发现服务:
./discover -consul.host localhost -consul.port 8500
复制代码
使用postman请求http://localhost:9001/calculate
,在body
中设置请求信息,完成测试。以下图所示:
本文使用consul做为注册中心,经过实例演示了go-kit的服务注册与发现功能。因为本人在这个部分了解不够透彻,在编写代码和本文的过程当中,一直在研究go-kit发现组件的设计方式,力求可以经过代码、文字解释清楚。本人水平有限,有任何错误或不妥之处,请你们批评指正。
本文实例代码见arithmetic_consul_demo。
本文首发于本人微信公众号【兮一昂吧】,欢迎扫码关注!