欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

InfluxDB源码编译、安装、配置及主从同步实现

程序员文章站 2022-04-09 20:34:35
...

先扯点蛋

公司有个项目要求使用InfluxDB时序数据库储存点东西。第一次听说还有这种数据库,哈哈哈,孤陋寡闻了,先从各位大佬的博客看起,慢慢学习,逐渐了解了之后在服务器上进行安装。直接使用官方包进行安装很简单,查看官方说明即可。安装之后使用才发现,开源的只支持单机版的,但是公司用不能这么low吧,怎么也要“高可用”一点,于是自己参考MySQL主从复制原理、半同步操作步骤及原理饿了么 Influxdb 实践之路,做了InfluxDB主从系统。


客户端系统

客户端系统拓扑图
InfluxDB源码编译、安装、配置及主从同步实现

这个系统主要是用来从kafka获取数据源,经过处理之后存到InfluxDB中去,这里参考了「饿了么」那篇文章,但是我看过他们的源码,基本上搞不懂,就根据文章的描述搞了一个简陋版的东西吧,这里涉及到kafka和influxdb-java,项目源代码可以看这里。该项目启动运行参考其中的README。


InfluxDB主从同步系统

InfluxDB同步系统
InfluxDB源码编译、安装、配置及主从同步实现

主从同步架构,是简陋版的MySQL主从同步。脚本读取InfluxDB的增删改操作日志,使用脚本将记录写入从机中。经过测试,运行状况良好,只要主从机不挂,同步系统可以健康运行。

同步脚本使用Python编写,项目源代码在这里。该项目启动运行参考其中的README。

为什么使用Python脚本来编写主从同步代码?

  • 主要是考虑到降低与数据库的代码耦合程度,InfluxDB源码如果出现大规模的升级改动,同步脚本只需略作修改就可以了。
  • 毕竟Python是世界上**的语言。?

InfluxDB源码修改

  • 下载源码

    在github上下载InfluxDB源码(这里使用的是我的fork地址,已经修改好的源码)。

  • 修改源码

    由于脚本需要读取InfluxDB的增删改操作日志,需要对源码中这一部分操作的日志进行修改,方便脚本解析日志。官方版的日志不记录写入操作的数据值,所以需要我们自己修改源码中对应的记录写日志的地方。

    修改文件influxdb/services/httpd/handler.go

    // 记录日志的具体方法
    func buildLogLine(l *responseLogger, r *http.Request, start time.Time, body string) string {
    
        redactPassword(r)
    
        username := parseUsername(r)
    
        host, _, err := net.SplitHostPort(r.RemoteAddr)
        if err != nil {
            host = r.RemoteAddr
        }
    
        if xff := r.Header["X-Forwarded-For"]; xff != nil {
            addrs := append(xff, host)
            host = strings.Join(addrs, ",")
        }
    
        uri := r.URL.RequestURI()
    
        referer := r.Referer()
    
        userAgent := r.UserAgent()
    
        // 新增请求中的请求路径
        path := r.URL.Path
    
        // 新增请求中的form值
        r.ParseForm()
        form := r.Form
    
        // 新增请求中的body的值
        newbody := strings.Replace(body, "\n", ";", -1)
    
        // 将日志记录变为json格式
        return fmt.Sprintf(`{"timeindex":%d,"host":"%s","username":"%s","method":"%s","path":"%s","uri":"%s","form":"%s","body":"%s","proto":"%s","status":"%s","size":"%s","referer":"%s","agent":"%s","reqId":"%s"}`,
            start.Nanosecond(),
            host,
            detect(username, "-"),
            r.Method,
            path,
            uri,
            form,
            newbody,
            r.Proto,
            detect(strconv.Itoa(l.Status()), "-"),
            strconv.Itoa(l.Size()),
            detect(referer, "-"),
            detect(userAgent, "-"),
            r.Header.Get("Request-Id"))
    }

    修改文件influxdb/services/httpd/handler.go其中两处调用上面修改过的函数buildLogLine的地方

    func (h *Handler) logging(inner http.Handler, name string) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            l := &responseLogger{w: w}
            inner.ServeHTTP(l, r)
            // 增加请求中的body
            h.CLFLogger.Println(buildLogLine(l, r, start, h.body))
    
            // Log server errors.
            if l.Status()/100 == 5 {
                errStr := l.Header().Get("X-InfluxDB-Error")
                if errStr != "" {
                    h.Logger.Error(fmt.Sprintf("[%d] - %q", l.Status(), errStr))
                }
            }
        })
    }
    func (h *Handler) recovery(inner http.Handler, name string) http.Handler {
        return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            start := time.Now()
            l := &responseLogger{w: w}
    
            defer func() {
                if err := recover(); err != nil {
                    // 增加请求中的body
                    logLine := buildLogLine(l, r, start, h.body)
                    logLine = fmt.Sprintf("%s [panic:%s] %s", logLine, err, debug.Stack())
                    h.CLFLogger.Println(logLine)
                    http.Error(w, http.StatusText(http.StatusInternalServerError), 500)
                    atomic.AddInt64(&h.stats.RecoveredPanics, 1) // Capture the panic in _internal stats.
    
                    if willCrash {
                        h.CLFLogger.Println("\n\n=====\nAll goroutines now follow:")
                        buf := debug.Stack()
                        h.CLFLogger.Printf("%s\n", buf)
                        os.Exit(1) // If we panic then the Go server will recover.
                    }
                }
            }()
    
            inner.ServeHTTP(l, r)
        })
    }

    到此InfluxDB源码修改完成。

    为什么要在func buildLogLine(l *responseLogger, r *http.Request, start time.Time, body string) string方法上增加一个字段body,而body明明是从h *Handler中获取的,而函数buildLogLine中明明是有r *http.Request的?

    因为经过测试,在func buildLogLine中从r *http.Request里面实际上取不到body的值,我没有深入研究过这段代码,确实不懂为什么会这样。所以才从上层将body的值直接传入这个方法中。

编译源码和运行

使用源码构建时序数据库主从同步系统

系统要求

Linux 64位系统即可。

依赖环境

本系统使用InfluxDB v1.5.2,所以需要Go 1.9.2或者以上的版本。

InfluxDB使用Dep管理依赖包,需要安装dep

主从同步脚本使用Python 2.7开发。

需要从Github上拉取源码,环境中需要安装git

安装InfluxDB

新建目录$YOUR_PATH/gocodez/src$YOUR_PATH/gocodez/src$YOUR_PATH为自定义目录:

mkdir -p $YOUR_PATH/gocodez/src
mkdir -p $YOUR_PATH/gocodez/bin

将源码下载解压到目录$YOUR_PATH/gocodez/src中:

cd $YOUR_PATH/gocodez/src/
git clone https://github.com/callELPSYCONGROO/influxdb.git

将目录$YOUR_PATH/设置为GOPATH

export GOPATH=$YOUR_PATH/

进入目录中:

cd $YOUR_PATH/gocodez/src/influxdb

如果正确安装了dep,这使用这个命令安装依赖:

dep ensure

安装依赖时,如果遇到无法连接到依赖所需的服务器,需要查看所缺依赖,手动将这些依赖源码下载到$YOUR_PATH/src/对应路径下。所需依赖在Github上均有源码可以下载。

如果安装或使用dep不成功,可以跳过此步,在下一步时根据错误提示,手动安装所需依赖。

安装依赖可以使用依赖包,将依赖包的/src解压到$YOUR_PATH/gocodez目录下即可。

构建安装二进制执行码:

go clean ./...
go install ./...

安装完成后,二进制执行码放在$YOUR_PATH/gocodez/bin/中,启动时序数据库:

$YOUR_PATH/gocodez/bin/influxd

安装主从同步脚本

新建目录$YOUR_SCRIPT$YOUR_SCRIPT为自定义脚本目录:

mkdir $YOUR_SCRIPT

从Github中拉取脚本:

cd $YOUR_SCRIPT
git clone https://github.com/callELPSYCONGROO/master_slave

参考其中的README.md完成脚本配置和运行。