最近学习falcon,看了源码和极客学院的视频解析,画了调用结构、关系,对主要的代码进行了注释。
代码地址:https://github.com/beyondskyw...
标签(空格分隔): falcon go
机器性能指标:cpu,mem,网卡,磁盘……
业务监控
开源软件状态:Nginx,Redis,MySQL
snmp采集网络设备指标
自发现采集值
不同类型的数据采集分配到不同的goroutine
进程和端口通过用户配置进行监控
hostname和ip默认留空,agent自动探测
hbs和transfer都是配置其rpc地址
collector网卡采集前缀
ignore为true时取消上报
cron:间隔执行的代码,即定时任务
funcs:信息采集
g:全局数据结构
http:简单的dashboard的server,获取单机监控指标数据
plugins:插件处理机制
public:静态资源文件
了解agent、plugin版本信息,方便升级
获取监听的进程和端口
获取本机执行的插件列表
main入口
go cron.InitDataHistory()

// Report this agent's status.
cron.ReportAgentStatus()
// Sync the plugin list.
cron.SyncMinePlugins()
// Sync the monitored ports, paths, processes and URLs.
cron.SyncBuiltinMetrics()
// Sync the list of IPs allowed to run shell commands on the agent
// (a debugging backdoor).
cron.SyncTrustableIps()
// Start periodic data collection.
cron.Collect()

// Start the dashboard server.
go http.Start()
ReportAgentStatus:汇报agent自己状态
// 判断hbs配置是否正常,正常则上报agent状态 if g.Config().Heartbeat.Enabled && g.Config().Heartbeat.Addr != "" { // 根据配置的interval间隔上报信息 go reportAgentStatus(time.Duration(g.Config().Heartbeat.Interval) * time.Second) } func reportAgentStatus(interval time.Duration) { for { // 获取hostname, 出错则错误赋值给hostname hostname, err := g.Hostname() if err != nil { hostname = fmt.Sprintf("error:%s", err.Error()) } // 请求发送信息 req := model.AgentReportRequest{ Hostname: hostname, IP: g.IP(), AgentVersion: g.VERSION, // 经过shell指令获取plugin版本,可否go实现 PluginVersion: g.GetCurrPluginVersion(), } var resp model.SimpleRpcResponse // 调用rpc接口 err = g.HbsClient.Call("Agent.ReportStatus", req, &resp) if err != nil || resp.Code != 0 { log.Println("call Agent.ReportStatus fail:", err, "Request:", req, "Response:", resp) } time.Sleep(interval) } }
SyncMinePlugins:同步插件
func syncMinePlugins() { var ( timestamp int64 = -1 pluginDirs []string ) duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) hostname, err := g.Hostname() if err != nil { continue } req := model.AgentHeartbeatRequest{ Hostname: hostname, } var resp model.AgentPluginsResponse // 调用rpc接口,返回plugin err = g.HbsClient.Call("Agent.MinePlugins", req, &resp) if err != nil { log.Println("ERROR:", err) continue } // 保证时间顺序正确 if resp.Timestamp <= timestamp { continue } pluginDirs = resp.Plugins // 存放时间保证最新 timestamp = resp.Timestamp if g.Config().Debug { log.Println(&resp) } // 无插件则清空plugin if len(pluginDirs) == 0 { plugins.ClearAllPlugins() } desiredAll := make(map[string]*plugins.Plugin) // 读取全部plugin for _, p := range pluginDirs { underOneDir := plugins.ListPlugins(strings.Trim(p, "/")) for k, v := range underOneDir { desiredAll[k] = v } } // 中止不须要的插件,启动增长的插件 plugins.DelNoUsePlugins(desiredAll) plugins.AddNewPlugins(desiredAll) } }
SyncBuiltinMetrics:同步内置metric,包括端口、目录和进程信息
func syncBuiltinMetrics() { var timestamp int64 = -1 var checksum string = "nil" duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) // 监控端口、目录大小、进程 var ports = []int64{} var paths = []string{} var procs = make(map[string]map[int]string) var urls = make(map[string]string) hostname, err := g.Hostname() if err != nil { continue } req := model.AgentHeartbeatRequest{ Hostname: hostname, Checksum: checksum, } var resp model.BuiltinMetricResponse err = g.HbsClient.Call("Agent.BuiltinMetrics", req, &resp) if err != nil { log.Println("ERROR:", err) continue } if resp.Timestamp <= timestamp { continue } if resp.Checksum == checksum { continue } timestamp = resp.Timestamp checksum = resp.Checksum for _, metric := range resp.Metrics { if metric.Metric == g.URL_CHECK_HEALTH { arr := strings.Split(metric.Tags, ",") if len(arr) != 2 { continue } url := strings.Split(arr[0], "=") if len(url) != 2 { continue } stime := strings.Split(arr[1], "=") if len(stime) != 2 { continue } if _, err := strconv.ParseInt(stime[1], 10, 64); err == nil { urls[url[1]] = stime[1] } else { log.Println("metric ParseInt timeout failed:", err) } } // {metric: net.port.listen, tags: port=22} if metric.Metric == g.NET_PORT_LISTEN { arr := strings.Split(metric.Tags, "=") if len(arr) != 2 { continue } if port, err := strconv.ParseInt(arr[1], 10, 64); err == nil { ports = append(ports, port) } else { log.Println("metrics ParseInt failed:", err) } continue } // metric: du.bs tags: path=/home/works/logs // du -bs /home/works/logs if metric.Metric == g.DU_BS { arr := strings.Split(metric.Tags, "=") if len(arr) != 2 { continue } paths = append(paths, strings.TrimSpace(arr[1])) continue } //mereic: proc.num tags: name=crond //或者metric: proc.num tags: cmdline=cfg.json if metric.Metric == g.PROC_NUM { arr := strings.Split(metric.Tags, ",") tmpMap := make(map[int]string) for i := 0; i < len(arr); i++ { if strings.HasPrefix(arr[i], "name=") { tmpMap[1] = 
strings.TrimSpace(arr[i][5:]) } else if strings.HasPrefix(arr[i], "cmdline=") { tmpMap[2] = strings.TrimSpace(arr[i][8:]) } } procs[metric.Tags] = tmpMap } } g.SetReportUrls(urls) g.SetReportPorts(ports) g.SetReportProcs(procs) g.SetDuPaths(paths) } }
SyncTrustableIps:同步可信IP列表
请求获取允许远程执行shell命令的IP白名单;通过http/run.go调用shell命令时,会判断请求IP是否可信
func syncTrustableIps() { duration := time.Duration(g.Config().Heartbeat.Interval) * time.Second for { time.Sleep(duration) var ips string err := g.HbsClient.Call("Agent.TrustableIps", model.NullRpcRequest{}, &ips) if err != nil { log.Println("ERROR: call Agent.TrustableIps fail", err) continue } // 设置到本地可信IP列表 g.SetTrustableIps(ips) } }
FuncsAndInterval:拆分不同的采集函数集,方便通过不同的goroutine运行
// 间隔internal时间执行fs中的函数 type FuncsAndInterval struct { Fs []func() []*model.MetricValue Interval int } var Mappers []FuncsAndInterval // 根据调用指令类型和是否容易被挂起而分类(经过不一样的goroutine去执行,避免相互之间的影响) func BuildMappers() { interval := g.Config().Transfer.Interval Mappers = []FuncsAndInterval{ FuncsAndInterval{ Fs: []func() []*model.MetricValue{ AgentMetrics, CpuMetrics, NetMetrics, KernelMetrics, LoadAvgMetrics, MemMetrics, DiskIOMetrics, IOStatsMetrics, NetstatMetrics, ProcMetrics, UdpMetrics, }, Interval: interval, }, // 容易出问题 FuncsAndInterval{ Fs: []func() []*model.MetricValue{ DeviceMetrics, }, Interval: interval, }, // 调用相同指令 FuncsAndInterval{ Fs: []func() []*model.MetricValue{ PortMetrics, SocketStatSummaryMetrics, }, Interval: interval, }, FuncsAndInterval{ Fs: []func() []*model.MetricValue{ DuMetrics, }, Interval: interval, }, FuncsAndInterval{ Fs: []func() []*model.MetricValue{ UrlMetrics, }, Interval: interval, }, } }
Collect:读取配置信息,遍历Mappers中的FuncsAndInterval,调用其中的采集函数采集全部信息(并不是先过滤采集项),再从采集到的数据中过滤掉ignore的项,并上报到transfer。
func Collect() { // 配置信息判断 if !g.Config().Transfer.Enabled { return } if len(g.Config().Transfer.Addrs) == 0 { return } // 读取mapper中的FuncsAndInterval集,并经过不一样的goroutine运行 for _, v := range funcs.Mappers { go collect(int64(v.Interval), v.Fs) } } // 间隔采集信息 func collect(sec int64, fns []func() []*model.MetricValue) { // 启动断续器,间隔执行 t := time.NewTicker(time.Second * time.Duration(sec)).C for { <-t hostname, err := g.Hostname() if err != nil { continue } mvs := []*model.MetricValue{} // 读取忽略metric名单 ignoreMetrics := g.Config().IgnoreMetrics // 从funcs的list中取出每一个采集函数 for _, fn := range fns { // 执行采集函数 items := fn() if items == nil { continue } if len(items) == 0 { continue } // 读取采集数据,根据忽略的metric忽略部分采集数据 for _, mv := range items { if b, ok := ignoreMetrics[mv.Metric]; ok && b { continue } else { mvs = append(mvs, mv) } } } // 获取上报时间 now := time.Now().Unix() // 设置上报采集项的间隔、agent主机、上报时间 for j := 0; j < len(mvs); j++ { mvs[j].Step = sec mvs[j].Endpoint = hostname mvs[j].Timestamp = now } // 调用transfer发送采集数据 g.SendToTransfer(mvs) } }
采集信息结构
// MetricValue is a single collected data point.
type MetricValue struct {
	Endpoint  string      // host name
	Metric    string      // metric identifier, e.g. cpu.idle, mem.memtotal
	Value     interface{} // collected value
	Step      int64       // reporting interval of this item
	Type      string      // GAUGE or COUNTER
	Tags      string      // tags, used when configuring alarm strategies
	Timestamp int64       // time of this report
}
采集信息组成metricValue结构
func NewMetricValue(metric string, val interface{}, dataType string, tags ...string) *model.MetricValue { mv := model.MetricValue{ Metric: metric, Value: val, Type: dataType, } size := len(tags) if size > 0 { mv.Tags = strings.Join(tags, ",") } return &mv } // 原值类型 func GaugeValue(metric string, val interface{}, tags ...string) *model.MetricValue { return NewMetricValue(metric, val, "GAUGE", tags...) } // 计数器类型 func CounterValue(metric string, val interface{}, tags ...string) *model.MetricValue { return NewMetricValue(metric, val, "COUNTER", tags...) }
rpc组件
// 简单封装rpc.Cilent type SingleConnRpcClient struct { sync.Mutex rpcClient *rpc.Client RpcServer string Timeout time.Duration } // 关闭rpc func (this *SingleConnRpcClient) close() { if this.rpcClient != nil { this.rpcClient.Close() this.rpcClient = nil } } // 保证rpc存在,为空则从新建立, 若是server宕机, 死循环???? func (this *SingleConnRpcClient) insureConn() { if this.rpcClient != nil { return } var err error var retry int = 1 for { if this.rpcClient != nil { return } // 根据timeout和server地址去链接rpc的server this.rpcClient, err = net.JsonRpcClient("tcp", this.RpcServer, this.Timeout) if err == nil { return } log.Printf("dial %s fail: %v", this.RpcServer, err) if retry > 6 { retry = 1 } time.Sleep(time.Duration(math.Pow(2.0, float64(retry))) * time.Second) retry++ } } // rpc client调用hbs函数 func (this *SingleConnRpcClient) Call(method string, args interface{}, reply interface{}) error { // 加锁保证一个agent只与server有一个链接,保证性能 this.Lock() defer this.Unlock() // 保证rpc链接可用 this.insureConn() timeout := time.Duration(50 * time.Second) done := make(chan error) go func() { err := this.rpcClient.Call(method, args, reply) done <- err }() // 超时控制 select { case <-time.After(timeout): log.Printf("[WARN] rpc call timeout %v => %v", this.rpcClient, this.RpcServer) this.close() case err := <-done: if err != nil { this.close() return err } } return nil }
Transfer部件
// 定义transfer的rpcClient对应Map, transferClients读写锁 var ( TransferClientsLock *sync.RWMutex = new(sync.RWMutex) TransferClients map[string]*SingleConnRpcClient = map[string]*SingleConnRpcClient{} ) // 发送数据到随机的transfer func SendMetrics(metrics []*model.MetricValue, resp *model.TransferResponse) { rand.Seed(time.Now().UnixNano()) // 随机transferClient发送数据,直到发送成功 for _, i := range rand.Perm(len(Config().Transfer.Addrs)) { addr := Config().Transfer.Addrs[i] if _, ok := TransferClients[addr]; !ok { initTransferClient(addr) } if updateMetrics(addr, metrics, resp) { break } } } // 初始化addr对应的transferClient func initTransferClient(addr string) { TransferClientsLock.Lock() defer TransferClientsLock.Unlock() TransferClients[addr] = &SingleConnRpcClient{ RpcServer: addr, Timeout: time.Duration(Config().Transfer.Timeout) * time.Millisecond, } } // 调用rpc接口发送metric func updateMetrics(addr string, metrics []*model.MetricValue, resp *model.TransferResponse) bool { TransferClientsLock.RLock() defer TransferClientsLock.RUnlock() err := TransferClients[addr].Call("Transfer.Update", metrics, resp) if err != nil { log.Println("call Transfer.Update fail", addr, err) return false } return true }
采集插件同步
// 插件信息: 路径、修改时间、运行周期(来自plugin插件) type Plugin struct { FilePath string MTime int64 Cycle int } // 插件map和调度器map var ( Plugins = make(map[string]*Plugin) PluginsWithScheduler = make(map[string]*PluginScheduler) ) // 删除不须要的plugin func DelNoUsePlugins(newPlugins map[string]*Plugin) { for currKey, currPlugin := range Plugins { newPlugin, ok := newPlugins[currKey] if !ok || currPlugin.MTime != newPlugin.MTime { deletePlugin(currKey) } } } // 添加同步时增长的plugin func AddNewPlugins(newPlugins map[string]*Plugin) { for fpath, newPlugin := range newPlugins { // 去除重复插件 if _, ok := Plugins[fpath]; ok && newPlugin.MTime == Plugins[fpath].MTime { continue } // 为新添加的插件新建调度器 Plugins[fpath] = newPlugin sch := NewPluginScheduler(newPlugin) PluginsWithScheduler[fpath] = sch // 启动plugin调度 sch.Schedule() } } func ClearAllPlugins() { for k := range Plugins { deletePlugin(k) } } func deletePlugin(key string) { v, ok := PluginsWithScheduler[key] if ok { // 暂停调度plugin v.Stop() delete(PluginsWithScheduler, key) } delete(Plugins, key) }
插件调度策略
// 持续间隔执行plugin type PluginScheduler struct { Ticker *time.Ticker Plugin *Plugin Quit chan struct{} } // 根据plugin建立新的schedule func NewPluginScheduler(p *Plugin) *PluginScheduler { scheduler := PluginScheduler{Plugin: p} scheduler.Ticker = time.NewTicker(time.Duration(p.Cycle) * time.Second) scheduler.Quit = make(chan struct{}) return &scheduler } // plugin调度,间隔执行PluginRun,除非收到quit消息 func (this *PluginScheduler) Schedule() { go func() { for { select { case <-this.Ticker.C: PluginRun(this.Plugin) case <-this.Quit: this.Ticker.Stop() return } } }() } // 中止plugin调度 func (this *PluginScheduler) Stop() { close(this.Quit) } // 执行插件,读取插件运行返回数据并上报transfer func PluginRun(plugin *Plugin) { timeout := plugin.Cycle*1000 - 500 fpath := filepath.Join(g.Config().Plugin.Dir, plugin.FilePath) if !file.IsExist(fpath) { log.Println("no such plugin:", fpath) return } debug := g.Config().Debug if debug { log.Println(fpath, "running...") } cmd := exec.Command(fpath) var stdout bytes.Buffer cmd.Stdout = &stdout var stderr bytes.Buffer cmd.Stderr = &stderr cmd.Start() err, isTimeout := sys.CmdRunWithTimeout(cmd, time.Duration(timeout)*time.Millisecond) errStr := stderr.String() if errStr != "" { logFile := filepath.Join(g.Config().Plugin.LogDir, plugin.FilePath+".stderr.log") if _, err = file.WriteString(logFile, errStr); err != nil { log.Printf("[ERROR] write log to %s fail, error: %s\n", logFile, err) } } if isTimeout { // has be killed if err == nil && debug { log.Println("[INFO] timeout and kill process", fpath, "successfully") } if err != nil { log.Println("[ERROR] kill process", fpath, "occur error:", err) } return } if err != nil { log.Println("[ERROR] exec plugin", fpath, "fail. error:", err) return } // exec successfully data := stdout.Bytes() if len(data) == 0 { if debug { log.Println("[DEBUG] stdout of", fpath, "is blank") } return } var metrics []*model.MetricValue err = json.Unmarshal(data, &metrics) if err != nil { log.Printf("[ERROR] json.Unmarshal stdout of %s fail. 
error:%s stdout: \n%s\n", fpath, err, stdout.String()) return } g.SendToTransfer(metrics) }