鄙人关注consul也有一段时间了,从2017年开始了解它的一些特性,它能帮助解决哪些问题,而后怎么应用到微服务中去。随着时间的推移,微服务的发展也是很是的迅速,能够说突飞猛进,天天都在变化。consul工具所提供的功能也在不断地新增和完善。OK,有些扯远了,我们仍是回到主题上来吧。
在微服务领域有个重要的概念——服务注册与发现。google或baidu一下,会发现有大量的关于服务注册发现的文章、博客等,有基于consul的、也有基于etcd和zoomkeeper的,每一个工具都有本身的特色和优点,也有必定的类似性,好比它们均可以实现服务注册与发现,也均可以实现kv存储等。可是它们也有必定的区别,好比consul重点是服务注册与发现,其次是kv存储;像etcd重点是处理kv存储的功能,在kv存储上来实现服务的注册与发现。侧重点不同,感兴趣的朋友能够去google或baidu一下,这里就不在叙述了。因为侧重点不一样,依据战旗直播业务的实际状况,选择了consul来实现战旗后端的服务注册与发现。html
依据consul的文档,consul集群中须要部署两种类型的节点:server节点和client节点。server节点推荐部署奇数个节点,有利于leader的选举过程快速的结束(偶数个节点可能须要多个选举过程才能选举出leader)。这里有几个概念:集群,leader选举,对这些原理感兴趣同窗能够去google下.
战旗部署了5个server节点,其余节点都是client,也就是说每台服务上都有一个node。而后,战旗的服务经过localhost:8500地址向consul集群注册本身。部署结构1以下图所示:前端
思考:这样的部署结构存在必定的缺陷,若是某个节点的consul挂了,会直接影响该节点上的全部应用服务,应为它们都是经过localhost来跟consul进行通信的。那怎么预防这样的状况呢?
这时备选方案plan B出台了.node
部署结构2以下图所示:nginx
小结:为了提升consul集群的高可靠性,保证服务的正常运行。在结构1的基础上,同时又部署了DNS方案。当本节点的consul不可达时,经过DNS来与consul集群通信。这里可能有些同窗会问,为何不直接采用结构2呢?这两个结构各有千秋,结构1通迅效率高,应用服务的健康检测都在当前节点完成,检测压力小。结构2虽无单节点风险,但存在consul负载不均衡等缺点。git
具体consul怎么下载,怎么安装,怎么启动server,怎么加入集群就不讲了,你们能够参考官方文档:https://www.consul.io/docs/index.htmlgithub
当应用服务启动时,将本身注册到consul中,以便其余服务能及时发现该服务。后端
1.导入sdk,api
import ( consul "github.com/hashicorp/consul/api" )
2.获取consul client对象restful
// @param uriStr string: consul通迅地址,默认http://localhost:8500; 当localhost不可达时,须要经过服务名来访问其余consul节点,如:http://consul:8500 // @param token string: 访问控制用 func MakeClient(uriStr string, token string) (*consul.Client, error) { uri, err := url.Parse(uriStr) if err != nil { logs.Error("url parse error: ", err) return nil, err } config := consulapi.DefaultConfig() config.Address = uriStr if len(token) > 0 { config.Token = token } else { config.Token = defaultToken } client, err := consulapi.NewClient(config) if err != nil { logs.Error("consul: ", uri.Scheme) return nil, err } return client, nil }
3.准备工做框架
// @param svrName string: 要注册当服务名 // @param useType string: 对应consul中的tag, 可用于过滤 // @param svrPort int: 服务对应的端口号 // @param healthPort int: http检测时须要对应端口号,tcp检测默认当前端口 // @param healthType string: http或tcp, // @param localIp string: 当前节点的内网IP,即其余服务能访问到的IP func Prepare(svrName string, useType string, svrPort int, healthPort int, healthType string, localIp string) *consulapi.AgentServiceRegistration { ip := localIp // 注册配置信息 reg := &consul.AgentServiceRegistration{ ID: strings.ToLower(fmt.Sprintf("%s_%d", svrName, libs.Ip2Long(ip))), // 生成一个惟一当服务ID Name: strings.ToLower(fmt.Sprintf("%s", svrName)), // 注册服务名 Tags: []string{strings.ToLower(useType)},// 标签 Port: svrPort, // 端口号 Address: ip, // 所在节点ip地址 } // 健康检测配置信息 reg.Check = &consulapi.AgentServiceCheck{ TCP: fmt.Sprintf("%s:%d", ip, svrPort), Timeout: "1s", Interval: "15s", DeregisterCriticalServiceAfter: "30s",// 30秒服务不可达时,注销服务 Status: "passing",// 服务启动时,默认正常 } if healthType == "http" { reg.Check.HTTP = fmt.Sprintf("http://%s:%d%s", reg.Address, healthPort, "/health") // http检测默认/health路径 reg.Check.TCP = "" } // 启动http健康检测响应 if len(reg.Check.HTTP) > 0 { RunHealthCheck(reg.Check.HTTP) } p.curRegistration = reg return reg } func RunHealthCheck(addr string) error { uri, err := url.Parse(addr) if err != nil { return err } http.HandleFunc(uri.Path, func(w http.ResponseWriter, r *http.Request) { w.Write([]byte("success")) }) go http.ListenAndServe(uri.Host, nil) return nil }
4.服务注册
var ( // 获取consul.Client, 若是localhost不可达须要自动切换地址 client := MakeClient("http:localhost:8500", "") // 配置要注册的服务信息 registration = Prepare("ImSvr", "grpc", "10000", 30000, "http", "192.168.1.100") ) // func Register() error { err := client.Agent().ServiceRegister(registration) if err != nil { return err } return nil }
5.服务注销
// func Deregister() error { svrId := registration.ID return client.Agent().ServiceDeregister(svrId) }
小结:当这些服务注册/注销方法封装好后,在应用服务启动的时候,调用Register()方法进行注册;当应用服务退出的时候,调用Deregister()方法进行注销。当服务异常退出当时候,并不会调用Deregister()方法,那怎么办呢?放心,前面已经有说到健康检测的DeregisterCriticalServiceAfter字段,当服务不可达时,会自动注销服务。OK,到这里服务的自动化注册已经完成了。
对于服务与服务之间的通迅有不少方式,有人直接用tcp/http;有人会考虑restfullapi,让接口处理起来更容易;有人用dubbo,抱紧ali大腿;我们用grpc, 抱紧google大腿。通迅框架之间各有千秋,总之适合本身的才是最好的。感兴趣的盆友本身去google/百度。
google grpc框架里并无实现如何基于consul进行服务发现(或许后期会加上),不过有DNS的服务发现。看了它的实现方式,大体懂了实现原理。OK,我们就写个基于consul的服务发现。鄙人github中可直接用consulresolver(欢迎你们关注,若是有问题请提交issue会第一时间修改),目前只有roundrobin策略,后续会逐渐加入其余策略(如随机,权重随机,负载策略等).同时该项目
github.com/generalzgd/grpc-svr-frame
还对grpc做了一些简单的封装,有利于快速搭建grpc服务。
这里就不贴代码了。
OK,基于consul的服务发现有了,那怎么使用呢?
1.导入模块,会自动执行包中的Init方法
import ( grpclb_consul `github.com/generalzgd/grpc-svr-frame/grpc-consul` ctrl `github.com/generalzgd/grpc-svr-frame/grpc-ctrl` )
2.组合grpc的方法封装
type MyServer { ctrl.GrpcController } var ( mySvr = MyServer{ GrpcController:ctrl.MakeGrpcController(), } ) func (p *MyServer) callSample(){ // 设置5秒超时 ctx, _ := context.WithTimeout(context.Background(), 5*time.Second) // 注意address字段,由“consul:///”字符串开头,表示要用consulresolver来解析,后面写上服务名,会自动解析到对应的服务地址。能够细看下对应的文件,而后理解是怎么运行的。 cfg = yaml.EndpointConfig{ Name: "ImSvr", Address: "consul:///imsvr", } // 里面还封装了connection pool,以提升通迅效率 clientConn,err := p.GetGrpcConnWithLB(cfg, ctx) if err != nil { return } // 如下是grpc框架的通用伪代码 client := NewImSvrClient(clientConn.ClientConn) resp, err := client.Login(ctx, &LoginReq{}) if err != nil { return } logs.Info("Got:", resp) }
至此基于consul的服务注册与发现已经整合到grpc通迅框架中,而且封装了grpc服务的一些经常使用方法,可用于快速的开发微服务。这个整合过程历经了不少辛酸,不只分析了官方文档源代码,也吸取了其余在consul/grpc方面的贡献者经验,前先后后琢磨尝试了许许多多的失败,也走了不少的弯路。固然了,整个结构仍是个雏形,还须要完善。例如,均衡负载策略,目前只实现了roundrobin, 后续还有随机、权重、负载策略等。
我们在部署consul的时候选择了一个老版本,觉得老版本(1.4.4)会相对稳定些。在新加节点的时候,忽然已有的services每隔几分钟会消失,而后又重现,而后又消失不停的重复。妈呀,出大问题了,跟运维老哥一块儿折腾了个吧小时,推测可能join server节点的时候,数据同步可能存在问题。期间也没作其余操做,就一个节点一个节点leave回退,发现leave最后加入的server节点后consul稳定了。稳定后再仔细分析缘由,发现这个版本有bug,https://github.com/hashicorp/consul/issues/5518, 之后生产环境尽可能用次新版本。这是血的教训!!!!!