influxDB+grafana 日志监控平台(Golang)

influxdb

InfluxDB 是一个开源分布式时序、事件和指标数据库。使用 Go 语言编写,无需外部依赖。其设计目标是实现分布式和水平伸缩扩展。
html


influxDB启动流程:linux


 1  用docker下拉influxdb的镜像 nginx

docker pull tutum/influxdb docekrgit


2 Docker环境下运行influxdbgithub

docker run -d -p 8083:8083 -p8086:8086 --expose 8090 --expose 8099 --name influxsrv tutum/influxdb
web

各个参数含义:正则表达式

-d:容器在后台运行
docker

-p:将容器内端口映射到宿主机端口,格式为 宿主机端口:容器内端口;数据库

8083是influxdb的web管理工具端口json

8086是influxdb的HTTP API端口

--expose:能够让容器接受外部传入的数据

--name:容器名称 最后是镜像名称+tag,镜像为tutum/influxdb,tag的值0.8.8指定了要运行的版本,默认是latest。


3 启动influxdb后,influxdb会启动一个内部的HTTP server管理工具,用户能够经过接入该web服务器来操做influxdb。

固然,也能够经过CLI即命令行的方式访问influxdb。

打开浏览器,输入http://127.0.0.1:8083,访问管理工具的主页


4 Influxdb客户端 能够参考里面例子

https://github.com/influxdata/influxdb/tree/master/client

PS. Influxdb原理详解

https://www.linuxdaxue.com/influxdb-principle.html



Grafana

Grafana 是一个开源的时序性统计和监控平台,支持例如 elasticsearch、graphite、influxdb 等众多的数据源,并以功能强大的界面编辑器著称。

官网:https://grafana.com/


grafana启动流程:

1 docker 拉取镜像

docker run -d --name=grafana -p 3000:3000 grafana/grafana


2 访问管理工具的主页

浏览器127.0.0.1:3000 ,  登陆 grafana的默认端口是3000,用户名和密码为 admin / admin,配置文件/etc/grafana/grafana.ini,更改配置文件后须要重启grafana。


3. 建立数据库,绑定influxdb


4. 建立一个新的面板

home —> New Dashboard —> Graph —> 点击,Edit


5 Edit中的Metrics就是构造一个SQL的查询语句



Golang打点

监控日志程序经过 influxdb 将须要的内容打点到influxdb 

1.导入 github.com/influxdata/influxdb/client/v2


2.建立influxdb client

// Create a new HTTPClient
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     addr,
		Username: username,
		Password: password,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()复制代码


3.建立须要打的点的格式,类型

// Create a new point batch
		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  database,
			Precision: precision,
		})
		if err != nil {
			log.Fatal(err)
		}

		复制代码


4.建立点,将点添加进influxdb数据库

// Create a point and add to batch
		//Tags:Path,Method,Scheme,Status
		tags := map[string]string{
			"Path": v.Path,
			"Method": v.Method,
			"Scheme": v.Scheme,
			"Status": v.Status,
			}

		fields := map[string]interface{}{
			"UpstreamTime": v.UpstreamTime,
			"RequestTime":  v.RequestTime,
			"BytesSent":    v.BytesSent,
		}

pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
		if err != nil {
			log.Fatal(err)
		}
		bp.AddPoint(pt)

		// Write the batch
		if err := c.Write(bp); err != nil {
			log.Fatal(err)
		}复制代码

Golang 完整代码

imooc.log日志格式以下:

172.0.0.12 - - [02/May/2018:17:17:35 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-" "KeepAliveClient" "-" 1.005 1.854

172.0.0.12 - - [02/May/2018:17:17:36 +0000] http "POST /bar?query=t HTTP/1.0" 300 2133 "-" "KeepAliveClient" "-" 1.025 1.854


代码逻辑主要是  经过读取模块读取imooc.log日志文件中日志,而后经过正则表达式,一行一行解析获取数据,并经过写入模块将数据经过influxdb客户端打点,最后经过grafana去显示数据图形.


package main

import (
	"bufio"
	"fmt"
	"github.com/influxdata/influxdb/client/v2"
	"io"
	"net/url"
	"os"
	"regexp"
	"strconv"
	"strings"
	"time"

	"flag"
	"log"
	"net/http"
	"encoding/json"
)

const (
	TypeHandleLine = 0
	TypeErrNum = 1
	TpsIntervalTime = 5
)

var TypeMonitorChan = make(chan int,200)

type Message struct {
	TimeLocal                    time.Time
	BytesSent                    int
	Path, Method, Scheme, Status string
	UpstreamTime, RequestTime    float64
}

//系统状态监控
type SystemInfo struct {
	HandleLine    int     `json:"handleLine"`   //总处理日志行数
	Tps           float64 `json:"tps"`          //系统吞吐量
	ReadChanLen   int     `json:"readChanLen"`  //read channel 长度
	WriterChanLen int     `json:"writeChanLen"` //write channel 长度
	RunTime       string  `json:"ruanTime"`     //运行总时间
	ErrNum        int     `json:"errNum"`       //错误数
}

type Monitor struct {
	startTime time.Time
	data SystemInfo
	tpsSli []int
	tps float64
}

func (m *Monitor)start(lp *LogProcess)  {

	go func() {
		for n := range TypeMonitorChan  {
			switch n {
			case TypeErrNum:
				m.data.ErrNum += 1

			case TypeHandleLine:
				m.data.HandleLine += 1
			}
		}
	}()


	ticker := time.NewTicker(time.Second *TpsIntervalTime)
	go func() {
		for {
			<-ticker.C
			m.tpsSli = append(m.tpsSli,m.data.HandleLine)
			if len(m.tpsSli) > 2 {
				m.tpsSli = m.tpsSli[1:]
				m.tps =  float64(m.tpsSli[1] - m.tpsSli[0])/TpsIntervalTime
			}
		}
	}()


	http.HandleFunc("/monitor", func(writer http.ResponseWriter, request *http.Request) {
		m.data.RunTime = time.Now().Sub(m.startTime).String()
		m.data.ReadChanLen = len(lp.rc)
		m.data.WriterChanLen = len(lp.wc)
		m.data.Tps = m.tps

		ret ,_ := json.MarshalIndent(m.data,"","\t")
		io.WriteString(writer,string(ret))
	})


	http.ListenAndServe(":9193",nil)
}


type Reader interface {
	Read(rc chan []byte)
}

type Writer interface {
	Writer(wc chan *Message)
}

type LogProcess struct {
	rc    chan []byte
	wc    chan *Message
	read  Reader
	write Writer
}

type ReadFromFile struct {
	path string //读取文件的路径
}

//读取模块
func (r *ReadFromFile) Read(rc chan []byte) {

	//打开文件
	f, err := os.Open(r.path)
	fmt.Println(r.path)
	if err != nil {
		panic(fmt.Sprintf("open file err :", err.Error()))
	}

	//从文件末尾开始逐行读取文件内容
	f.Seek(0, 2) //2,表明将指正移动到末尾

	rd := bufio.NewReader(f)

	for {
		line, err := rd.ReadBytes('\n') //连续读取内容知道须要'\n'结束
		if err == io.EOF {
			time.Sleep(5000 * time.Microsecond)
			continue
		} else if err != nil {
			panic(fmt.Sprintf("ReadBytes err :", err.Error()))
		}

		TypeMonitorChan <- TypeHandleLine
		rc <- line[:len(line)-1]
	}

}

type WriteToinfluxDB struct {
	influxDBDsn string //influx data source
}

//写入模块
/**
    1.初始化influxdb client
	2. 从Write Channel中读取监控数据
	3. 构造数据并写入influxdb
*/
func (w *WriteToinfluxDB) Writer(wc chan *Message) {

	infSli := strings.Split(w.influxDBDsn, "@")
	addr := infSli[0]
	username := infSli[1]
	password := infSli[2]
	database := infSli[3]
	precision := infSli[4]

	// Create a new HTTPClient
	c, err := client.NewHTTPClient(client.HTTPConfig{
		Addr:     addr,
		Username: username,
		Password: password,
	})
	if err != nil {
		log.Fatal(err)
	}
	defer c.Close()

	for v := range wc {
		// Create a new point batch
		bp, err := client.NewBatchPoints(client.BatchPointsConfig{
			Database:  database,
			Precision: precision,
		})
		if err != nil {
			log.Fatal(err)
		}

		// Create a point and add to batch
		//Tags:Path,Method,Scheme,Status
		tags := map[string]string{
			"Path": v.Path,
			"Method": v.Method,
			"Scheme": v.Scheme,
			"Status": v.Status,
			}

		fields := map[string]interface{}{
			"UpstreamTime": v.UpstreamTime,
			"RequestTime":  v.RequestTime,
			"BytesSent":    v.BytesSent,
		}

		fmt.Println("taps:",tags)
		fmt.Println("fields:",fields)

		pt, err := client.NewPoint("nginx_log", tags, fields, v.TimeLocal)
		if err != nil {
			log.Fatal(err)
		}
		bp.AddPoint(pt)

		// Write the batch
		if err := c.Write(bp); err != nil {
			log.Fatal(err)
		}

		// Close client resources
		if err := c.Close(); err != nil {
			log.Fatal(err)
		}

		log.Println("write success")
	}

}

//解析模块
func (l *LogProcess) Process() {

	/**
	172.0.012 - - [04/Mar/2018:13:49:52 +0000] http "GET /foo?query=t HTTP/1.0" 200 2133 "-"
	"KeepAliveClient" "-" 1.005 1.854

	([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+) */ r := regexp.MustCompile(`([\d\.]+)\s+([^ \[]+)\s+([^ \[]+)\s+\[([^\]]+)\]\s+([a-z]+)\s+\"([^"]+)\"\s+(\d{3})\s+(\d+)\s+\"([^"]+)\"\s+\"(.*?)\"\s+\"([\d\.-]+)\"\s+([\d\.-]+)\s+([\d\.-]+)`) for v := range l.rc { ret := r.FindStringSubmatch(string(v)) if len(ret) != 14 { TypeMonitorChan <- TypeErrNum fmt.Println("FindStringSubmatch fail:", string(v)) fmt.Println(len(ret)) continue } message := &Message{} //时间: [04/Mar/2018:13:49:52 +0000] loc, _ := time.LoadLocation("Asia/Shanghai") t, err := time.ParseInLocation("02/Jan/2006:15:04:05 +0000", ret[4], loc) if err != nil { TypeMonitorChan <- TypeErrNum fmt.Println("ParseInLocation fail:", err.Error(), ret[4]) } message.TimeLocal = t //字符串长度: 2133 byteSent, _ := strconv.Atoi(ret[8]) message.BytesSent = byteSent //"GET /foo?query=t HTTP/1.0" reqSli := strings.Split(ret[6], " ") if len(reqSli) != 3 { TypeMonitorChan <- TypeErrNum fmt.Println("strings.Split fail:", ret[6]) continue } message.Method = reqSli[0] u, err := url.Parse(reqSli[1]) if err != nil { TypeMonitorChan <- TypeErrNum fmt.Println("url parse fail:", err) continue } message.Path = u.Path //http message.Scheme = ret[5] //code: 200 message.Status = ret[7] //1.005 upstreamTime, _ := strconv.ParseFloat(ret[12], 64) message.UpstreamTime = upstreamTime //1.854 requestTime, _ := strconv.ParseFloat(ret[13], 64) message.RequestTime = requestTime //fmt.Println(message) l.wc <- message } } /** 分析监控需求: 某个协议下的某个请求在某个请求方法的 QPS&响应时间&流量 */ func main() { var path, influDsn string flag.StringVar(&path, "path", "./imooc.log", "read file path") flag.StringVar(&influDsn, "influxDsn", "http://127.0.01:8086@imooc@imoocpass@imooc@s", "influx data source") flag.Parse() r := &ReadFromFile{ path: path, } w := &WriteToinfluxDB{ influxDBDsn: influDsn, } lp := &LogProcess{ rc: make(chan []byte,200), wc: make(chan *Message), read: r, write: w, } go lp.read.Read(lp.rc) for i:=1;i<2 ; i++ { go lp.Process() } for i:=1;i<4 ; i++ { go lp.write.Writer(lp.wc) } fmt.Println("begin !!!") m:= &Monitor{ startTime:time.Now(), data:SystemInfo{}, } m.start(lp) } 复制代码
相关文章
相关标签/搜索