欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

go 源码学习之---Tail 源码分析

程序员文章站 2022-04-16 09:02:30
已经有两个月没有写博客了,也有好几个月没有看go相关的内容了,由于工作原因最近在做java以及大数据相关的内容,导致最近工作较忙,博客停止了更新,正好想捡起之前go的东西,所以找了一个源码学习 这个也是之前用go写日志收集的时候用到的一个包 :github.com/hpcloud/tail, 这次就 ......

已经有两个月没有写博客了,也有好几个月没有看go相关的内容了,由于工作原因最近在做java以及大数据相关的内容,导致最近工作较忙,博客停止了更新,正好想捡起之前go的东西,所以找了一个源码学习

这个也是之前用go写日志收集的时候用到的一个包 :github.com/hpcloud/tail, 这次就学习一下人家的源码,为了方便看这个代码,我将这个包进行了简化,也是用于方便理解,代码放到了:https://github.com/pythonsite/tail, 这个代码包可能无法正常用,只是为了方面理解tail这个包,以及学习人家的代码

精简后的代码目录

│  tail.go
│
└─watch
        filechanges.go
        inotify.go
        inotify_tracker.go
        watch.go

tail.go: 这里包含着tail包的核心代码,主要的逻辑处理时在这个里面

watch: 这个包主要用于对文件的监控,用于将文件的变化通知到tail.如:文件修改了,文件删除了,文件内容追加了

 

tail.go 代码分析

在tail.go中主要有几下几个结构体:

// line 结构体用于存读每行的时候的对象
type line struct {
    text string            //当前行的内容
    time time.time        // 时间
    err  error             // error from tail
}

type seekinfo struct {
    offset         int64
    whence        int    
}

// 关于配置的结构体
type config struct {
    location     *seekinfo
    reopen        bool
    mustexist    bool        // 要打开的文件是否必须存在
    poll        bool

    pipe        bool

    follow        bool        // 是否继续读取新的一行,可以理解为tail -f 命令
    
}

// 核心的结构体tail
type tail struct {
    filename     string                    // 要打开的文件名
    lines        chan *line                // 用于存每行内容的line结构体

    config

    watcher     watch.filewatcher
    changes     *watch.filechanges

    tomb.tomb    

    file         *os.file
    reader         *bufio.reader
    lk sync.mutex
}
line 结构体用于存读取文件的每行内容
tail 是核心的结构体,我们使用tail这个包的时候其实就是会先调用初始化这个struct的方法tailfile,如我在写日志收集的时候的使用:
    tail,err := tail.tailfile(conf.logpath,tail.config{
        reopen:true,
        follow:true,
        location:&tail.seekinfo{offset:0,whence:2},
        mustexist:false,
        poll:true,
    })

既然我们使用的时候就会在最开始的时候调用tail.tailfile方法,就直接看这个方法:

// 主要用于tail结构体的初始化
func tailfile(filename string, config config) (*tail, error) {
    t := &tail {
        filename:    filename,
        lines:        make(chan *line),
        config:        config,
    }
    t.watcher = watch.newinotifyfilewatcher(filename)
    if t.mustexist {
        var err error
        t.file, err = openfile(t.filename)
        if err != nil {
            return nil, err
        }
    }
    go t.tailfilesync()

    return t, nil
}

从这个代码里我们就可以看到它首先初始化了tail结构体并且对tail中的watcher进行的复制,先暂时不看watch相关的内容

然后就是关于文件是否必须存在的判断处理,最后开启了一个一个线程执行tailfilesync()方法,我们接着看tailfilesync方法

func (tail *tail) tailfilesync(){
    defer tail.done()
    defer tail.close()

    if !tail.mustexist {
        err := tail.reopen()
        if err != nil {
            if err != tomb.errdying {
                tail.kill(err)
            }
            return
        }
    }

    tail.openreader()

    var offset int64
    var err error
    // 一行行读文件内容
    for {

        if !tail.pipe {
            offset,err = tail.tell()
            if err != nil {
                tail.kill(err)
                return
            }
        }

        line, err := tail.readline()
        if err == nil {
            // 将读取的一行内容放到chan中
            tail.sendline(line)
        } else if err == io.eof {
            // 表示读到文件的最后了
            // 如果follow 设置为false的话就不会继续读文件
            if !tail.follow {
                if line != "" {
                    tail.sendline(line)
                }
                return
            }
            // 如果follow设置为true则会继续读
            if tail.follow && line != "" {
                err := tail.seekto(seekinfo{offset: offset, whence: 0})
                if err != nil {
                    tail.kill(err)
                    return
                }
            }
            // 如果读到文件最后,文件并没有新的内容增加
            err := tail.waitforchanges()
            if err != nil {
                if err != errstop {
                    tail.kill(err)
                }
                return
            }


        } else {
            // 既不是文件结尾,也没有error
            tail.killf("error reading %s :%s", tail.filename, err)
            return
        }


        select {
        case <- tail.dying():
            if tail.err() == errstopateof {
                continue
            }
            return
        default:    
        }
    }
}

这个方法里主要是先调用了openreader方法,这个方法其实并没有做什么,只是对tail.reqader进行了赋值:tail.reader = bufio.newreader(tail.file)

接着就是循环一行行的读文件

在循环里最开始判断了tail.pipe的值,这个值一般开始我也并不会设置,所以默认就是false,所以就会执行tail.tell()方法,这个方法主要是用于获取文件当前行的位置信息,下面是tell的代码内容:

// 获取文件当前行的位置信息
func (tail *tail) tell()(offset int64, err error) {
    if tail.file == nil {
        return
    }
    offset, err = tail.file.seek(0, os.seek_cur)
    if err != nil {
        return
    }
    tail.lk.lock()
    defer tail.lk.unlock()
    if tail.reader == nil {
        return
    }
    offset -= int64(tail.reader.buffered())
    return
}

接着会调用tail.readline()方法,这个方法就是用于获取文件的一行内容,同时将一行内容实例化为line对象,然后扔到管道tail.lines中

//将读取的文件的每行内容存入到line结构体中,并最终存入到tail.lines的chan中
func (tail *tail) sendline(line string) bool {
    now := time.now()
    lines := []string{line}

    for _, line := range lines {
        tail.lines <- &line {
            line,
            now,
            nil,
        }
    }
    return true
}

最后的大量if 判断其实主要是针对读到文件末尾后的一些操作,

tail结构体在最后定义的时候有一个参数:follow, 这个参数的目的就是当读到文件最后的时候是否继续读文件, 如果最开始设置了false,那么读到最后之后就不会在读文件了

如果设置为true,那么读到文件最后之后会保存文件的位置信息,并执行waitforchanges() 去等待文件的变化,waitforchanges()代码内容如下:

// 等待文件的变化事件
func (tail *tail) waitforchanges() error {
    if tail.changes == nil {
        // 这里是获取文件指针的当前位置
        pos, err := tail.file.seek(0,os.seek_cur)
        if err != nil {
            return err
        }
        tail.changes, err = tail.watcher.changeevents(&tail.tomb, pos)
        if err != nil {
            return err
        }
    }
    // 和inotify中进行很巧妙的配合,这里通过select 来进行查看那个chan变化了,来知道文件的变化
    select {
    case <- tail.changes.modified:            // 文件被修改
        return nil
    case <- tail.changes.deleted:            // 文件被删除或者移动到其他目录
        tail.changes = nil
        // 如果文件被删除或者被移动到其他目录,则会尝试重新打开文件
        if tail.reopen {
            fmt.printf("re-opening moved/deleted file %s...",tail.filename)
            if err := tail.reopen();err != nil {
                return err
            }
            fmt.printf("successfully reopened %s", tail.filename)
            tail.openreader()
            return nil
        } else {
            fmt.printf("stoping tail as file not longer exists: %s", tail.filename)
            return errstop
        }
    case <- tail.changes.truncated:            // 文件被追加新的内容
        fmt.printf("re-opening truncated file %s....", tail.filename)
        if err := tail.reopen();err != nil {
            return err
        }
        fmt.printf("successfuly reopend truncated %s", tail.filename)
        tail.openreader()
        return nil
    case <- tail.dying():
        return nil
    }
    panic("unreachable")
}

看到这里的时候其实就能感觉到,别人写的代码其实也并不是非常复杂,也是很普通的代码,但是你会觉得人家很多地方用的非常巧妙,

这段代码中主要的是的内容就是select部分,这个部分通过select监控

tail.changes.modified
tail.changes.deleted
tail.changes.truncated

从而知道文件的变化,是修改了,还是删除了,还是追加内容了,这几个其实都是一个channel,这几个channel中的内容是怎么放进去的呢,接下来看watch包中的内容

watch包代码分析

首先先看一下watch包中的watch.go,这个里面其实就是定一个了一个filewatcher的接口

type filewatcher interface {
    blockuntilexists(*tomb.tomb) error

    changeevents(*tomb.tomb, int64) (*filechanges, error)
}

接着我们看一下inotify.go文件,这个里面我们就可以看到定一个inotifyfilewatcher结构体,并且实现了filewatcher 这个接口

type inotifyfilewatcher struct {
    filename     string
    size        int64
}

func newinotifyfilewatcher(filename string) *inotifyfilewatcher {
    fw := &inotifyfilewatcher {
        filepath.clean(filename),
        0,
    }
    return fw
}


// 关于文件改变事件的处理,当文件被修改了或者文件内容被追加了,进行通知
func (fw *inotifyfilewatcher) changeevents(t *tomb.tomb, pos int64) (*filechanges, error) {
    err := watch(fw.filename)
    if err != nil {
        return nil, err
    }

    changes := newfilechanges()
    fw.size = pos

    go func() {
        events := events(fw.filename)

        for {
            prevsize := fw.size

            var evt fsnotify.event
            var ok bool
            
            select {
            case evt, ok = <- events:
                if !ok {
                    removewatch(fw.filename)
                    return
                }
            case <- t.dying():
                removewatch(fw.filename)
                return
            }
            switch {
            case evt.op & fsnotify.remove == fsnotify.remove:
                fallthrough
            case evt.op & fsnotify.rename == fsnotify.rename:
                removewatch(fw.filename)
                changes.notifydeleted()
                return

            case evt.op & fsnotify.chmod == fsnotify.chmod:
                fallthrough
            case evt.op & fsnotify.write == fsnotify.write:
                fi, err := os.stat(fw.filename)
                if err != nil {
                    // 文件如果被删除了通知文件删除到chan
                    if os.isnotexist(err) {
                        removewatch(fw.filename)
                        changes.notifydeleted()
                        return
                    }

                }
                fw.size = fi.size()

                if prevsize > 0 && prevsize > fw.size {
                    // 表示文件内容增加了
                    changes.notifytruncated()
                } else {
                    // 表示文件被修改了
                    changes.notifymodified()
                }

                prevsize = fw.size
            }

        }
    }()
    return changes, nil
}

func (fw *inotifyfilewatcher) blockuntilexists(t *tomb.tomb) error {
    err := watchcreate(fw.filename)
    if err != nil {
        return err
    }
    defer removewatchcreate(fw.filename)
    if _, err := os.stat(fw.filename);!os.isnotexist(err) {
        return err
    }
    events := events(fw.filename)
    for {
        select {
        case evt, ok := <- events:
            if !ok {
                return fmt.errorf("inotify watcher has been closed")
            }
            evtname, err := filepath.abs(evt.name)
            if err != nil {
                return err
            }
            fwfilename, err := filepath.abs(fw.filename)
            if err != nil {
                return err
            }
            if evtname == fwfilename {
                return nil
            }
        case <- t.dying():
            return tomb.errdying
        }
    }
    panic("unreachable")
}

实现的接口就两个方法:

changeevents: 这个主要是监控文件的变化,是删除了,还是被修改了,或者是文件,然后将状态信息通过调用:changes.notifytruncated()或者
changes.notifydeleted() 或者changes.notifymodified() 将状态信息更新到channel中,这样我们在分析tail.go 中最后的分析的那部分channel中的数据,就是在这里
放进去的
blockuntilexists:这个主要是关于文件不存在的时候,如果最开始的时候可以允许文件不存在,那么就会 在这里通过for循环一直等待,知道文件存在
 
再看看filechanges.go 文件,代码内容如下:
type filechanges struct {
    modified  chan bool        // 修改
    truncated chan bool        // 增加
    deleted   chan bool        // 删除
}

func newfilechanges() *filechanges {
    return &filechanges{
        make(chan bool, 1),
        make(chan bool, 1),
        make(chan bool, 1),
    }
}

func (fc *filechanges) notifymodified() {
    sendonlyifempty(fc.modified)
}

func (fc *filechanges) notifytruncated() {
    sendonlyifempty(fc.truncated)
}

func (fc *filechanges) notifydeleted() {
    sendonlyifempty(fc.deleted)
}

func sendonlyifempty(ch chan bool) {
    select {
    case ch <- true:
    default:
    }
}

在这个里面也是可以学习到人家写的这个地方非常巧妙,虽然谈不上代码高达上,但是看着会让你很舒服,通过这个结构体,当文件被删除,修改和增加的时候就会让对应的channel中插入一个true,并且这里

的channel都是不带缓冲区的,只有当tail中触发一次之后,channel中的内容就会被获取出来,从而触发tail继续读文件的内容