heapster version: release-1.2node
Heapster是Kubernetes下的一个监控项目,用于进行容器集群的监控和性能分析。
基本的功能及概念介绍能够回顾我以前的一篇文章:《Kubernetes监控之Heapster介绍》。
随着的Heapster的版本迭代,支持的功能越愈来愈多,好比新版本支持更多的后端数据存储方式:OpenTSDB、Monasca、Kafka、Elasticsearch等等。看太低版本(如v0.18)的源码,会发现v1.2版本的源码架构彻底变了样,架构扩展性愈来愈强,源码学无止境!
上面不少介绍这篇文章并不会涉及,咱们仍是会用到最流行的模式:Heapster + InfluxDB。git
监控系统架构图:github
该图很好的描述了监控系统的关键组件,及数据流向。
在源码分析以前咱们先介绍Heapster的实现流程,由上图能够看出Heapster会从各个Node上kubelet获取相关的监控信息,而后进行汇总发送给后台数据库InfluxDB。
这里会涉及到几个关键点:golang
k8s集群会增删Nodes,Heapster须要获取这些sources并作相应的操做数据库
Heapster后端数据库怎么存储?是否支持多后端?后端
Heapster获取到数据后推送给后端数据库,那么其提供了API的数据该从何处获取?本地cache?api
Heapster从kubelet获取到的数据是否须要处理?仍是能直接存储到后端restful
等等..数据结构
一块儿分析完heapster源码实现,就能进行解惑了。架构
先列出我解析源码时所用的命令,及参数使用,便于后面的理解。
# heapster --source=kubernetes:http://<master-ip>:8080?inClusterConfig=false\&useServiceAccount=false --sink=influxdb:http://<influxdb-ip>:8086
从Heapster的启动流程开始分析其实现,前面作了简单的分析,能够带着问题去看源码会有更好的收获。
路径: heapster/metrics/heapster.go
func main() { ... // 根据--source参数的输入来建立数据源 // 咱们这里会使用kubernetes,下面会根据k8s来解析 sourceFactory := sources.NewSourceFactory() // 建立该sourceProvider时,会建立Node的ListWatch,用于监控k8s节点的增删状况,由于这些才是数据的真实来源. // 该sourceProvider会包含nodeLister,还有kubeletClient,用于跟各个节点的kubelet通讯,获取cadvisor数据 sourceProvider, err := sourceFactory.BuildAll(argSources) if err != nil { glog.Fatalf("Failed to create source provide: %v", err) } // 建立sourceManager,其实就是sourceProvider + ScrapeTimeout,用于超时获取数据 sourceManager, err := sources.NewSourceManager(sourceProvider, sources.DefaultMetricsScrapeTimeout) if err != nil { glog.Fatalf("Failed to create source manager: %v", err) } // 根据--sink建立数据存储后端 // 咱们这里会使用influxDB,来做为数据的存储后端 sinksFactory := sinks.NewSinkFactory() // 建立sinks时会返回各种对象: // metricSink: 能够理解为本地的metrics数据池,Heapster API获取到的数据都是从该对象中获取的,默认必定会建立 // sinkList: Heapster在新版本中支持多后端数据存储,好比你能够指定多个不一样的influxDB,也能够同时指定influxDB和Elasticsearch。 // historicalSource: 须要配置,咱们暂时没有用到 metricSink, sinkList, historicalSource := sinksFactory.BuildAll(argSinks, *argHistoricalSource) if metricSink == nil { glog.Fatal("Failed to create metric sink") } if historicalSource == nil && len(*argHistoricalSource) > 0 { glog.Fatal("Failed to use a sink as a historical metrics source") } for _, sink := range sinkList { glog.Infof("Starting with %s", sink.Name()) } // 建立sinkManager,会根据以前的sinkList,建立对应数量的协程,用于从sink的数据管道中获取数据,而后推送到对应的后端 sinkManager, err := sinks.NewDataSinkManager(sinkList, sinks.DefaultSinkExportDataTimeout, sinks.DefaultSinkStopTimeout) if err != nil { glog.Fatalf("Failed to created sink manager: %v", err) } // 建立对象,用于处理各个kubelet获取到的metrics数据 // 最终都会加入到dataProcessors,在最终的处理函数中会进行遍历并调用其process() metricsToAggregate := []string{ core.MetricCpuUsageRate.Name, core.MetricMemoryUsage.Name, core.MetricCpuRequest.Name, core.MetricCpuLimit.Name, core.MetricMemoryRequest.Name, core.MetricMemoryLimit.Name, } metricsToAggregateForNode := []string{ core.MetricCpuRequest.Name, core.MetricCpuLimit.Name, core.MetricMemoryRequest.Name, core.MetricMemoryLimit.Name, } // 速率计算对象 dataProcessors := []core.DataProcessor{ // Convert cumulaties to rate processors.NewRateCalculator(core.RateMetricsMapping), } kubernetesUrl, err := getKubernetesAddress(argSources) if err != nil { glog.Fatalf("Failed to get kubernetes address: %v", err) } kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl) if err != nil { glog.Fatalf("Failed to get client config: %v", err) } kubeClient := kube_client.NewOrDie(kubeConfig) // 会建立podLister、nodeLister、namespaceLister,用于从k8s watch各个资源的增删状况 // 防止获取数据失败 podLister, err := getPodLister(kubeClient) if err != nil { glog.Fatalf("Failed to create podLister: %v", err) } nodeLister, err := getNodeLister(kubeClient) if err != nil { glog.Fatalf("Failed to create nodeLister: %v", err) } podBasedEnricher, err := processors.NewPodBasedEnricher(podLister) if err != nil { glog.Fatalf("Failed to create PodBasedEnricher: %v", err) } dataProcessors = append(dataProcessors, podBasedEnricher) namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl) if err != nil { glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err) } dataProcessors = append(dataProcessors, namespaceBasedEnricher) // 这里的对象append顺序会有必定的要求 // 好比Pod的有些数据须要进行containers数据的累加获得 dataProcessors = append(dataProcessors, processors.NewPodAggregator(), &processors.NamespaceAggregator{ MetricsToAggregate: metricsToAggregate, }, &processors.NodeAggregator{ MetricsToAggregate: metricsToAggregateForNode, }, &processors.ClusterAggregator{ MetricsToAggregate: metricsToAggregate, }) nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl) if err != nil { glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err) } dataProcessors = append(dataProcessors, nodeAutoscalingEnricher) // 这是整个Heapster功能的关键处 // 根据sourceManger、sinkManager、dataProcessors来建立manager对象 manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution, manager.DefaultScrapeOffset, manager.DefaultMaxParallelism) if err != nil { glog.Fatalf("Failed to create main manager: %v", err) } // 开始建立协程,从各个sources获取metrics数据,并通过dataProcessors的处理,而后export到各个用于后端数据存储的sinks manager.Start() // 如下的就是建立Heapster server,用于提供各种API // 经过http.mux及go-restful进行实现 // 新版的heapster还支持TLS handler := setupHandlers(metricSink, podLister, nodeLister, historicalSource) addr := fmt.Sprintf("%s:%d", *argIp, *argPort) glog.Infof("Starting heapster on port %d", *argPort) mux := http.NewServeMux() promHandler := prometheus.Handler() if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 { if len(*argTLSClientCAFile) > 0 { authPprofHandler, err := newAuthHandler(handler) if err != nil { glog.Fatalf("Failed to create authorized pprof handler: %v", err) } handler = authPprofHandler authPromHandler, err := newAuthHandler(promHandler) if err != nil { glog.Fatalf("Failed to create authorized prometheus handler: %v", err) } promHandler = authPromHandler } mux.Handle("/", handler) mux.Handle("/metrics", promHandler) healthz.InstallHandler(mux, healthzChecker(metricSink)) // If allowed users is set, then we need to enable Client Authentication if len(*argAllowedUsers) > 0 { server := &http.Server{ Addr: addr, Handler: mux, TLSConfig: &tls.Config{ClientAuth: tls.RequestClientCert}, } glog.Fatal(server.ListenAndServeTLS(*argTLSCertFile, *argTLSKeyFile)) } else { glog.Fatal(http.ListenAndServeTLS(addr, *argTLSCertFile, *argTLSKeyFile, mux)) } } else { mux.Handle("/", handler) mux.Handle("/metrics", promHandler) healthz.InstallHandler(mux, healthzChecker(metricSink)) glog.Fatal(http.ListenAndServe(addr, mux)) } }
介绍了Heapster的启动流程后,大体能明白了该启动过程分为几个关键点:
建立数据源对象
建立后端存储对象list
建立处理metrics数据的processors
建立manager,并开启数据的获取及export的协程
开启Heapster server,并支持各种API
下面进行一一介绍。
先介绍下相关的结构体,由于这才是做者的核心思想。
建立的sourceProvider是实现了MetricsSourceProvider接口的对象。
先看下MetricsSourceProvider:
type MetricsSourceProvider interface { GetMetricsSources() []MetricsSource }
每一个最终返回的对象,都须要提供GetMetricsSources(),看字面意识就能够知道就是提供全部的获取Metrics源头的接口。
咱们的参数--source=kubernetes,因此其实咱们真实返回的结构是kubeletProvider.
路径: heapster/metrics/sources/kubelet/kubelet.go
type kubeletProvider struct { // 用于从k8s获取最新的nodes信息,而后根据kubeletClient,合成各个metricSources nodeLister *cache.StoreToNodeLister // 反射 reflector *cache.Reflector // kubeletClient相关的配置,好比端口:10255 kubeletClient *KubeletClient }
结构介绍完了,看下具体的建立过程,跟kubernetes相关的关键接口是NewKubeletProvider():
func NewKubeletProvider(uri *url.URL) (MetricsSourceProvider, error) { // 建立kubernetes master及kubelet client相关的配置 kubeConfig, kubeletConfig, err := GetKubeConfigs(uri) if err != nil { return nil, err } // 建立kubeClient及kubeletClient kubeClient := kube_client.NewOrDie(kubeConfig) kubeletClient, err := NewKubeletClient(kubeletConfig) if err != nil { return nil, err } // 获取下全部的Nodes,测试下建立的client是否能正常通信 if _, err := kubeClient.Nodes().List(kube_api.ListOptions{ LabelSelector: labels.Everything(), FieldSelector: fields.Everything()}); err != nil { glog.Errorf("Failed to load nodes: %v", err) } // 监控k8s的nodes变动 // 这里会建立协程进行watch,便于后面调用nodeLister.List()列出全部的nodes。 // 该Watch的实现,须要看下apiServer中的实现,后面会进行讲解 lw := cache.NewListWatchFromClient(kubeClient, "nodes", kube_api.NamespaceAll, fields.Everything()) nodeLister := &cache.StoreToNodeLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)} reflector := cache.NewReflector(lw, &kube_api.Node{}, nodeLister.Store, time.Hour) reflector.Run() // 结构在前面介绍过 return &kubeletProvider{ nodeLister: nodeLister, reflector: reflector, kubeletClient: kubeletClient, }, nil }
该过程会涉及到较多的技术点,好比apiServer中的watch实现,reflector的使用。这里不会进行细讲,该文章主要是针对heapster的源码实现,apiServer相关的实现后面会进行单独输出。
这里须要注意的是建立了ListWath,须要关注后面哪里用到了nodeLister.List()进行nodes的获取。
前面已经提到后端数据存储会有两处,一个是metricSink,另外一个是influxdbSink。因此这里会涉及到两个结构:
type MetricSink struct { // 锁 lock sync.Mutex // 长时间存储metrics数据,默认时间是15min longStoreMetrics []string longStoreDuration time.Duration // 短期存储metrics数据,默认时间是140s shortStoreDuration time.Duration // 短时存储空间 shortStore []*core.DataBatch // 长时存储空间 longStore []*multimetricStore }
该结构就是用于heapster API调用时获取的数据源,这里会分为两种数据存储方式:长时存储和短时存储。因此集群越大时,heapster占用内存越多,须要考虑该问题如何处理或者优化。
type influxdbSink struct { // 链接后端influxDB数据库的client client influxdb_common.InfluxdbClient // 锁 sync.RWMutex c influxdb_common.InfluxdbConfig dbExists bool }
这个就是咱们配置的InfluxDB的结构,是咱们真正的数据存储后端。
开始介绍建立后端服务流程,从sinksFactory.BuildAll()接口直接入手。
路径: heapster/metrics/sinks/factory.go
func (this *SinkFactory) BuildAll(uris flags.Uris, historicalUri string) (*metricsink.MetricSink, []core.DataSink, core.HistoricalSource) { result := make([]core.DataSink, 0, len(uris)) var metric *metricsink.MetricSink var historical core.HistoricalSource // 根据传入的"--sink"参数信息,进行build // 支持多后端数据存储,会进行遍历并建立 for _, uri := range uris { // 关键接口 sink, err := this.Build(uri) if err != nil { glog.Errorf("Failed to create sink: %v", err) continue } if uri.Key == "metric" { metric = sink.(*metricsink.MetricSink) } if uri.String() == historicalUri { if asHistSource, ok := sink.(core.AsHistoricalSource); ok { historical = asHistSource.Historical() } else { glog.Errorf("Sink type %q does not support being used for historical access", uri.Key) } } result = append(result, sink) } // 默认metricSink必定会建立 if metric == nil { uri := flags.Uri{} uri.Set("metric") sink, err := this.Build(uri) if err == nil { result = append(result, sink) metric = sink.(*metricsink.MetricSink) } else { glog.Errorf("Error while creating metric sink: %v", err) } } if len(historicalUri) > 0 && historical == nil { glog.Errorf("Error while initializing historical access: unable to use sink %q as a historical source", historicalUri) } return metric, result, historical }
该接口流程比较简单,就是对传入参数进行判断,而后调用this.Build()进行建立,这里只须要注意即便没有配置metric,也会进行metricSink的建立。
func (this *SinkFactory) Build(uri flags.Uri) (core.DataSink, error) { switch uri.Key { 。。。 case "influxdb": return influxdb.CreateInfluxdbSink(&uri.Val) 。。。 case "metric": return metricsink.NewMetricSink(140*time.Second, 15*time.Minute, []string{ core.MetricCpuUsageRate.MetricDescriptor.Name, core.MetricMemoryUsage.MetricDescriptor.Name}), nil 。。。 default: return nil, fmt.Errorf("Sink not recognized: %s", uri.Key) } }
influxdb的建立其实就是根据传入的参数而后建立一个config结构,用于后面建立链接influxDB的client;
metric的建立其实就是初始化了一个MetricSink结构,须要注意的是传入的第三个参数,由于这是用于指定哪些metrics须要进行长时间存储,默认就是cpu/usage和memory/usage,由于这两个参数用户最为关心。
具体的建立接口就不在深刻了,较为简单。
到这里BuildAll()就结束了,至于返回值前面已经作过介绍,就不在累赘了。
其实没那么简单,还有一步:sinkManager的建立。
进入sinks.NewDataSinkManager()接口看下:
func NewDataSinkManager(sinks []core.DataSink, exportDataTimeout, stopTimeout time.Duration) (core.DataSink, error) { sinkHolders := []sinkHolder{} // 遍历前面建立的sinkList for _, sink := range sinks { // 为每一个sink添加一个dataChannel和stopChannel // 用于获取数据和stop信号 sh := sinkHolder{ sink: sink, dataBatchChannel: make(chan *core.DataBatch), stopChannel: make(chan bool), } sinkHolders = append(sinkHolders, sh) // 每一个sink都会建立一个协程 // 从dataChannel获取数据,并调用sink.export()导出到后端数据库 go func(sh sinkHolder) { for { select { case data := <-sh.dataBatchChannel: export(sh.sink, data) case isStop := <-sh.stopChannel: glog.V(2).Infof("Stop received: %s", sh.sink.Name()) if isStop { sh.sink.Stop() return } } } }(sh) } return &sinkManager{ sinkHolders: sinkHolders, exportDataTimeout: exportDataTimeout, stopTimeout: stopTimeout, }, nil }
这里会为每一个sink建立协程,等待数据的到来并最终将数据导入到对应的后端数据库。
这里须要带个问号,既然channel有一端在收,总得有地方会发送,这会在后面才会揭晓。
go协程 + channel的方式,是golang最多见的方式,确实便用。
由于cAdvisor返回的原始数据就包含了nodes和containers的相关数据,因此heapster须要建立各类processor,用于处理成不一样类型的数据,好比pod, namespace, cluster,node。
还有些数据须要计算出速率,有些数据须要进行累加,不一样类型拥有的metrics还不同等等状况。
看下源码:
func main() { ... // 计算namespace和cluster的metrics值时,下列数据须要进行累加求值 metricsToAggregate := []string{ core.MetricCpuUsageRate.Name, core.MetricMemoryUsage.Name, core.MetricCpuRequest.Name, core.MetricCpuLimit.Name, core.MetricMemoryRequest.Name, core.MetricMemoryLimit.Name, } // 计算node的metrics值时,下列数据须要进行累加求值 metricsToAggregateForNode := []string{ core.MetricCpuRequest.Name, core.MetricCpuLimit.Name, core.MetricMemoryRequest.Name, core.MetricMemoryLimit.Name, } // RateMetricsMapping中的数据须要计算速率,好比cpu/usage_rate,network/rx_rate dataProcessors := []core.DataProcessor{ // Convert cumulaties to rate processors.NewRateCalculator(core.RateMetricsMapping), } kubernetesUrl, err := getKubernetesAddress(argSources) if err != nil { glog.Fatalf("Failed to get kubernetes address: %v", err) } kubeConfig, err := kube_config.GetKubeClientConfig(kubernetesUrl) if err != nil { glog.Fatalf("Failed to get client config: %v", err) } kubeClient := kube_client.NewOrDie(kubeConfig) // 建立pod的ListWatch,用于从k8s server监听pod变动 podLister, err := getPodLister(kubeClient) if err != nil { glog.Fatalf("Failed to create podLister: %v", err) } // 建立node的ListWatch,用于从k8s server监听node变动 nodeLister, err := getNodeLister(kubeClient) if err != nil { glog.Fatalf("Failed to create nodeLister: %v", err) } // 该podBasedEnricher用于解析从sources获取到的pod和container的metrics数据, // 而后对pod和container进行数据完善,好比添加labels.但这里还不会处理metricsValue podBasedEnricher, err := processors.NewPodBasedEnricher(podLister) if err != nil { glog.Fatalf("Failed to create PodBasedEnricher: %v", err) } dataProcessors = append(dataProcessors, podBasedEnricher) // 跟上面的podBasedEnricher同理,须要注意的是在append时有前后顺序 namespaceBasedEnricher, err := processors.NewNamespaceBasedEnricher(kubernetesUrl) if err != nil { glog.Fatalf("Failed to create NamespaceBasedEnricher: %v", err) } dataProcessors = append(dataProcessors, namespaceBasedEnricher) // 这里的对象会对metricsValue进行处理,对应的数据进行累加求值 dataProcessors = append(dataProcessors, processors.NewPodAggregator(), &processors.NamespaceAggregator{ MetricsToAggregate: metricsToAggregate, }, &processors.NodeAggregator{ MetricsToAggregate: metricsToAggregateForNode, }, &processors.ClusterAggregator{ MetricsToAggregate: metricsToAggregate, }) dataProcessors = append(dataProcessors, processors.NewRcAggregator()) nodeAutoscalingEnricher, err := processors.NewNodeAutoscalingEnricher(kubernetesUrl) if err != nil { glog.Fatalf("Failed to create NodeAutoscalingEnricher: %v", err) } dataProcessors = append(dataProcessors, nodeAutoscalingEnricher)
Processors的功能基本就是这样了,相对有点复杂,数据处理的样式和类别较多。
各个对象的Process()方法就不进行一一介绍了,就是按照顺序一个一个的填充core.DataBatch数据。有兴趣的能够逐个看下,能够借鉴下实现的方式。
前面的都是铺垫,开始介绍heapster的关键实现,进行源数据的获取,并导出到后端存储。
先介绍相关结构:
type Manager interface { Start() Stop() }
Manager是须要实现Start和stop方法的接口。而真实建立的对象实际上是realManager:
type realManager struct { // 数据源 source core.MetricsSource // 数据处理对象 processors []core.DataProcessor // 后端存储对象 sink core.DataSink // 每次scrape数据的时间间隔 resolution time.Duration // 建立多个scrape协程时,须要sleep这点时间,防止异常 scrapeOffset time.Duration // scrape 中止的管道 stopChan chan struct{} // housekeepSemaphoreChan chan struct{} // 超时 housekeepTimeout time.Duration }
关键的代码以下:
manager, err := manager.NewManager(sourceManager, dataProcessors, sinkManager, *argMetricResolution, manager.DefaultScrapeOffset, manager.DefaultMaxParallelism) if err != nil { glog.Fatalf("Failed to create main manager: %v", err) } manager.Start()
首先会根据前面建立的sourceManager, dataProcessors, sinkManager对象,再建立manager。
路径: heapster/metrics/manager/manager.go
func NewManager(source core.MetricsSource, processors []core.DataProcessor, sink core.DataSink, resolution time.Duration, scrapeOffset time.Duration, maxParallelism int) (Manager, error) { manager := realManager{ source: source, processors: processors, sink: sink, resolution: resolution, scrapeOffset: scrapeOffset, stopChan: make(chan struct{}), housekeepSemaphoreChan: make(chan struct{}, maxParallelism), housekeepTimeout: resolution / 2, } for i := 0; i < maxParallelism; i++ { manager.housekeepSemaphoreChan <- struct{}{} } return &manager, nil }
前面介绍了该关键结构readlManager,继续进入manager.Start():
func (rm *realManager) Start() { go rm.Housekeep() } func (rm *realManager) Housekeep() { for { // Always try to get the newest metrics now := time.Now() // 获取数据的时间段,默认是1min start := now.Truncate(rm.resolution) end := start.Add(rm.resolution) // 真正同步一次的时间间隔,默认是1min + 5s timeToNextSync := end.Add(rm.scrapeOffset).Sub(now) select { case <-time.After(timeToNextSync): rm.housekeep(start, end) case <-rm.stopChan: rm.sink.Stop() return } } }
继续看rm.housekeep(start, end), 该接口就传入了时间区间,其实cAdvisor就是支持时间区间来获取metrics值。
func (rm *realManager) housekeep(start, end time.Time) { if !start.Before(end) { glog.Warningf("Wrong time provided to housekeep start:%s end: %s", start, end) return } select { case <-rm.housekeepSemaphoreChan: // ok, good to go case <-time.After(rm.housekeepTimeout): glog.Warningf("Spent too long waiting for housekeeping to start") return } go func(rm *realManager) { defer func() { rm.housekeepSemaphoreChan <- struct{}{} }() // 从sources获取数据 data := rm.source.ScrapeMetrics(start, end) // 遍历processors,而后进行数据处理 for _, p := range rm.processors { newData, err := process(p, data) if err == nil { data = newData } else { glog.Errorf("Error in processor: %v", err) return } } // 最终将数据导出到后端存储 rm.sink.ExportData(data) }(rm) }
逻辑比较简单,会有三个关键:
源数据获取
数据处理
导出到后端
先看下rm.source.ScrapeMetrics()接口实现.
路径: heapster/metrics/sources/manager.go
func (this *sourceManager) ScrapeMetrics(start, end time.Time) *DataBatch { // 调用了nodeLister.List()获取最新的k8s nodes列表,再根据以前配置的kubelet端口等信息,返回sources // 在建立sourceProvider时,会建立node的ListWatch,因此这里nodeLister可以使用list() sources := this.metricsSourceProvider.GetMetricsSources() responseChannel := make(chan *DataBatch) 。。。 // 遍历各个source,而后建立协程获取数据 for _, source := range sources { go func(source MetricsSource, channel chan *DataBatch, start, end, timeoutTime time.Time, delayInMs int) { // scrape()接口其实就是调用了kubeletMetricsSource.ScrapeMetrics() // 每一个node都会组成对应的kubeletMetricsSource // ScrapeMetrics()就是从cAdvisor中获取监控信息,并进行了decode metrics := scrape(source, start, end) ... select { // 将获取到的数据丢入responseChannel // 下面会用到 case channel <- metrics: // passed the response correctly. return case <-time.After(timeForResponse): glog.Warningf("Failed to send the response back %s", source) return } }(source, responseChannel, start, end, timeoutTime, delayMs) } response := DataBatch{ Timestamp: end, MetricSets: map[string]*MetricSet{}, } latencies := make([]int, 11) responseloop: for i := range sources { ... select { // 获取前面建立的协程获得的数据 case dataBatch := <-responseChannel: if dataBatch != nil { for key, value := range dataBatch.MetricSets { response.MetricSets[key] = value } } 。。。 case <-time.After(timeoutTime.Sub(now)): glog.Warningf("Failed to get all responses in time (got %d/%d)", i, len(sources)) break responseloop } } ... return &response }
该接口的逻辑就是先经过nodeLister获取k8s全部的nodes,这样便能知道全部的kubelet信息,而后建立对应数量的协程从各个kubelet中获取对应的cAdvisor监控信息,进行处理后再返回。
获取到数据后,就须要调用各个processors的Process()接口进行数据处理,接口太多就不一一介绍了,挑个node_aggregator.go进行介绍:
func (this *NodeAggregator) Process(batch *core.DataBatch) (*core.DataBatch, error) { for key, metricSet := range batch.MetricSets { // 判断下该metric是不是pod的 // metricSet.Labels都是前面就进行了填充,因此前面说须要注意每一个processor的append顺序 if metricSetType, found := metricSet.Labels[core.LabelMetricSetType.Key]; found && metricSetType == core.MetricSetTypePod { // Aggregating pods nodeName, found := metricSet.Labels[core.LabelNodename.Key] if nodeName == "" { glog.V(8).Infof("Skipping pod %s: no node info", key) continue } if found { // 获取nodeKey,好比: node:172.25.5.111 nodeKey := core.NodeKey(nodeName) // 前面都是判断该pod在哪一个node上,而后该node的数据是须要经过这些pod进行累加获得 node, found := batch.MetricSets[nodeKey] if !found { glog.V(1).Info("No metric for node %s, cannot perform node level aggregation.") } else if err := aggregate(metricSet, node, this.MetricsToAggregate); err != nil { return nil, err } } else { glog.Errorf("No node info in pod %s: %v", key, metricSet.Labels) } } } return batch, nil }
基本流程就是这样了,有须要的能够各个深刻查看。
最后就是数据的后端存储。
这里会涉及到两部分:metricSink和influxdbSink。
从rm.sink.ExportData(data)接口入手:
路径: heapster/metrics/sinks/manager.go
func (this *sinkManager) ExportData(data *core.DataBatch) { var wg sync.WaitGroup // 遍历全部的sink,这里其实就两个 for _, sh := range this.sinkHolders { wg.Add(1) // 建立协程,而后将以前获取的data丢入dataBatchChannel go func(sh sinkHolder, wg *sync.WaitGroup) { defer wg.Done() glog.V(2).Infof("Pushing data to: %s", sh.sink.Name()) select { case sh.dataBatchChannel <- data: glog.V(2).Infof("Data push completed: %s", sh.sink.Name()) // everything ok case <-time.After(this.exportDataTimeout): glog.Warningf("Failed to push data to sink: %s", sh.sink.Name()) } }(sh, &wg) } // Wait for all pushes to complete or timeout. wg.Wait() }
千辛万苦,你把数据丢入sh.dataBatchChannel完事了?
dataBatchChannel有点眼熟,由于以前建立sinkManager的时候,也建立了协程并监听了该管道,因此真正export数据是在以前就完成了,这里只须要把数据丢入管道便可。
因此golang中协程与协程之间的通讯,channel才是王道啊!
ExportData有两个,一个一个讲吧。
先来关键的influxDB.
路径: heapster/metrics/sinks/influxdb/influxdb.go
func (sink *influxdbSink) ExportData(dataBatch *core.DataBatch) { ... dataPoints := make([]influxdb.Point, 0, 0) for _, metricSet := range dataBatch.MetricSets { // 遍历MetricValues for metricName, metricValue := range metricSet.MetricValues { var value interface{} if core.ValueInt64 == metricValue.ValueType { value = metricValue.IntValue } else if core.ValueFloat == metricValue.ValueType { value = float64(metricValue.FloatValue) } else { continue } // Prepare measurement without fields fieldName := "value" measurementName := metricName if sink.c.WithFields { // Prepare measurement and field names serieName := strings.SplitN(metricName, "/", 2) measurementName = serieName[0] if len(serieName) > 1 { fieldName = serieName[1] } } // influxdb单条数据结构 point := influxdb.Point{ // 度量值名称,好比cpu/usage Measurement: measurementName, // 该tags就是在processors中进行添加,主要是pod_name,node_name,namespace_name等 Tags: metricSet.Labels, // 该字段就是具体的值了 Fields: map[string]interface{}{ fieldName: value, }, // 时间戳 Time: dataBatch.Timestamp.UTC(), } // append到dataPoints,超过maxSendBatchSize数量后直接sendData到influxdb dataPoints = append(dataPoints, point) if len(dataPoints) >= maxSendBatchSize { sink.sendData(dataPoints) dataPoints = make([]influxdb.Point, 0, 0) } } // 遍历LabeledMetrics,主要就是filesystem的数据 // 不太明白为什么要将filesystem的数据进行区分,要放到Labeled中?什么意图?望高手指点,谢谢 // 接下来的操做就跟上面MetricValues的操做差很少了 for _, labeledMetric := range metricSet.LabeledMetrics { 。。。 point := influxdb.Point{ Measurement: measurementName, Tags: make(map[string]string), Fields: map[string]interface{}{ fieldName: value, }, Time: dataBatch.Timestamp.UTC(), } for key, value := range metricSet.Labels { point.Tags[key] = value } for key, value := range labeledMetric.Labels { point.Tags[key] = value } dataPoints = append(dataPoints, point) if len(dataPoints) >= maxSendBatchSize { sink.sendData(dataPoints) dataPoints = make([]influxdb.Point, 0, 0) } } } if len(dataPoints) >= 0 { sink.sendData(dataPoints) } }
该接口中有一处不太明白,metricSet中的LabeledMetrics和MetricsValue有何差异,为什么要将filesystem的数据进行区分对待,放入LabeldMetrics?
看代码的过程当中没有获得答案,望大神指点迷津,多谢多谢!
有问题,但也不影响继续往下学习,接着看下MetricSink:
func (this *MetricSink) ExportData(batch *core.DataBatch) { this.lock.Lock() defer this.lock.Unlock() now := time.Now() // 将数据丢入longStore和shortStore // 须要根据保存的时间将老数据丢弃 this.longStore = append(popOldStore(this.longStore, now.Add(-this.longStoreDuration)), buildMultimetricStore(this.longStoreMetrics, batch)) this.shortStore = append(popOld(this.shortStore, now.Add(-this.shortStoreDuration)), batch) }
该逻辑比较简单,就是将数据丢入两个Store中,而后把过时数据丢弃。
这里提醒一点,heapster API调用时先会从longStore中匹配数据,没匹配上的话再从shortStore获取,而longStore中存储的数据类型前面已经作过介绍。
终于结束了。。
前面的主流业务都介绍完了,Heapster自己也提供了API用于开发者进行使用与测试。
继续分析代码吧:
// 关键接口,后面分析 handler := setupHandlers(metricSink, podLister, nodeLister, historicalSource) 。。。 // 建立http的mux多分器,用于http.Server的路由 mux := http.NewServeMux() // prometheus:最新出现的人气很高的监控系统,值得了解学习下,后续安排! promHandler := prometheus.Handler() // 支持TLS,咱们用了http if len(*argTLSCertFile) > 0 && len(*argTLSKeyFile) > 0 { 。。。 } else { // 多分器分了"/"和"/metrics" // 进入"/",还会进行细分,里面使用到了go-restful mux.Handle("/", handler) mux.Handle("/metrics", promHandler) // 注册健康检测接口 healthz.InstallHandler(mux, healthzChecker(metricSink)) // 启动Server glog.Fatal(http.ListenAndServe(addr, mux)) }
这里的关键是setupHandlers()接口,须要学习下里面如何使用go-restful进行请求路由的。
k8s apiServer中也大量使用了go-restful,在学习该源码时有进行过度析
路径: heapster/metrics/handlers.go
func setupHandlers(metricSink *metricsink.MetricSink, podLister *cache.StoreToPodLister, nodeLister *cache.StoreToNodeLister, historicalSource core.HistoricalSource) http.Handler { runningInKubernetes := true // 建立container,指定route类型为CurlyRouter // 这些都跟go-restful基础有关,有兴趣的能够看下原理 wsContainer := restful.NewContainer() wsContainer.EnableContentEncoding(true) wsContainer.Router(restful.CurlyRouter{}) // 注册v1版本相关的api,包括官方介绍的"/api/v1/model" a := v1.NewApi(runningInKubernetes, metricSink, historicalSource) a.Register(wsContainer) // 这个metricsApi注册了"/apis/metrics/v1alpha1"的各种命令 // 暂不关心 m := metricsApi.NewApi(metricSink, podLister, nodeLister) m.Register(wsContainer) handlePprofEndpoint := func(req *restful.Request, resp *restful.Response) { name := strings.TrimPrefix(req.Request.URL.Path, pprofBasePath) switch name { case "profile": pprof.Profile(resp, req.Request) case "symbol": pprof.Symbol(resp, req.Request) case "cmdline": pprof.Cmdline(resp, req.Request) default: pprof.Index(resp, req.Request) } } // Setup pporf handlers. ws = new(restful.WebService).Path(pprofBasePath) ws.Route(ws.GET("/{subpath:*}").To(metrics.InstrumentRouteFunc("pprof", handlePprofEndpoint))).Doc("pprof endpoint") wsContainer.Add(ws) return wsContainer }
关键在于v1版本的API注册,继续深刻a.Register(wsContainer):
func (a *Api) Register(container *restful.Container) { // 注册"/api/v1/metric-export" API // 用于从shortStore中获取全部的metrics信息 ws := new(restful.WebService) ws.Path("/api/v1/metric-export"). Doc("Exports the latest point for all Heapster metrics"). Produces(restful.MIME_JSON) ws.Route(ws.GET(""). To(a.exportMetrics). Doc("export the latest data point for all metrics"). Operation("exportMetrics"). Writes([]*types.Timeseries{})) // ws必需要add到container中才能生效 container.Add(ws) // 注册"/api/v1/metric-export-schema" API // 用于导出全部的metrics name,好比network-rx // 还会导出还有的labels,好比pod-name ws = new(restful.WebService) ws.Path("/api/v1/metric-export-schema"). Doc("Schema for metrics exported by heapster"). Produces(restful.MIME_JSON) ws.Route(ws.GET(""). To(a.exportMetricsSchema). Doc("export the schema for all metrics"). Operation("exportmetricsSchema"). Writes(types.TimeseriesSchema{})) container.Add(ws) // 注册metircSink相关的API,即"/api/v1/model/" if a.metricSink != nil { glog.Infof("Starting to Register Model.") a.RegisterModel(container) } if a.historicalSource != nil { a.RegisterHistorical(container) } }
官方资料中介绍heapster metric model,咱们使用到这些API也会比较多。
进入a.RegisterModel(container)看下:
func (a *Api) RegisterModel(container *restful.Container) { ws := new(restful.WebService) // 指定全部命令的prefix: "/api/v1/model" ws.Path("/api/v1/model"). Doc("Root endpoint of the stats model"). Consumes("*/*"). Produces(restful.MIME_JSON) // 在这里增长各种命令,好比"/metrics/,/nodes/"等等 addClusterMetricsRoutes(a, ws) // 列出全部的keys ws.Route(ws.GET("/debug/allkeys"). To(metrics.InstrumentRouteFunc("debugAllKeys", a.allKeys)). Doc("Get keys of all metric sets available"). Operation("debugAllKeys")) container.Add(ws) }
继续看addClusterMetricsRoutes():
func addClusterMetricsRoutes(a clusterMetricsFetcher, ws *restful.WebService) { 。。。 if a.isRunningInKubernetes() { // 列出全部namespaces的API ws.Route(ws.GET("/namespaces/"). To(metrics.InstrumentRouteFunc("namespaceList", a.namespaceList)). Doc("Get a list of all namespaces that have some current metrics"). Operation("namespaceList")) // 获取指定namespaces的metrics ws.Route(ws.GET("/namespaces/{namespace-name}/metrics"). To(metrics.InstrumentRouteFunc("availableNamespaceMetrics", a.availableNamespaceMetrics)). Doc("Get a list of all available metrics for a Namespace entity"). Operation("availableNamespaceMetrics"). Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string"))) // 获取namespace指定的metrics值 ws.Route(ws.GET("/namespaces/{namespace-name}/metrics/{metric-name:*}"). To(metrics.InstrumentRouteFunc("namespaceMetrics", a.namespaceMetrics)). Doc("Export an aggregated namespace-level metric"). Operation("namespaceMetrics"). Param(ws.PathParameter("namespace-name", "The name of the namespace to lookup").DataType("string")). Param(ws.PathParameter("metric-name", "The name of the requested metric").DataType("string")). Param(ws.QueryParameter("start", "Start time for requested metrics").DataType("string")). Param(ws.QueryParameter("end", "End time for requested metric").DataType("string")). Param(ws.QueryParameter("labels", "A comma-separated list of key:values pairs to use to search for a labeled metric").DataType("string")). Writes(types.MetricResult{})) 。。。 } 。。。 }
Heapster API的注册基本就这样了,在花点时间看下API的实现吧。
咱们挑一个例子作下分析,获取某个pod的指定的metrics值.
对应的接口:heapster/metrics/api/v1/model_handler.go
func (a *Api) podMetrics(request *restful.Request, response *restful.Response) { a.processMetricRequest( // 根据URI传入的ns和pod名字,拼装成key,如:"namespace:default/pod:123" core.PodKey(request.PathParameter("namespace-name"), request.PathParameter("pod-name")), request, response) }
根据URI的输入参数并调用processMetricRequest()接口,获取对应的metric value:
func (a *Api) processMetricRequest(key string, request *restful.Request, response *restful.Response) { // 时间区间 start, end, err := getStartEndTime(request) if err != nil { response.WriteError(http.StatusBadRequest, err) return } // 获取metric Name,好比"/cpu/usage" metricName := request.PathParameter("metric-name") // 根据metricName进行转换,好比将cpu-usage转换成cpu/usage_rate // 因此这里须要注意cpu-usage不等于/cpu/usage,一个表示cpu使用率,一个表示cpu使用量 convertedMetricName := convertMetricName(metricName) // 获取请求中的labels,根据是否有指定labels来调用不一样的接口 labels, err := getLabels(request) if err != nil { response.WriteError(http.StatusBadRequest, err) return } var metrics map[string][]core.TimestampedMetricValue if labels != nil { // 该接口从metricSet.LabeledMetrics中获取对应的value metrics = a.metricSink.GetLabeledMetric(convertedMetricName, labels, []string{key}, start, end) } else { // 该接口先从longStoreMetrics中进行匹配,匹配不到的话再从shortStore中获取对应的metricValue metrics = a.metricSink.GetMetric(convertedMetricName, []string{key}, start, end) } // 将获取到的metricValue转换成MetricPoint格式的值,会有多组"时间戳+value" converted := exportTimestampedMetricValue(metrics[key]) // 将结果进行response response.WriteEntity(converted) }
OK,大功告成!API的实现也讲完了,不少API都是相通的,最终都会调用相同的接口,因此不一一介绍了。
这里须要注意heapster的API的URI还有多种写法,好比/api/v1/model/cpu-usage,等价于/api/v1/model/cpu/usage_rate/,别误理解成/cpu/usage了,这两个概念不同,一个是cpu使用率,一个是cpu使用量。
上面的提醒告诉咱们,没事多看源码,不少误解天然而然就解除了!
笔者能力有限,看源码也在于学习提高能力,固然也会有较多不理解或者理解不当的地方,但愿各位能予以矫正,多谢多谢!
上面的介绍完了Heapster的实现,咱们能够思考下是否能够动手修改源码,好比增长一些对象的metrics信息。
笔者考虑是否能够直接支持RC/RS/Deployment的metrics信息,让业务层能够直接拿到服务的总体信息。
Heapster官方资料:https://github.com/kubernetes...
InfluxDB github: https://github.com/influxdata...