战旗直播基于consul服务注册与发现的GRPC服务实战与感想

前言

鄙人关注consul也有一段时间了,从2017年开始了解它的一些特性,它能帮助解决哪些问题,而后怎么应用到微服务中去。随着时间的推移,微服务的发展也是很是的迅速,能够说突飞猛进,天天都在变化。consul工具所提供的功能也在不断地新增和完善。OK,有些扯远了,我们仍是回到主题上来吧。
在微服务领域有个重要的概念——服务注册与发现。google或baidu一下,会发现有大量的关于服务注册发现的文章、博客等,有基于consul的、也有基于etcd和zoomkeeper的,每一个工具都有本身的特色和优点,也有必定的类似性,好比它们均可以实现服务注册与发现,也均可以实现kv存储等。可是它们也有必定的区别,好比consul重点是服务注册与发现,其次是kv存储;像etcd重点是处理kv存储的功能,在kv存储上来实现服务的注册与发现。侧重点不同,感兴趣的朋友能够去google或baidu一下,这里就不在叙述了。因为侧重点不一样,依据战旗直播业务的实际状况,选择了consul来实现战旗后端的服务注册与发现。html

consul能解决的问题

  1. 服务注册(注销)与发现
  2. 节点/服务监控
  3. KV存储——业务服务配置统一管理
  4. consul-template
  5. ACL
  6. DNS
  7. 其余

consul集群部署

依据consul的文档,consul集群中须要部署两种类型的节点:server节点和client节点。server节点推荐部署奇数个节点,有利于leader的选举过程快速的结束(偶数个节点可能须要多个选举过程才能选举出leader)。这里有几个概念:集群,leader选举,对这些原理感兴趣同窗能够去google下.
战旗部署了5个server节点,其余节点都是client,也就是说每台服务上都有一个node。而后,战旗的服务经过localhost:8500地址向consul集群注册本身。部署结构1以下图所示:
Consul dep.png前端

思考:这样的部署结构存在必定的缺陷,若是某个节点的consul挂了,会直接影响该节点上的全部应用服务,应为它们都是经过localhost来跟consul进行通信的。那怎么预防这样的状况呢?
这时备选方案plan B出台了.node

  1. 经过coreDNS获取consul集群情况,来自动配置一个内部DNS路由条目。当访问consul服务时,自动路由到consul集群中的某个节点。
  2. 也能够部署两个nginx(主备),由nginx路由到consul集群中的某个节点。
  3. 若是您采用的是k8s环境,那恭喜您,能够省略不少工做来,直接在k8s中部署一个service来指向consul集群。

部署结构2以下图所示:
consul dep2.jpgnginx

小结:为了提升consul集群的高可靠性,保证服务的正常运行。在结构1的基础上,同时又部署了DNS方案。当本节点的consul不可达时,经过DNS来与consul集群通信。这里可能有些同窗会问,为何不直接采用结构2呢?这两个结构各有千秋,结构1通迅效率高,应用服务的健康检测都在当前节点完成,检测压力小。结构2虽无单节点风险,但存在consul负载不均衡等缺点。git

具体consul怎么下载,怎么安装,怎么启动server,怎么加入集群就不讲了,你们能够参考官方文档:https://www.consul.io/docs/index.htmlgithub

consul与grpc实战

服务注册与注销

当应用服务启动时,将本身注册到consul中,以便其余服务能及时发现该服务。后端

1.导入sdk,api

import (
    consul "github.com/hashicorp/consul/api"
)

2.获取consul client对象restful

// @param uriStr string: consul通迅地址,默认http://localhost:8500; 当localhost不可达时,须要经过服务名来访问其余consul节点,如:http://consul:8500
// @param token string: 访问控制用
func MakeClient(uriStr string, token string) (*consul.Client, error) {

    uri, err := url.Parse(uriStr)
    if err != nil {
        logs.Error("url parse error: ", err)
        return nil, err
    }

    config := consulapi.DefaultConfig()
    config.Address = uriStr

    if len(token) > 0 {
        config.Token = token
    } else {
        config.Token = defaultToken
    }

    client, err := consulapi.NewClient(config)
    if err != nil {
        logs.Error("consul: ", uri.Scheme)
        return nil, err
    }

    return client, nil
}

3.准备工做框架

// @param svrName string: 要注册当服务名
// @param useType string: 对应consul中的tag, 可用于过滤
// @param svrPort int: 服务对应的端口号
// @param healthPort int: http检测时须要对应端口号,tcp检测默认当前端口
// @param healthType string: http或tcp,
// @param localIp string: 当前节点的内网IP,即其余服务能访问到的IP
func Prepare(svrName string, useType string, svrPort int, healthPort int, healthType string, localIp string) *consulapi.AgentServiceRegistration {
    ip := localIp 
    // 注册配置信息
    reg := &consul.AgentServiceRegistration{
        ID:      strings.ToLower(fmt.Sprintf("%s_%d", svrName, libs.Ip2Long(ip))), // 生成一个惟一当服务ID
        Name:    strings.ToLower(fmt.Sprintf("%s", svrName)), // 注册服务名
        Tags:    []string{strings.ToLower(useType)},// 标签
        Port:    svrPort, // 端口号
        Address: ip, // 所在节点ip地址
    }
    // 健康检测配置信息
    reg.Check = &consulapi.AgentServiceCheck{
        TCP:                            fmt.Sprintf("%s:%d", ip, svrPort),
        Timeout:                        "1s",
        Interval:                       "15s",
        DeregisterCriticalServiceAfter: "30s",// 30秒服务不可达时,注销服务
        Status:                         "passing",// 服务启动时,默认正常
    }

    if healthType == "http" {
        reg.Check.HTTP = fmt.Sprintf("http://%s:%d%s", reg.Address, healthPort, "/health") // http检测默认/health路径
        reg.Check.TCP = ""
    }
    // 启动http健康检测响应
    if len(reg.Check.HTTP) > 0 {
        RunHealthCheck(reg.Check.HTTP)
    }
    p.curRegistration = reg
    return reg
}

func RunHealthCheck(addr string) error {
    uri, err := url.Parse(addr)
    if err != nil {
        return err
    }

    http.HandleFunc(uri.Path, func(w http.ResponseWriter, r *http.Request) {
        w.Write([]byte("success"))
    })
    go http.ListenAndServe(uri.Host, nil)
    return nil
}

4.服务注册

var (
    // 获取consul.Client, 若是localhost不可达须要自动切换地址
    client := MakeClient("http:localhost:8500", "")
    // 配置要注册的服务信息
    registration = Prepare("ImSvr", "grpc", "10000", 30000, "http", "192.168.1.100")
)

// 
func Register() error {
    err := client.Agent().ServiceRegister(registration)
    if err != nil {
        return err
    }

    return nil
}

5.服务注销

//
func Deregister() error {
    svrId := registration.ID
    return client.Agent().ServiceDeregister(svrId)
}

小结:当这些服务注册/注销方法封装好后,在应用服务启动的时候,调用Register()方法进行注册;当应用服务退出的时候,调用Deregister()方法进行注销。当服务异常退出当时候,并不会调用Deregister()方法,那怎么办呢?放心,前面已经有说到健康检测的DeregisterCriticalServiceAfter字段,当服务不可达时,会自动注销服务。OK,到这里服务的自动化注册已经完成了。

服务发现

对于服务与服务之间的通迅有不少方式,有人直接用tcp/http;有人会考虑restfullapi,让接口处理起来更容易;有人用dubbo,抱紧ali大腿;我们用grpc, 抱紧google大腿。通迅框架之间各有千秋,总之适合本身的才是最好的。感兴趣的盆友本身去google/百度。

google grpc框架里并无实现如何基于consul进行服务发现(或许后期会加上),不过有DNS的服务发现。看了它的实现方式,大体懂了实现原理。OK,我们就写个基于consul的服务发现。鄙人github中可直接用consulresolver(欢迎你们关注,若是有问题请提交issue会第一时间修改),目前只有roundrobin策略,后续会逐渐加入其余策略(如随机,权重随机,负载策略等).同时该项目
github.com/generalzgd/grpc-svr-frame
还对grpc做了一些简单的封装,有利于快速搭建grpc服务。
这里就不贴代码了。

OK,基于consul的服务发现有了,那怎么使用呢?
1.导入模块,会自动执行包中的Init方法

import (
    grpclb_consul `github.com/generalzgd/grpc-svr-frame/grpc-consul`
    ctrl `github.com/generalzgd/grpc-svr-frame/grpc-ctrl`
)

2.组合grpc的方法封装

type MyServer {
    ctrl.GrpcController
}

var (
    mySvr = MyServer{
        GrpcController:ctrl.MakeGrpcController(),
    }
)

func (p *MyServer) callSample(){
    // 设置5秒超时
    ctx, _ := context.WithTimeout(context.Background(), 5*time.Second)
    // 注意address字段,由“consul:///”字符串开头,表示要用consulresolver来解析,后面写上服务名,会自动解析到对应的服务地址。能够细看下对应的文件,而后理解是怎么运行的。
    cfg = yaml.EndpointConfig{
        Name: "ImSvr",
        Address: "consul:///imsvr", 
    }
    // 里面还封装了connection pool,以提升通迅效率
    clientConn,err := p.GetGrpcConnWithLB(cfg, ctx)
    if err != nil {
        return
    }
    // 如下是grpc框架的通用伪代码
    client := NewImSvrClient(clientConn.ClientConn)
    resp, err := client.Login(ctx, &LoginReq{})
    if err != nil {
        return
    }
    logs.Info("Got:", resp)
}

总结

至此基于consul的服务注册与发现已经整合到grpc通迅框架中,而且封装了grpc服务的一些经常使用方法,可用于快速的开发微服务。这个整合过程历经了不少辛酸,不只分析了官方文档源代码,也吸取了其余在consul/grpc方面的贡献者经验,前先后后琢磨尝试了许许多多的失败,也走了不少的弯路。固然了,整个结构仍是个雏形,还须要完善。例如,均衡负载策略,目前只实现了roundrobin, 后续还有随机、权重、负载策略等。

遇到的坑

我们在部署consul的时候选择了一个老版本,觉得老版本(1.4.4)会相对稳定些。在新加节点的时候,忽然已有的services每隔几分钟会消失,而后又重现,而后又消失不停的重复。妈呀,出大问题了,跟运维老哥一块儿折腾了个吧小时,推测可能join server节点的时候,数据同步可能存在问题。期间也没作其余操做,就一个节点一个节点leave回退,发现leave最后加入的server节点后consul稳定了。稳定后再仔细分析缘由,发现这个版本有bug,https://github.com/hashicorp/consul/issues/5518, 之后生产环境尽可能用次新版本。这是血的教训!!!!!

相关文章
相关标签/搜索