本文做者赵化冰,将在明天下午 1 点半在成都蚂蚁 C 空间为你们分享《服务网格技术在5G网络管理平台中的落地实践》欢迎你们,查看活动详情。bootstrap
在Istio架构中,Pilot组件属于最核心的组件,负责了服务网格中的流量管理以及控制面和数据面之间的配置下发。Pilot内部的代码结构比较复杂,本文中咱们将经过对Pilot的代码的深刻分析来了解Pilot实现原理。api
首先咱们来看一下Pilot在Istio中的功能定位,Pilot将服务信息和配置数据转换为xDS接口的标准数据结构,经过gRPC下发到数据面的Envoy。若是把Pilot当作一个处理数据的黑盒,则其有两个输入,一个输出:网络
目前Pilot的输入包括两部分数据来源:数据结构
Pilot的输出为符合xDS接口的数据面配置数据,并经过gRPC Streaming接口将配置数据推送到数据面的Envoy中。架构
备注:Istio代码库在不停变化更新中,本文分析所基于的代码commit为: d539abe00c2599d80c6d64296f78d3bb8ab4b033并发
Istio Pilot的代码分为Pilot-Discovery和Pilot-Agent,其中Pilot-Agent用于在数据面负责Envoy的生命周期管理,Pilot-Discovery才是控制面进行流量管理的组件,本文将重点分析控制面部分,即Pilot-Discovery的代码。负载均衡
下图是Pilot-Discovery组件代码的主要结构: less
Pilot-Discovery的入口函数为:pilot/cmd/pilot-discovery/main.go中的main方法。main方法中建立了Discovery Server,Discovery Server中主要包含三部分逻辑:svg
Config Controller用于管理各类配置数据,包括用户建立的流量管理规则和策略。Istio目前支持三种类型的Config Controller:函数
目前Istio的配置包括:
Service Controller用于管理各类Service Registry,提出服务发现数据,目前Istio支持的Service Registry包括:
Discovery Service中主要包含下述逻辑:
Pilot-Disocvery包括如下主要的几个业务流程:
Pilot-Discovery命令的入口为pilot/cmd/pilot-discovery/main.go中的main方法,在该方法中建立Pilot Server,Server代码位于文件pilot/pkg/bootstrap/server.go中。Server主要作了下面一些初始化工做:
Pilot Server建立了一个gRPC Server,用于监听和接收来自Envoy的xDS请求。pilot/pkg/proxy/envoy/v2/ads.go 中的 DiscoveryServer.StreamAggregatedResources方法被注册为gRPC Server的服务处理方法。
当gRPC Server收到来自Envoy的链接时,会调用DiscoveryServer.StreamAggregatedResources方法,在该方法中建立一个XdsConnection对象,并开启一个goroutine从该connection中接收客户端的xDS请求并进行处理;若是控制面的配置发生变化,Pilot也会经过该connection把配置变化主动推送到Envoy端。
这是Pilot中最复杂的一个业务流程,主要是由于代码中采用了多个channel和queue对变化消息进行合并和转发。该业务流程以下:
Pilot和Envoy之间创建的是一个双向的Streaming gRPC服务调用,所以Pilot能够在配置变化时向Envoy推送,Envoy也能够主动发起xDS调用请求获取配置。Envoy主动发起xDS请求的流程以下:
下面是Discovery Server的关键代码片断和对应的业务逻辑注解,为方便阅读,代码中只保留了逻辑主干,去掉了一些不重要的细节。
该部分关键代码位于 istio.io/istio/pilot/pkg/proxy/envoy/v2/ads.go
文件的StreamAggregatedResources 方法中。StreamAggregatedResources方法被注册为gRPC Server的handler,对于每个客户端链接,gRPC Server会启动一个goroutine来进行处理。
代码中主要包含如下业务逻辑:
// StreamAggregatedResources implements the ADS interface. func (s *DiscoveryServer) StreamAggregatedResources(stream ads.AggregatedDiscoveryService_StreamAggregatedResourcesServer) error { ...... //建立一个goroutine来接收来自Envoy的xDS请求,并将请求放到reqChannel中 con := newXdsConnection(peerAddr, stream) reqChannel := make(chan *xdsapi.DiscoveryRequest, 1) go receiveThread(con, reqChannel, &receiveError) ...... for { select{ //从reqChannel接收Envoy端主动发起的xDS请求 case discReq, ok := <-reqChannel: //根据请求的类型构造相应的xDS Response并发送到Envoy端 switch discReq.TypeUrl { case ClusterType: err := s.pushCds(con, s.globalPushContext(), versionInfo()) case ListenerType: err := s.pushLds(con, s.globalPushContext(), versionInfo()) case RouteType: err := s.pushRoute(con, s.globalPushContext(), versionInfo()) case EndpointType: err := s.pushEds(s.globalPushContext(), con, versionInfo(), nil) } //从PushChannel接收Service或者Config变化后的通知 case pushEv := <-con.pushChannel: //将变化内容推送到Envoy端 err := s.pushConnection(con, pushEv) } } }
该部分关键代码位于 istio.io/istio/pilot/pkg/proxy/envoy/v2/discovery.go
文件中,用于监听服务和配置变化消息,并将变化消息合并后经过Channel发送给前面提到的 StreamAggregatedResources 方法进行处理。
ConfigUpdate是处理服务和配置变化的回调函数,service controller和config controller在发生变化时会调用该方法通知Discovery Server。
func (s *DiscoveryServer) ConfigUpdate(req *model.PushRequest) { inboundConfigUpdates.Increment() //服务或配置变化后,将一个PushRequest发送到pushChannel中 s.pushChannel <- req }
在debounce方法中将连续发生的PushRequest进行合并,若是一段时间内没有收到新的PushRequest,再发起推送;以免因为服务和配置频繁变化给系统带来较大压力。
// The debounce helper function is implemented to enable mocking func debounce(ch chan *model.PushRequest, stopCh <-chan struct{}, pushFn func(req *model.PushRequest)) { ...... pushWorker := func() { eventDelay := time.Since(startDebounce) quietTime := time.Since(lastConfigUpdateTime) // it has been too long or quiet enough //一段时间内没有收到新的PushRequest,再发起推送 if eventDelay >= DebounceMax || quietTime >= DebounceAfter { if req != nil { pushCounter++ adsLog.Infof("Push debounce stable[%d] %d: %v since last change, %v since last push, full=%v", pushCounter, debouncedEvents, quietTime, eventDelay, req.Full) free = false go push(req) req = nil debouncedEvents = 0 } } else { timeChan = time.After(DebounceAfter - quietTime) } } for { select { ...... case r := <-ch: lastConfigUpdateTime = time.Now() if debouncedEvents == 0 { timeChan = time.After(DebounceAfter) startDebounce = lastConfigUpdateTime } debouncedEvents++ //合并连续发生的多个PushRequest req = req.Merge(r) case <-timeChan: if free { pushWorker() } case <-stopCh: return } } }
ServiceMesher 社区是由一群拥有相同价值观和理念的志愿者们共同发起,于 2018 年 4 月正式成立。
社区关注领域有:容器、微服务、Service Mesh、Serverless,拥抱开源和云原生,致力于推进 Service Mesh 在中国的蓬勃发展。