InfluxDB源码编译、安装、配置及主从同步实现

直接使用官方包进行安装很简单,查看官方说明便可。安装以后使用才发现,开源的只支持单机版的,可是公司用不能这么low吧,怎么也要“高可用”一点,因而本身参考MySQL主从复制原理、半同步操做步骤及原理饿了么 Influxdb 实践之路,作了InfluxDB主从系统。java


客户端系统

客户端系统拓扑图 linux

blob.png

这个系统主要是用来从kafka获取数据源,通过处理以后存到InfluxDB中去,这里参考了「饿了么」那篇文章,可是我看过他们的源码,基本上搞不懂,就根据文章的描述搞了一个简陋版的东西吧,这里涉及到kafka和influxdb-java,项目源代码能够看这里。该项目启动运行参考其中的README。git


InfluxDB主从同步系统

InfluxDB同步系统 
github

blob.png

主从同步架构,是简陋版的MySQL主从同步。脚本读取InfluxDB的增删改操做日志,使用脚本将记录写入从机中。通过测试,运行情况良好,只要主从机不挂,同步系统能够健康运行。golang

同步脚本使用Python编写,项目源代码在这里。该项目启动运行参考其中的README。数据库

为何使用Python脚原本编写主从同步代码?json

  • 主要是考虑到下降与数据库的代码耦合程度,InfluxDB源码若是出现大规模的升级改动,同步脚本只需略做修改就能够了。segmentfault

  • 毕竟Python是世界上**的语言。?服务器


InfluxDB源码修改

  • 下载源码架构

    在github上下载InfluxDB源码(这里使用的是个人fork地址,已经修改好的源码)。

  • 修改源码

    因为脚本须要读取InfluxDB的增删改操做日志,须要对源码中这一部分操做的日志进行修改,方便脚本解析日志。官方版的日志不记录写入操做的数据值,因此须要咱们本身修改源码中对应的记录写日志的地方。

    修改文件influxdb/services/httpd/handler.go

    // 记录日志的具体方法
    func buildLogLine(l *responseLogger, r *http.Request, start time.Time, body string) string {
    
        redactPassword(r)
    
        username := parseUsername(r)
    
        host, _, err := net.SplitHostPort(r.RemoteAddr)
        if err != nil {
            host = r.RemoteAddr
        }
    
        if xff := r.Header["X-Forwarded-For"]; xff != nil {
            addrs := append(xff, host)
            host = strings.Join(addrs, ",")
        }
    
        uri := r.URL.RequestURI()
    
        referer := r.Referer()
    
        userAgent := r.UserAgent()
    
        // 新增请求中的请求路径
        path := r.URL.Path
    
        // 新增请求中的form值
        r.ParseForm()
        form := r.Form
    
        // 新增请求中的body的值
        newbody := strings.Replace(body, "\n", ";", -1)
    
        // 将日志记录变为json格式
        return fmt.Sprintf(`{"timeindex":%d,"host":"%s","username":"%s","method":"%s","path":"%s","uri":"%s","form":"%s","body":"%s","proto":"%s","status":"%s","size":"%s","referer":"%s","agent":"%s","reqId":"%s"}`,
            start.Nanosecond(),
            host,
            detect(username, "-"),
            r.Method,
            path,
            uri,
            form,
            newbody,
            r.Proto,
            detect(strconv.Itoa(l.Status()), "-"),
            strconv.Itoa(l.Size()),
            detect(referer, "-"),
            detect(userAgent, "-"),
            r.Header.Get("Request-Id"))
    }

    修改文件influxdb/services/httpd/handler.go其中两处调用上面修改过的函数buildLogLine的地方

    func (h *Handler) logging(inner http.Handler, name string) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            l := &responseLogger{w: w}
            inner.ServeHTTP(l, r)
            // 增长请求中的body
            h.CLFLogger.Println(buildLogLine(l, r, start, h.body))
    
            // Log server errors.
            if l.Status()/100 == 5 {
                errStr := l.Header().Get("X-InfluxDB-Error")
                if errStr != "" {
                    h.Logger.Error(fmt.Sprintf("[%d] - %q", l.Status(), errStr))
                }
            }
        })
    }
    func (h *Handler) recovery(inner http.Handler, name string) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            l := &responseLogger{w: w}
    
            defer func() {
                if err := recover(); err != nil {
                    // 增长请求中的body
                    logLine := buildLogLine(l, r, start, h.body)
                    logLine = fmt.Sprintf("%s [panic:%s] %s", logLine, err, debug.Stack())
                    h.CLFLogger.Println(logLine)
                    http.Error(w, http.StatusText(http.StatusInternalServerError), 500)
                    atomic.AddInt64(&h.stats.RecoveredPanics, 1) // Capture the panic in _internal stats.
    
                    if willCrash {
                        h.CLFLogger.Println("\n\n=====\nAll goroutines now follow:")
                        buf := debug.Stack()
                        h.CLFLogger.Printf("%s\n", buf)
                        os.Exit(1) // If we panic then the Go server will recover.
                    }
                }
            }()
    
            inner.ServeHTTP(l, r)
        })
    }

    到此InfluxDB源码修改完成。

    为何要在func buildLogLine(l *responseLogger, r *http.Request, start time.Time, body string) string方法上增长一个字段body,而body明明是从h *Handler中获取的,而函数buildLogLine中明明是有r *http.Request的?

    由于通过测试,在func buildLogLine中从r *http.Request里面实际上取不到body的值,我没有深刻研究过这段代码,确实不懂为何会这样。因此才从上层将body的值直接传入这个方法中。


编译源码和运行

使用源码构建时序数据库主从同步系统

系统要求

Linux 64位系统便可。

依赖环境

本系统使用InfluxDB v1.5.2,因此须要Go 1.9.2或者以上的版本。

InfluxDB使用Dep管理依赖包,须要安装dep

主从同步脚本使用Python 2.7开发。

须要从Github上拉取源码,环境中须要安装git

安装InfluxDB

新建目录$YOUR_PATH/gocodez/src$YOUR_PATH/gocodez/src$YOUR_PATH为自定义目录:

mkdir -p $YOUR_PATH/gocodez/srcmkdir -p $YOUR_PATH/gocodez/bin12

将×××解压到目录$YOUR_PATH/gocodez/src中:

cd $YOUR_PATH/gocodez/src/
git clone https://github.com/callELPSYCONGROO/influxdb.git12

将目录$YOUR_PATH/设置为GOPATH

export GOPATH=$YOUR_PATH/1

进入目录中:

cd $YOUR_PATH/gocodez/src/influxdb1

若是正确安装了dep,这使用这个命令安装依赖:

dep ensure1

安装依赖时,若是遇到没法链接到依赖所需的服务器,须要查看所缺依赖,手动将这些依赖×××到$YOUR_PATH/src/对应路径下。所需依赖在Github上均有源码能够下载。

若是安装或使用dep不成功,能够跳过此步,在下一步时根据错误提示,手动安装所需依赖。

安装依赖可使用依赖包,将依赖包的/src解压到$YOUR_PATH/gocodez目录下便可。

构建安装二进制执行码:

go clean ./...go install ./...12

安装完成后,二进制执行码放在$YOUR_PATH/gocodez/bin/中,启动时序数据库:

$YOUR_PATH/gocodez/bin/influxd1

安装主从同步脚本

新建目录$YOUR_SCRIPT$YOUR_SCRIPT为自定义脚本目录:

mkdir $YOUR_SCRIPT1

从Github中拉取脚本:

cd $YOUR_SCRIPTgit clone https://github.com/callELPSYCONGROO/master_slave12

参考其中的README.md完成脚本配置和运行。

相关文章
相关标签/搜索