go 源码学习之---Tail 源码分析
已经有两个月没有写博客了,也有好几个月没有看go相关的内容了,由于工作原因最近在做java以及大数据相关的内容,导致最近工作较忙,博客停止了更新,正好想捡起之前go的东西,所以找了一个源码学习
这个也是之前用go写日志收集的时候用到的一个包 :github.com/hpcloud/tail, 这次就学习一下人家的源码,为了方便看这个代码,我将这个包进行了简化,也是用于方便理解,代码放到了:https://github.com/pythonsite/tail, 这个代码包可能无法正常用,只是为了方面理解tail这个包,以及学习人家的代码
精简后的代码目录
│ tail.go │ └─watch filechanges.go inotify.go inotify_tracker.go watch.go
tail.go: 这里包含着tail包的核心代码,主要的逻辑处理时在这个里面
watch: 这个包主要用于对文件的监控,用于将文件的变化通知到tail.如:文件修改了,文件删除了,文件内容追加了
tail.go 代码分析
在tail.go中主要有几下几个结构体:
// line 结构体用于存读每行的时候的对象 type line struct { text string //当前行的内容 time time.time // 时间 err error // error from tail } type seekinfo struct { offset int64 whence int } // 关于配置的结构体 type config struct { location *seekinfo reopen bool mustexist bool // 要打开的文件是否必须存在 poll bool pipe bool follow bool // 是否继续读取新的一行,可以理解为tail -f 命令 } // 核心的结构体tail type tail struct { filename string // 要打开的文件名 lines chan *line // 用于存每行内容的line结构体 config watcher watch.filewatcher changes *watch.filechanges tomb.tomb file *os.file reader *bufio.reader lk sync.mutex }
tail,err := tail.tailfile(conf.logpath,tail.config{ reopen:true, follow:true, location:&tail.seekinfo{offset:0,whence:2}, mustexist:false, poll:true, })
既然我们使用的时候就会在最开始的时候调用tail.tailfile方法,就直接看这个方法:
// 主要用于tail结构体的初始化 func tailfile(filename string, config config) (*tail, error) { t := &tail { filename: filename, lines: make(chan *line), config: config, } t.watcher = watch.newinotifyfilewatcher(filename) if t.mustexist { var err error t.file, err = openfile(t.filename) if err != nil { return nil, err } } go t.tailfilesync() return t, nil }
从这个代码里我们就可以看到它首先初始化了tail结构体并且对tail中的watcher进行的复制,先暂时不看watch相关的内容
然后就是关于文件是否必须存在的判断处理,最后开启了一个一个线程执行tailfilesync()方法,我们接着看tailfilesync方法
func (tail *tail) tailfilesync(){ defer tail.done() defer tail.close() if !tail.mustexist { err := tail.reopen() if err != nil { if err != tomb.errdying { tail.kill(err) } return } } tail.openreader() var offset int64 var err error // 一行行读文件内容 for { if !tail.pipe { offset,err = tail.tell() if err != nil { tail.kill(err) return } } line, err := tail.readline() if err == nil { // 将读取的一行内容放到chan中 tail.sendline(line) } else if err == io.eof { // 表示读到文件的最后了 // 如果follow 设置为false的话就不会继续读文件 if !tail.follow { if line != "" { tail.sendline(line) } return } // 如果follow设置为true则会继续读 if tail.follow && line != "" { err := tail.seekto(seekinfo{offset: offset, whence: 0}) if err != nil { tail.kill(err) return } } // 如果读到文件最后,文件并没有新的内容增加 err := tail.waitforchanges() if err != nil { if err != errstop { tail.kill(err) } return } } else { // 既不是文件结尾,也没有error tail.killf("error reading %s :%s", tail.filename, err) return } select { case <- tail.dying(): if tail.err() == errstopateof { continue } return default: } } }
这个方法里主要是先调用了openreader方法,这个方法其实并没有做什么,只是对tail.reqader进行了赋值:tail.reader = bufio.newreader(tail.file)
接着就是循环一行行的读文件
在循环里最开始判断了tail.pipe的值,这个值一般开始我也并不会设置,所以默认就是false,所以就会执行tail.tell()方法,这个方法主要是用于获取文件当前行的位置信息,下面是tell的代码内容:
// 获取文件当前行的位置信息 func (tail *tail) tell()(offset int64, err error) { if tail.file == nil { return } offset, err = tail.file.seek(0, os.seek_cur) if err != nil { return } tail.lk.lock() defer tail.lk.unlock() if tail.reader == nil { return } offset -= int64(tail.reader.buffered()) return }
接着会调用tail.readline()方法,这个方法就是用于获取文件的一行内容,同时将一行内容实例化为line对象,然后扔到管道tail.lines中
//将读取的文件的每行内容存入到line结构体中,并最终存入到tail.lines的chan中 func (tail *tail) sendline(line string) bool { now := time.now() lines := []string{line} for _, line := range lines { tail.lines <- &line { line, now, nil, } } return true }
最后的大量if 判断其实主要是针对读到文件末尾后的一些操作,
tail结构体在最后定义的时候有一个参数:follow, 这个参数的目的就是当读到文件最后的时候是否继续读文件, 如果最开始设置了false,那么读到最后之后就不会在读文件了
如果设置为true,那么读到文件最后之后会保存文件的位置信息,并执行waitforchanges() 去等待文件的变化,waitforchanges()代码内容如下:
// 等待文件的变化事件 func (tail *tail) waitforchanges() error { if tail.changes == nil { // 这里是获取文件指针的当前位置 pos, err := tail.file.seek(0,os.seek_cur) if err != nil { return err } tail.changes, err = tail.watcher.changeevents(&tail.tomb, pos) if err != nil { return err } } // 和inotify中进行很巧妙的配合,这里通过select 来进行查看那个chan变化了,来知道文件的变化 select { case <- tail.changes.modified: // 文件被修改 return nil case <- tail.changes.deleted: // 文件被删除或者移动到其他目录 tail.changes = nil // 如果文件被删除或者被移动到其他目录,则会尝试重新打开文件 if tail.reopen { fmt.printf("re-opening moved/deleted file %s...",tail.filename) if err := tail.reopen();err != nil { return err } fmt.printf("successfully reopened %s", tail.filename) tail.openreader() return nil } else { fmt.printf("stoping tail as file not longer exists: %s", tail.filename) return errstop } case <- tail.changes.truncated: // 文件被追加新的内容 fmt.printf("re-opening truncated file %s....", tail.filename) if err := tail.reopen();err != nil { return err } fmt.printf("successfuly reopend truncated %s", tail.filename) tail.openreader() return nil case <- tail.dying(): return nil } panic("unreachable") }
看到这里的时候其实就能感觉到,别人写的代码其实也并不是非常复杂,也是很普通的代码,但是你会觉得人家很多地方用的非常巧妙,
这段代码中主要的是的内容就是select部分,这个部分通过select监控
从而知道文件的变化,是修改了,还是删除了,还是追加内容了,这几个其实都是一个channel,这几个channel中的内容是怎么放进去的呢,接下来看watch包中的内容
watch包代码分析
首先先看一下watch包中的watch.go,这个里面其实就是定一个了一个filewatcher的接口
type filewatcher interface { blockuntilexists(*tomb.tomb) error changeevents(*tomb.tomb, int64) (*filechanges, error) }
接着我们看一下inotify.go文件,这个里面我们就可以看到定一个inotifyfilewatcher结构体,并且实现了filewatcher 这个接口
type inotifyfilewatcher struct { filename string size int64 } func newinotifyfilewatcher(filename string) *inotifyfilewatcher { fw := &inotifyfilewatcher { filepath.clean(filename), 0, } return fw } // 关于文件改变事件的处理,当文件被修改了或者文件内容被追加了,进行通知 func (fw *inotifyfilewatcher) changeevents(t *tomb.tomb, pos int64) (*filechanges, error) { err := watch(fw.filename) if err != nil { return nil, err } changes := newfilechanges() fw.size = pos go func() { events := events(fw.filename) for { prevsize := fw.size var evt fsnotify.event var ok bool select { case evt, ok = <- events: if !ok { removewatch(fw.filename) return } case <- t.dying(): removewatch(fw.filename) return } switch { case evt.op & fsnotify.remove == fsnotify.remove: fallthrough case evt.op & fsnotify.rename == fsnotify.rename: removewatch(fw.filename) changes.notifydeleted() return case evt.op & fsnotify.chmod == fsnotify.chmod: fallthrough case evt.op & fsnotify.write == fsnotify.write: fi, err := os.stat(fw.filename) if err != nil { // 文件如果被删除了通知文件删除到chan if os.isnotexist(err) { removewatch(fw.filename) changes.notifydeleted() return } } fw.size = fi.size() if prevsize > 0 && prevsize > fw.size { // 表示文件内容增加了 changes.notifytruncated() } else { // 表示文件被修改了 changes.notifymodified() } prevsize = fw.size } } }() return changes, nil } func (fw *inotifyfilewatcher) blockuntilexists(t *tomb.tomb) error { err := watchcreate(fw.filename) if err != nil { return err } defer removewatchcreate(fw.filename) if _, err := os.stat(fw.filename);!os.isnotexist(err) { return err } events := events(fw.filename) for { select { case evt, ok := <- events: if !ok { return fmt.errorf("inotify watcher has been closed") } evtname, err := filepath.abs(evt.name) if err != nil { return err } fwfilename, err := filepath.abs(fw.filename) if err != nil { return err } if evtname == fwfilename { return nil } case <- t.dying(): return tomb.errdying } } panic("unreachable") }
实现的接口就两个方法:
type filechanges struct { modified chan bool // 修改 truncated chan bool // 增加 deleted chan bool // 删除 } func newfilechanges() *filechanges { return &filechanges{ make(chan bool, 1), make(chan bool, 1), make(chan bool, 1), } } func (fc *filechanges) notifymodified() { sendonlyifempty(fc.modified) } func (fc *filechanges) notifytruncated() { sendonlyifempty(fc.truncated) } func (fc *filechanges) notifydeleted() { sendonlyifempty(fc.deleted) } func sendonlyifempty(ch chan bool) { select { case ch <- true: default: } }
在这个里面也是可以学习到人家写的这个地方非常巧妙,虽然谈不上代码高达上,但是看着会让你很舒服,通过这个结构体,当文件被删除,修改和增加的时候就会让对应的channel中插入一个true,并且这里
的channel都是不带缓冲区的,只有当tail中触发一次之后,channel中的内容就会被获取出来,从而触发tail继续读文件的内容
上一篇: 3、css3-动画(animation)
下一篇: 第一小节总结:
推荐阅读
-
Java并发编程学习之Unsafe类与LockSupport类源码详析
-
Java并发编程学习之ThreadLocal源码详析
-
JDK源码分析之String、StringBuilder和StringBuffer
-
Java并发编程学习之Unsafe类与LockSupport类源码详析
-
Java并发编程学习之ThreadLocal源码详析
-
Vue源码学习之关于对Array的数据侦听实现
-
Android源码学习之组合模式定义及应用
-
Android源码学习之单例模式应用及优点介绍
-
JDK源码分析之String、StringBuilder和StringBuffer
-
Android源码学习之工厂方法模式应用及优势介绍