open_falcon 是由小米开源的企业级服务器运维监控系统。html
介绍主要节点:mysql
agent:安装在须要监控的服务器上,负责采集该服务器上的数据指标(cpu空闲率,io负载率,mem使用率等),每格一段时间采集一条数据,天天数据只包含一个被采集的目标,好比mesg1只上报cpu空闲率,msg2只上报io负载率。因此每一个msg只包含一个key-value对和消息标签。主动推送模式。web
transfer:负责转发消息,不少个agent上报消息,这些消息聚集到多个transfer上,由transfer对消息进行转发,转发到graph和judge节点。sql
judge:负责根据消息的内容进行报警,好比cpu空闲率低于30%等状况,并且能够配置报警级别多种报警模式等。数据库
graph:负责历史数据的存储,存储形式为rrd文件模式。本地存有缓存。graph还负责数据的查询工做,当须要查询某一段的数据时,会从rrd文件和cache中合并结果返回给请求。apache
graph 既然使用了rrd文件进行存储,首先就要了解下rrd的特性吧,了解rrd文件的特殊性,请参考链接:http://www.360doc.com/content/07/0302/20/15540_382048.shtml。感谢该做者详细的描述。json
rrd文件总结:rrd使用一个环形的存储空间基于时间序列的数据库,建立时会指定最大行数,行数事后开始循环覆盖。每一个rrd文件能够设定多个环,每一个环的大小单独设定,每一个环统计的数据间隔不同,好比step为1min,环1能够是1*step存储一次数据,环2能够是5*step存储一次数据。rrd的建立、更新能够由rrdtool来完成,不须要关心内部实现,来一个数据须要保存到哪一个环中,rrdtool本身会判断。读取数据时rrdtool会根据读取的长度判断从哪一个环获取数据。api
每个终端(endpoint)中每一个属性(metric)都会造成一个rrd_file。这样会统计出超多的rrd文件。若是查询的话,能够去rrd文件中去获取数据。缓存
rrd的文件名字与endpoint、metric、tags排序、dstype,step相关。可是用户查询数据的时候不会给出dstype和step的参数。这时rrd文件的名字就不能肯定。怎么来肯定rrd文件的名字,借助于mysql保存关键信息。看mysql信息前,先看看用户采集的数据是什么样的。安全
type GraphItem struct { Endpoint string `json:"endpoint"` Metric string `json:"metric"` Tags map[string]string `json:"tags"` Value float64 `json:"value"` Timestamp int64 `json:"timestamp"` DsType string `json:"dstype"` Step int `json:"step"` Heartbeat int `json:"heartbeat"` Min string `json:"min"` Max string `json:"max"` }
数据采集脚本[{\"metric\": \"metric.demo\", \"endpoint\": \"qd-open-falcon-judge01.hd\", \"timestamp\": $ts,\"step\": 60,\"value\": 9,\"counterType\": \"GAUGE\",\"tags\": \"project=falcon,module=judge\"}]
这里边成员就不解释了,都能看的明白。
为了减小读写rrd文件的次数,会在本地缓存这个接收到的数据,并且为了快速查找文件在本地创建索引index,方便查找。
在统一的api接口api/graph.go文件中定义:
a、接受数据HandleItems()
b、查询数据Query()
接下来 先分析HandleItems是怎么来存储数据和创建index的,而后再说明数据的查询操做。
for i := 0; i < count; i++ { //循环全部接收到的item,分别处理 if items[i] == nil { continue } dsType := items[i].DsType //获取item的dsType,dstype是什么意思去看rrd文档。 step := items[i].Step //获取item的step,就是数据采集的步长。 checksum := items[i].Checksum() //根据item计算checksum =md5[endpoint/metric/tag] //生成rrd缓存的key return fmt.Sprintf("%s_%s_%d", md5, dsType, step) ckey := g.FormRrdCacheKey(checksum, dsType, step) //statistics proc.GraphRpcRecvCnt.Incr() // To Graph first := store.GraphItems.First(ckey) if first != nil && items[i].Timestamp <= first.Timestamp { continue } //添加到item的本地缓存中 store.GraphItems.PushFront(ckey, items[i]) // To Index 创建本地索引 index.ReceiveItem(items[i], checksum) // To History 暂时不明作什么用的 store.AddItem(checksum, items[i]) }
从上段代码的注释中能够知晓数据的存储过程。
a.GraphItems.PushFront(ckey,item[i]).放入到本地缓存,按期刷写到磁盘中。
b.index.ReceiveItem(item[i],checksum)放入本地索引中,建立索引。
在上面的HandleItems中能够看出,接受到的item都添加到GraphItem的缓存中。
在/graph/rrdtool/syncdisk.go中有syncDisk()的操做,syncDisk中会定时的对缓存GraphItem进行刷写操做。
当刷写到磁盘时,删除本地缓存中的item。syncDisk->flushRrd(),flushRrd函数在rrdtool.go中。这部分代码简单,剩下实现的部分由rrdtool提供,涉及到rrd文件更新,这里很少说。
// index收到一条新上报的监控数据,尝试用于增量更新索引 func ReceiveItem(item *cmodel.GraphItem, md5 string) { if item == nil { return } uuid := item.UUID() // 已上报过的数据 if indexedItemCache.ContainsKey(md5) { old := indexedItemCache.Get(md5).(*IndexCacheItem) if uuid == old.UUID { // dsType+step没有发生变化,只更新缓存 TODO 存在线程安全的问题 old.Item = item } else { // dsType+step变化了,当成一个新的增量来处理(甚至,不用rrd文件来过滤) //indexedItemCache.Remove(md5) unIndexedItemCache.Put(md5, NewIndexCacheItem(uuid, item)) } return } //省略一些代码..... // 缓存未命中, 放入增量更新队列 unIndexedItemCache.Put(md5, NewIndexCacheItem(uuid, item)) }
当索引接受到数据后,经过数据的checksum值来肯定是否是这个来自endpoint的metric是不是第一次采集数据。
若是不是第一次采集数据,则在indexedItemCache中可以找到,而且若是uuid没变则只更新item。若是uuid变了从新index操做(涉及dstype和step的改变)。
若是是第一次数据采集,在indexeditemCache中找不到,添加到unindexeditemCache中,等待被索引。
创建增量索引操做index_update_incr_task.go/StartIndexUpdateIncrTask 操做,他会定时的启动updateIndexIncr()操做。
keys := unIndexedItemCache.Keys() for _, key := range keys { icitem := unIndexedItemCache.Get(key) unIndexedItemCache.Remove(key) if icitem != nil { // 并发更新mysql semaUpdateIndexIncr.Acquire() go func(key string, icitem *IndexCacheItem, dbConn *sql.DB) { defer semaUpdateIndexIncr.Release() err := maybeUpdateIndexFromOneItem(icitem.Item, dbConn) if err != nil { proc.IndexUpdateIncrErrorCnt.Incr() } else { indexedItemCache.Put(key, icitem) } }(key, icitem.(*IndexCacheItem), dbConn) ret++ } }
这里的key由(t.Endpoint, t.Metric, t.Tags)计算得出。unIndexedItemCache中根据key来存储item。每一个key保存一个最新的item。
由这个最新的item去maybeUpdateIndexFromOneItem,由这个函数去更新DB中的表。
这里有三个表须要更新:
a、endpoint 表。该表记录了全部上报数据的endpoint,而且为每个endpoint生成一个id即 endpoint_id。
b、tag_endpoint表。拆解item的每个tag。用tag和endpoint造成一个主键的表。记录每一个endpoint包含的tag。每条记录生成一个id,为tagendpoint_id
c、endpoint_counter表。counter是metric和tags组合后的名词。看做是一个总体。
表结构以下:
mysql> show columns from endpoint -> ; +----------+------------------+------+-----+-------------------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +----------+------------------+------+-----+-------------------+-----------------------------+ | id | int(10) unsigned | NO | PRI | NULL | auto_increment | | endpoint | varchar(255) | NO | UNI | | | | ts | int(11) | YES | | NULL | | | t_create | datetime | NO | | NULL | | | t_modify | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP | +----------+------------------+------+-----+-------------------+-----------------------------+ mysql> show columns from tag_endpoint; +-------------+------------------+------+-----+-------------------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +-------------+------------------+------+-----+-------------------+-----------------------------+ | id | int(10) unsigned | NO | PRI | NULL | auto_increment | | tag | varchar(255) | NO | MUL | | | | endpoint_id | int(10) unsigned | NO | | NULL | | | ts | int(11) | YES | | NULL | | | t_create | datetime | NO | | NULL | | | t_modify | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP | +-------------+------------------+------+-----+-------------------+-----------------------------+ 6 rows in set (0.00 sec) mysql> show columns from endpoint_counter; +-------------+------------------+------+-----+-------------------+-----------------------------+ | Field | Type | Null | Key | Default | Extra | +-------------+------------------+------+-----+-------------------+-----------------------------+ | id | int(10) unsigned | NO | PRI | NULL | auto_increment | | endpoint_id | int(10) unsigned | NO | MUL | NULL | | | counter | varchar(255) | NO | | | | | step | int(11) | NO | | 60 | | | type | varchar(16) | NO | | NULL | | | ts | int(11) | YES | | NULL | | | t_create | datetime | NO | | NULL | | | t_modify | timestamp | NO | | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP | +-------------+------------------+------+-----+-------------------+-----------------------------+ 8 rows in set (0.00 sec)
为何要创建index'索引呢?
a、.为了查询的速度吗? 其实查询的时候查的本地缓存item中和rrd文件中的数据,这些与index中保存的数据关系不大,并且index中保存的采集相同项只有一份。因此认为这里是数据缓存意义不大。
b、其实这个index最后都转化成了这三个表。这三个表的意义呢?在于何处? 答案是在与rrd文件的索引。表中并无直接保存rrd文件的名字。若是查询的时候该怎么知道去查询哪个rrd文件呢?不可能全部的rrd文件的头部都扫描一遍。。。。。这时就体现了这个数据库表的意义了。
首先回顾一下rrd文件的命名:filename :=
func RrdFileName(baseDir string, md5 string, dsType string, step int) string { return fmt.Sprintf("%s/%s/%s_%s_%d.rrd", baseDir, md5[0:2], md5, dsType, step) }
由md五、dstype、step决定。而md5前面也说了 与 Endpoint,Metric,Tags相关。因此 filename=f(Endpoint,Metric,Tags,dstype,step)相关,因此要准确的找到rrd文件,必须凑齐这5个元素。
可是问题是查询的时候根本不给这5个条件,而是给了?
type GraphQueryParam struct { Start int64 `json:"start"` End int64 `json:"end"` ConsolFun string `json:"consolFuc"` Endpoint string `json:"endpoint"` Counter string `json:"counter"` }
有效的是 时间段start和end、endpoint、counter。counter是(Metric,Tags)的组合。为了凑齐5个条件组成rrdfilename缺乏的是dstype和step。那么这时候能够根据endpoint和counter在endpoint_counter表中找到dstype和step。而后组合成rrdfilename。进行读取数据。
//--------->查询的输入参数GraphQueryParam,上面也介绍了 func (this *Graph) Query(param cmodel.GraphQueryParam, resp *cmodel.GraphQueryResponse) error { // statistics proc.GraphQueryCnt.Incr() // form empty response resp.Values = []*cmodel.RRDData{} //------》用于存放获取的数据 resp.Endpoint = param.Endpoint // -------》数据的endpoint resp.Counter = param.Counter // ----------》数据的counter信息 dsType, step, exists := index.GetTypeAndStep(param.Endpoint, param.Counter) // complete dsType and step //----------->从缓存或者DB中获取dstype和step。这里DB一样使用了一层本身的缓存,能够本身去看下 if !exists { return nil } resp.DsType = dsType resp.Step = step start_ts := param.Start - param.Start%int64(step) //------》根据step对齐整理start时间 end_ts := param.End - param.End%int64(step) + int64(step) //-----》根据step对齐整理end时间 if end_ts-start_ts-int64(step) < 1 { return nil } md5 := cutils.Md5(param.Endpoint + "/" + param.Counter) //------->计算md5值,用于计算key值 ckey := g.FormRrdCacheKey(md5, dsType, step) //-----》计算key值,用于缓存索引,这个缓存是数据缓存,不是index缓存 filename := g.RrdFileName(g.Config().RRD.Storage, md5, dsType, step) //还原rrd文件名字 // read data from rrd file datas, _ := rrdtool.Fetch(filename, param.ConsolFun, start_ts, end_ts, step) //从rrd中获取数据,从rrd中获取数据须要指定获取数据的时间段。 datas_size := len(datas) // read cached items items := store.GraphItems.FetchAll(ckey) //根据key值,在数据缓存中获取数据。 items_size := len(items)
最后根据 从rrd中获取的数据和从数据缓存中获取的数据进行合并,输出。完成查询。
数据一直保存在rrd文件中,能够进行查询,可是不适合大量数据的分析预测等过程,因此决定将采集的机器数据进行转储到hdfs上。在hdfs上可用于spark、mapreduce的处理分析工做。
转储原理:利用hdfs的http api接口进行转储。hdfs的http api接口查询,请参考:http://hadoop.apache.org/docs/r1.0.4/webhdfs.html#HTTP+Response+Codes
主要操做在graph增长一个队列listitem用于缓存item数据,而后启动一个转储线程,定时批量的转储item到hdfs中。
为何要定时批量的呢? 由于hdfs的接口不能频繁的提交数据。若是频繁的append数据。hdfs会报错,是hdfs的问题,尚未解决。因此采用规避问题的方法。