logrus hook输出日志到本地磁盘的操作
logrus是go的一个日志框架,它最让人激动的应该是hook机制:可以在初始化时为logrus添加hook,借助hook,logrus可以实现各种扩展功能,比如将日志输出到elasticsearch和activemq等中间件,甚至可以输出到你的email和钉钉中去。不要问我为什么会发现日志可以输出到钉钉中去,都是泪,手动笑哭!
言归正传,这里就简单地通过hook机制将日志输出到本地磁盘的文件中。
首先
go get github.com/sirupsen/logrus
然后
logrus的用法和go标准库的log包类似,下面用到它的6个日志等级(debug、info、warn、error、fatal、panic),可以直接调用
// One call per log level, from least to most severe.
logrus.Debug("useful debugging information.")
logrus.Info("something noteworthy happened!")
logrus.Warn("you should probably take a look at this.")
logrus.Error("something failed but i'm not quitting.")
logrus.Fatal("bye.")         // calls os.Exit(1) after logging
logrus.Panic("i'm bailing.") // calls panic() after logging
项目例子结构
main.go
package main

import (
	"fmt"

	"github.com/sirupsen/logrus"

	"logt/logs"
)

// main installs the file hook on the standard logrus logger and emits one
// entry before and one entry after the hook is registered.
func main() {
	const logFile = "d:/log/golog.log"

	// Create the hook, handing it the path the log file is stored at.
	hook := logs.NewHook(logFile)

	// Logged before the hook is added, so this entry only reaches logrus's
	// own output and not the file.
	logrus.WithField("file", logFile).Info("new logrus hook err.")

	// A nil hook means the file writer could not be initialised. Registering
	// it anyway would panic the first time logrus fires it.
	if hook == nil {
		fmt.Println("unable to create file hook for", logFile)
		return
	}
	logrus.AddHook(hook)

	// Logged after the hook is added, so this entry is also written to the file.
	logrus.WithFields(logrus.Fields{
		"animal": "walrus",
	}).Info("a walrus appears")
}
hook.go
不要看下面三个go文件代码很长,其实大多数都是固定代码,也就newhook函数自己扩展定义就好
package logs

import (
	"encoding/json"
	"fmt"
	"os"
	"sort"
	"strings"

	"github.com/sirupsen/logrus"
)

// hook is a logrus hook that writes every fired entry through a
// loggerinterface (the rotating file writer of this package).
type hook struct {
	w loggerinterface
}

// NewHook builds a hook that logs to file and keeps rotated files for 7 days.
// It returns nil when the file writer cannot be initialised; the reason is
// reported on stderr.
func NewHook(file string) *hook {
	// Marshal the config instead of formatting it by hand so that paths
	// containing backslashes or quotes still produce valid JSON.
	config, err := json.Marshal(map[string]interface{}{
		"filename": file,
		"maxdays":  7,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "unable to build log config for %q, %v\n", file, err)
		return nil
	}

	w := newfilewriter()
	if err := w.init(string(config)); err != nil {
		fmt.Fprintf(os.Stderr, "unable to init log file %q, %v\n", file, err)
		return nil
	}
	return &hook{w}
}

// Fire implements logrus.Hook. It formats the entry and writes it with the
// level tag matching the entry's level; panic and fatal entries are written
// as errors. Entries of any other level are ignored.
func (h *hook) Fire(entry *logrus.Entry) error {
	message, err := getmessage(entry)
	if err != nil {
		fmt.Fprintf(os.Stderr, "unable to read entry, %v", err)
		return err
	}

	switch entry.Level {
	case logrus.PanicLevel, logrus.FatalLevel, logrus.ErrorLevel:
		return h.w.writemsg(fmt.Sprintf("[error] %s", message), levelerror)
	case logrus.WarnLevel:
		return h.w.writemsg(fmt.Sprintf("[warn] %s", message), levelwarn)
	case logrus.InfoLevel:
		return h.w.writemsg(fmt.Sprintf("[info] %s", message), levelinfo)
	case logrus.DebugLevel:
		return h.w.writemsg(fmt.Sprintf("[debug] %s", message), leveldebug)
	default:
		return nil
	}
}

// Levels implements logrus.Hook and lists the levels this hook is fired for.
func (h *hook) Levels() []logrus.Level {
	return []logrus.Level{
		logrus.PanicLevel,
		logrus.FatalLevel,
		logrus.ErrorLevel,
		logrus.WarnLevel,
		logrus.InfoLevel,
		logrus.DebugLevel,
	}
}

// getmessage renders an entry as "file:line message key:value ...".
// The file name is shortened to the part after "$GOPATH/src/" when the
// caller's path contains that prefix. The returned error is always nil.
func getmessage(entry *logrus.Entry) (message string, err error) {
	file, lineNumber := getcallerignoringlogmulti(2)
	if file != "" {
		sep := fmt.Sprintf("%s/src/", os.Getenv("GOPATH"))
		if parts := strings.SplitN(file, sep, 2); len(parts) == 2 {
			file = parts[1]
		}
	}

	// Map iteration order is random; sort the keys so that identical
	// entries always produce identical lines.
	keys := make([]string, 0, len(entry.Data))
	for k := range entry.Data {
		keys = append(keys, k)
	}
	sort.Strings(keys)

	var b strings.Builder
	fmt.Fprintf(&b, "%s:%d %s ", file, lineNumber, entry.Message)
	for _, k := range keys {
		fmt.Fprintf(&b, "%v:%v ", k, entry.Data[k])
	}
	return b.String(), nil
}
caller.go
package logs

import (
	"runtime"
	"strings"
)

// getcaller returns the file and line of the stack frame calldepth levels
// above the caller of getcaller, skipping every frame whose file path ends
// with one of suffixestoignore. When the stack is exhausted it returns
// ("???", 0).
//
// Suffixes are also matched against the path with module version markers
// removed, so "logrus/hooks.go" matches ".../logrus@v1.9.3/hooks.go" too.
func getcaller(calldepth int, suffixestoignore ...string) (file string, line int) {
	// bump by 1 to ignore the getcaller (this) stack frame
	calldepth++
	for ; ; calldepth++ {
		var ok bool
		_, file, line, ok = runtime.Caller(calldepth)
		if !ok {
			return "???", 0
		}
		if !hasanysuffix(file, suffixestoignore) {
			return file, line
		}
	}
}

// hasanysuffix reports whether file, either verbatim or with module version
// markers stripped, ends with one of suffixes.
func hasanysuffix(file string, suffixes []string) bool {
	plain := stripmoduleversions(file)
	for _, s := range suffixes {
		if strings.HasSuffix(file, s) || strings.HasSuffix(plain, s) {
			return true
		}
	}
	return false
}

// stripmoduleversions removes the "@version" part from every element of a
// slash-separated path.
func stripmoduleversions(path string) string {
	if !strings.Contains(path, "@") {
		return path
	}
	parts := strings.Split(path, "/")
	for i, p := range parts {
		if at := strings.IndexByte(p, '@'); at > 0 {
			parts[i] = p[:at]
		}
	}
	return strings.Join(parts, "/")
}

// getcallerignoringlogmulti is getcaller with logrus's own source files (and
// the amd64 assembly trampoline) filtered out, so the result points at the
// code that issued the log call.
func getcallerignoringlogmulti(calldepth int) (string, int) {
	// the +1 is to ignore this (getcallerignoringlogmulti) frame
	return getcaller(calldepth+1,
		"logrus/hooks.go",
		"logrus/entry.go",
		"logrus/logger.go",
		"logrus/exported.go",
		"asm_amd64.s",
	)
}
file.go
package logs

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"
)

// Log message levels, ordered from most to least severe. A writer configured
// with level L drops every message whose level is greater than L.
const (
	levelerror = iota
	levelwarn
	levelinfo
	leveldebug
)

// loggerinterface is what the logrus hook of this package needs from a log
// sink.
type loggerinterface interface {
	init(config string) error
	writemsg(msg string, level int) error
	destroy()
	flush()
}

// logwriter implements loggerinterface.
// It writes messages to a file and rotates that file by line count, by size
// or when the day of the month changes.
//
// The exported fields are filled from the JSON config passed to init;
// encoding/json only sets exported fields.
type logwriter struct {
	*log.Logger
	mw *muxwriter

	// Filename is the path of the active log file.
	Filename string `json:"filename"`

	// MaxLines rotates once the active file holds this many lines.
	MaxLines         int `json:"maxlines"`
	maxLinesCurLines int

	// MaxSize rotates once the active file holds this many bytes.
	MaxSize        int `json:"maxsize"`
	maxSizeCurSize int

	// Daily rotates when the day of the month differs from the day the
	// file was opened; MaxDays is the age after which rotated files are
	// deleted.
	Daily         bool  `json:"daily"`
	MaxDays       int64 `json:"maxdays"`
	dailyOpenDate int

	// Rotate switches rotation on or off altogether.
	Rotate bool `json:"rotate"`

	startLock sync.Mutex // only one log can write to the file

	Level int `json:"level"`
}

// muxwriter is an *os.File writer guarded by a mutex.
type muxwriter struct {
	sync.Mutex
	fd *os.File
}

// Write writes b to the current file while holding the lock.
func (l *muxwriter) Write(b []byte) (int, error) {
	l.Lock()
	defer l.Unlock()
	return l.fd.Write(b)
}

// setfd swaps the underlying file, closing the previous one if there is one.
// It does not take the lock itself.
func (l *muxwriter) setfd(fd *os.File) {
	if l.fd != nil {
		_ = l.fd.Close()
	}
	l.fd = fd
}

// newfilewriter creates a logwriter with default limits and returns it as a
// loggerinterface. init must be called before the first write.
func newfilewriter() loggerinterface {
	w := &logwriter{
		Filename: "",
		MaxLines: 1000000,
		MaxSize:  1 << 28, // 256 MB
		Daily:    true,
		MaxDays:  7,
		Rotate:   true,
		Level:    leveldebug,
	}
	// write through muxwriter instead of the bare os.File so that writes
	// are blocked while the file is being rotated
	w.mw = new(muxwriter)
	// set muxwriter as the logger's io.Writer
	w.Logger = log.New(w.mw, "", log.Ldate|log.Ltime)
	return w
}

// init configures the writer from a JSON document and opens the log file.
// jsonconfig looks like:
//
//	{
//	  "filename":"logs/sample.log",
//	  "maxlines":10000,
//	  "maxsize":1073741824,
//	  "daily":true,
//	  "maxdays":15,
//	  "rotate":true
//	}
//
// Keys that are absent keep the defaults set by newfilewriter. A missing
// filename is an error.
func (w *logwriter) init(jsonconfig string) error {
	if err := json.Unmarshal([]byte(jsonconfig), w); err != nil {
		return err
	}
	if len(w.Filename) == 0 {
		return errors.New("jsonconfig must have filename")
	}
	return w.startlogger()
}

// startlogger opens (or creates) the log file, installs it in the muxwriter
// and initialises the rotation counters from the file's current content.
func (w *logwriter) startlogger() error {
	fd, err := w.createlogfile()
	if err != nil {
		return err
	}
	w.mw.setfd(fd)
	return w.initfd()
}

// docheck rotates the file when one of the limits is reached and then
// accounts for the message of size bytes that is about to be written.
func (w *logwriter) docheck(size int) {
	w.startLock.Lock()
	defer w.startLock.Unlock()

	overLines := w.MaxLines > 0 && w.maxLinesCurLines >= w.MaxLines
	overSize := w.MaxSize > 0 && w.maxSizeCurSize >= w.MaxSize
	newDay := w.Daily && time.Now().Day() != w.dailyOpenDate
	if w.Rotate && (overLines || overSize || newDay) {
		if err := w.dorotate(); err != nil {
			fmt.Fprintf(os.Stderr, "filelogwriter(%q): %s\n", w.Filename, err)
			return
		}
	}
	w.maxLinesCurLines++
	w.maxSizeCurSize += size
}

// writemsg writes msg to the file unless level is less severe than the
// configured level. It always returns nil.
func (w *logwriter) writemsg(msg string, level int) error {
	if level > w.Level {
		return nil
	}
	n := 24 + len(msg) // 24 stand for the length "2013/06/23 21:00:22 [t] "
	w.docheck(n)
	w.Logger.Print(msg)
	return nil
}

// createlogfile opens the log file for appending, creating the file and its
// parent directory when they do not exist yet.
func (w *logwriter) createlogfile() (*os.File, error) {
	if err := os.MkdirAll(filepath.Dir(w.Filename), 0755); err != nil {
		return nil, err
	}
	return os.OpenFile(w.Filename, os.O_WRONLY|os.O_APPEND|os.O_CREATE, 0660)
}

// initfd resets the size, line and day counters from the file that is
// currently open.
func (w *logwriter) initfd() error {
	finfo, err := w.mw.fd.Stat()
	if err != nil {
		return fmt.Errorf("get stat err: %s", err)
	}
	w.maxSizeCurSize = int(finfo.Size())
	w.dailyOpenDate = time.Now().Day()
	w.maxLinesCurLines = 0
	if finfo.Size() > 0 {
		content, err := ioutil.ReadFile(w.Filename)
		if err != nil {
			return err
		}
		// One newline per written line; splitting on "\n" would count one
		// line too many for a file that ends with a newline.
		w.maxLinesCurLines = bytes.Count(content, []byte{'\n'})
	}
	return nil
}

// dorotate renames the active file to the first free name of the form
// xx.log.2013-01-01.001 (001..999) and starts a fresh file under the
// original name. It does nothing when the active file does not exist.
func (w *logwriter) dorotate() error {
	if _, err := os.Lstat(w.Filename); err != nil {
		return nil
	}

	// find the next available number
	day := time.Now().Format("2006-01-02")
	target := ""
	for num := 1; num <= 999; num++ {
		candidate := fmt.Sprintf("%s.%s.%03d", w.Filename, day, num)
		if _, err := os.Lstat(candidate); err != nil {
			target = candidate
			break
		}
	}
	if target == "" {
		return fmt.Errorf("rotate: cannot find free log number to rename %s", w.Filename)
	}

	// block the logger's io.Writer while the file is swapped
	w.mw.Lock()
	defer w.mw.Unlock()

	// Close the descriptor before renaming, and forget it so that setfd
	// does not close it a second time.
	if w.mw.fd != nil {
		_ = w.mw.fd.Close()
		w.mw.fd = nil
	}

	renameErr := os.Rename(w.Filename, target)

	// Reopen even when the rename failed; otherwise every later write
	// would go to the closed descriptor.
	if err := w.startlogger(); err != nil {
		return fmt.Errorf("rotate startlogger: %s", err)
	}
	if renameErr != nil {
		return fmt.Errorf("rotate: %s", renameErr)
	}

	go w.deleteoldlog()
	return nil
}

// deleteoldlog removes files under the log directory whose base name starts
// with the log file's base name and whose modification time is more than
// MaxDays days ago. It is best effort: entries that cannot be read are
// skipped and removal errors are ignored.
func (w *logwriter) deleteoldlog() {
	dir := filepath.Dir(w.Filename)
	prefix := filepath.Base(w.Filename)
	active := filepath.Clean(w.Filename)
	cutoff := time.Now().Unix() - 60*60*24*w.MaxDays

	_ = filepath.Walk(dir, func(path string, info os.FileInfo, err error) error {
		// info is nil when Walk reports an error for this entry.
		if err != nil || info == nil {
			return nil
		}
		if info.IsDir() || info.ModTime().Unix() >= cutoff {
			return nil
		}
		// never delete the file that is currently being written
		if filepath.Clean(path) == active {
			return nil
		}
		if strings.HasPrefix(filepath.Base(path), prefix) {
			_ = os.Remove(path)
		}
		return nil
	})
}

// destroy closes the log file.
func (w *logwriter) destroy() {
	_ = w.mw.fd.Close()
}

// flush syncs the log file to disk. Messages are not buffered in memory by
// this writer.
func (w *logwriter) flush() {
	_ = w.mw.fd.Sync()
}
补充知识:golang logrus自定义hook:日志切片hook、邮件警报hook、kafkahook
logrus hook 分析
logrus hook 接口定义很简单。如下
package logrus // a hook to be fired when logging on the logging levels returned from // `levels()` on your implementation of the interface. note that this is not // fired in a goroutine or a channel with workers, you should handle such // functionality yourself if your call is non-blocking and you don't wish for // the logging calls for levels returned from `levels()` to block. type hook interface { levels() []level fire(*entry) error } // internal type for storing the hooks on a logger instance. type levelhooks map[level][]hook // add a hook to an instance of logger. this is called with // `log.hooks.add(new(myhook))` where `myhook` implements the `hook` interface. func (hooks levelhooks) add(hook hook) { for _, level := range hook.levels() { hooks[level] = append(hooks[level], hook) } } // fire all the hooks for the passed level. used by `entry.log` to fire // appropriate hooks for a log entry. func (hooks levelhooks) fire(level level, entry *entry) error { for _, hook := range hooks[level] { if err := hook.fire(entry); err != nil { return err } } return nil }
只需实现 该结构的接口。
type hook interface { levels() []level fire(*entry) error }
就会被logrus框架遍历调用已注册的 hook 的 fire 方法
获取日志实例
// log_hook.go package logger import ( "fmt" "github.com/sirupsen/logrus" "library/util/constant" "os" ) //自实现 logrus hook func getlogger(module string) *logrus.logger { //实例化 logger := logrus.new() //设置输出 logger.out = os.stdout //设置日志级别 logger.setlevel(logrus.debuglevel) //设置日志格式 //自定writer就行, hook 交给 lfshook logger.addhook(newlogrushook(constant.getlogpath(), module)) logger.setformatter(&logrus.jsonformatter{ timestampformat:"2006-01-02 15:04:05", }) return logger } //确保每次调用使用的文件都是唯一的。 func getnewfieldloggercontext(module,appfield string) *logrus.entry { logger:= getlogger(module) return logger.withfields(logrus.fields{ "app": appfield, }) } //订阅 警告日志 func subscribelog(entry *logrus.entry, submap subscribemap) { logger := entry.logger logger.addhook(newsubscribehook(submap)) fmt.println("日志订阅成功") }
constant.getlogpath() 可以替换为自己的日志文件输出目录地址,比如我的mac上则是:/usr/local/log ,直接替换即可。
日志切片hook
代码
// writer.go package logger import ( "fmt" "github.com/pkg/errors" "io" "library/util" "os" "path/filepath" "sync" "time" ) type logwriter struct { logdir string //日志根目录地址。 module string //模块 名 curfilename string //当前被指定的filename curbasefilename string //在使用中的file turncateduration time.duration mutex sync.rwmutex outfh *os.file } func (w *logwriter) write(p []byte) (n int, err error) { w.mutex.lock() defer w.mutex.unlock() if out, err:= w.getwriter(); err!=nil { return 0, errors.new("failed to fetch target io.writer") }else{ return out.write(p) } } func (w *logwriter) getfilename() string { base := time.now().truncate(w.turncateduration) return fmt.sprintf("%s/%s/%s_%s", w.logdir, base.format("2006-01-02"), w.module, base.format("15")) } func (w *logwriter) getwriter()(io.writer, error) { filename := w.curbasefilename //判断是否有新的文件名 //会出现新的文件名 basefilename := w.getfilename() if basefilename != filename { filename = basefilename } dirname := filepath.dir(filename) if err := os.mkdirall(dirname, 0755); err != nil { return nil, errors.wrapf(err, "failed to create directory %s", dirname) } filehandler, err := os.openfile(filename, os.o_create|os.o_append|os.o_wronly, 0644) if err != nil { return nil, errors.errorf("failed to open file %s", err) } w.outfh.close() w.outfh = filehandler w.curbasefilename = filename w.curfilename = filename return filehandler, nil } func new(logpath, module string, duration time.duration) *logwriter { return &logwriter{ logdir: logpath, module: module, turncateduration:duration, curfilename: "", curbasefilename: "", } }
// hook.go package logger import ( "github.com/rifflock/lfshook" "github.com/sirupsen/logrus" "time" ) func newlogrushook(logpath, moduel string) logrus.hook { logrus.setlevel(logrus.warnlevel) writer := new(logpath, moduel, time.hour * 2) lfshook := lfshook.newhook(lfshook.writermap{ logrus.debuglevel: writer, logrus.infolevel: writer, logrus.warnlevel: writer, logrus.errorlevel: writer, logrus.fatallevel: writer, logrus.paniclevel: writer, }, &logrus.textformatter{disablecolors: true}) // writer 生成新的log文件类型 writer 在通过new hook函数 消费 fire 函数 // writer 是实现了writer 接口的库,在日志调用write是做预处理 return lfshook }
测试代码
func testgetlogger(t *testing.t) { lg := getnewfieldloggercontext("test","d") lg.logger.info("????") }
解析
logger实例持有了自定义的 io.writer 结构体,在消费fire函数时会调用它的write方法,此时通过truncate时间切片的逻辑判断需要写入哪个文件,必要时创建新的文件。
注: 文章提供的代码是按天切分文件夹的,文件夹内模块日志再按2小时切分。可自行替换成按模块切分。
邮件警报hook
代码
// subscribehook.go package logger import ( "fmt" "github.com/sirupsen/logrus" "library/email" "strings" ) type subscribemap map[logrus.level][]*email.receiver type subscribehook struct { submap subscribemap } //此处可以自实现hook 目前使用三方hook func(h *subscribehook)levels() []logrus.level{ return logrus.alllevels } func(h *subscribehook)fire(entry *logrus.entry) error{ for level, receivers := range h.submap { //命中 准备消费 if level == entry.level { if len(receivers) > 0 { email.sendemail(receivers, fmt.sprintf("%s:[系统日志警报]", entry.level.string()), fmt.sprintf("错误内容: %s",entry.message)) } } } return nil } func newsubscribemap(level logrus.level, receiverstr string) subscribemap{ submap := subscribemap{} addresslist := strings.split(receiverstr,";") var receivers []*email.receiver for _, address := range addresslist { receivers = append(receivers, &email.receiver{email: address}) } submap[level] = receivers return submap } func newsubscribehook(submap subscribemap) *subscribehook { return &subscribehook{submap}
// email.go package email import ( "fmt" "gopkg.in/gomail.v2" "regexp" "strconv" ) type sender struct { user string password string host string port int mailto []string subject string content string } type receiver struct { email string } func (r *receiver) check() bool { pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` //匹配电子邮箱 reg := regexp.mustcompile(pattern) return reg.matchstring(r.email) } func (s *sender) clean (){ } //检查 邮箱正确性 func (s *sender)newreceiver(email string) *receiver { rec := &receiver{email:email} if rec.check() { m.mailto = []string{email} return rec }else{ fmt.printf("email check fail 【%s】\n", email) return nil } } func (s *sender)newreceivers(receivers []*receiver) { for _, rec := range receivers { if rec.check() { m.mailto = append(m.mailto, rec.email) }else{ fmt.printf("email check fail 【%s】\n", rec.email) } } } // 163邮箱 password 为开启smtp后给的秘钥 var m = sender{user:"6666666@163.com", password:"666666666", host: "smtp.163.com", port: 465} func sendemail(receivers []*receiver,subject, content string){ m.newreceivers(receivers) m.subject = subject m.content = content e := gomail.newmessage() e.setheader("from", e.formataddress(m.user, "hengsheng")) e.setheader("to", m.mailto...) //发送给多个用户 e.setheader("subject", m.subject) //设置邮件主题 e.setbody("text/html", m.content) //设置邮件正文 d := gomail.newdialer(m.host, m.port, m.user, m.password) err := d.dialandsend(e) if err != nil { fmt.printf("error 邮件发送错误! %s \n", err.error()) } }
使用
同理,在写日志(触发fire)时,如果日志级别命中了订阅的级别(例如错误日志),则发送邮件。
// o is the actual struct instance that owns the logger.
o.logger = logger.GetNewFieldLoggerContext("test", "666")
if subscribesocket {
	// e-mail both addresses whenever an error-level entry is logged
	logger.SubscribeLog(o.logger, logger.NewSubscribeMap(logrus.ErrorLevel, "a@163.com;b@163.com"))
}
kafkahook
// kafka hook package logger import ( "github.com/sirupsen/logrus" "library/kafka" "library/util/constant" ) type kafkahook struct { kafkaproducer *kafka.kafkaproducer } func(h *kafkahook)levels() []logrus.level{ return logrus.alllevels } func(h *kafkahook)fire(entry *logrus.entry) error{ h.kafkaproducer.sendmsgsync(entry.message) return nil } func newkafkahook() *kafkahook{ producer := kafka.newkafkaproducer(constant.kafkalogelktopic,true) return &kafkahook{kafkaproducer: producer} }
使用时logger.addhook(newkafkahook()) 即可
kafka模块
生产者
// kafkaproducer.go package kafka import ( "errors" "fmt" "github.com/shopify/sarama" "library/util/constant" "log" "time" ) func getkafkaaddress()[]string{ return "127.0.0.1:9092" } //同步消息模式 func syncproducer(topic, message string) error { config := sarama.newconfig() config.producer.return.successes = true config.producer.timeout = 5 * time.second p, err := sarama.newsyncproducer(getkafkaaddress(), config) if err != nil { return errors.new(fmt.sprintf("sarama.newsyncproducer err, message=%s \n", err)) } defer p.close() msg := &sarama.producermessage{ topic: topic, value: sarama.byteencoder(message), } part, offset, err := p.sendmessage(msg) if err != nil { return errors.new(fmt.sprintf("send sdsds err=%s \n", err)) } else { fmt.printf("发送成功,partition=%d, offset=%d \n", part, offset) return nil } } //async 异步生产者 type kafkaproducer struct { topic string asyncproducer *sarama.asyncproducer syncproducer *sarama.syncproducer sync bool } func newkafkaproducer(topic string, sync bool) *kafkaproducer { k := &kafkaproducer{ topic: topic, sync: sync, } if sync { k.initsync() }else{ k.initasync() } return k } func (k *kafkaproducer) initasync() bool { if k.sync { fmt.printf("sync producer cant call async func !\n") return false } config := sarama.newconfig() //等待服务器所有副本都保存成功后的响应 config.producer.requiredacks = sarama.waitforall //随机向partition发送消息 config.producer.partitioner = sarama.newrandompartitioner //是否等待成功和失败后的响应,只有上面的requireacks设置不是noreponse这里才有用. 
config.producer.return.successes = true config.producer.return.errors = true //设置使用的kafka版本,如果低于v0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置 //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息 config.version = sarama.v0_10_0_1 producer, e := sarama.newasyncproducer(getkafkaaddress(), config) if e != nil { fmt.println(e) return false } k.asyncproducer = &producer defer producer.asyncclose() pd := *k.asyncproducer go func() { for{ select { case <-pd.successes(): //fmt.println("offset: ", suc.offset, "timestamp: ", suc.timestamp.string(), "partitions: ", suc.partition) case fail := <-pd.errors(): fmt.printf("err: %s \n", fail.err.error()) } } }() return true } func (k *kafkaproducer) initsync() bool { if !k.sync { fmt.println("async producer cant call sync func !") return false } config := sarama.newconfig() config.producer.return.successes = true config.producer.timeout = 5 * time.second p, err := sarama.newsyncproducer(getkafkaaddress(), config) k.syncproducer = &p if err != nil { log.printf("sarama.newsyncproducer err, message=%s \n", err) return false } return true } func (k *kafkaproducer) sendmsgasync(sendstr string) { msg := &sarama.producermessage{ topic: k.topic, } //将字符串转化为字节数组 msg.value = sarama.byteencoder(sendstr) //fmt.println(value) //使用通道发送 pd := *k.asyncproducer pd.input() <- msg } func (k *kafkaproducer) sendmsgsync(sendstr string) bool { msg := &sarama.producermessage{ topic: k.topic, value: sarama.byteencoder(sendstr), } pd := *k.syncproducer part, offset, err := pd.sendmessage(msg) if err != nil { fmt.printf("发送失败 send message(%s) err=%s \n", sendstr, err) return false } else { fmt.printf("发送成功 partition=%d, offset=%d \n", part, offset) return true } }
调用 sendmsgsync 或 sendmsgasync 生产消息,注意初始化时的参数要保证一致!
消费者组
// kafkaconsumergroup.go package kafka import ( "context" "fmt" "github.com/shopify/sarama" "log" "sync" ) func newkafkaconsumergroup(topics []string, group string, businesscall func(message *sarama.consumermessage) bool) *kafkaconsumergroup { k := &kafkaconsumergroup{ brokers: getkafkaaddress(), topics: topics, group: group, channelbuffersize: 2, ready: make(chan bool), version: "1.1.1", handler: businesscall, } k.init() return k } // 消费者组(consumer group): 相同的group.id的消费者将视为同一个消费者组, // 每个消费者都需要设置一个组id, 每条消息只能被 consumer group 中的一个 // consumer 消费,但可以被多个 consumer group 消费 type kafkaconsumergroup struct { //代理(broker): 一台kafka服务器称之为一个broker brokers []string //主题(topic): 消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中 topics []string version string ready chan bool group string channelbuffersize int //业务调用 handler func(message *sarama.consumermessage) bool } func (k *kafkaconsumergroup)init() func() { version,err := sarama.parsekafkaversion(k.version) if err!=nil{ fmt.printf("error parsing kafka version: %v", err) } cfg := sarama.newconfig() cfg.version = version // 分区分配策略 cfg.consumer.group.rebalance.strategy = sarama.balancestrategyrange // 未找到组消费位移的时候从哪边开始消费 cfg.consumer.offsets.initial = -2 // channel长度 cfg.channelbuffersize = k.channelbuffersize ctx, cancel := context.withcancel(context.background()) client, err := sarama.newconsumergroup(k.brokers, k.group, cfg) if err != nil { fmt.printf("error creating consumer group client: %v", err) } wg := &sync.waitgroup{} wg.add(1) go func() { defer func() { wg.done() //util.handlepanic("client.consume panic", log.standardlogger()) }() for { if err := client.consume(ctx, k.topics, k); err != nil { log.printf("error from consumer: %v", err) } // check if context was cancelled, signaling that the consumer should stop if ctx.err() != nil { log.println(ctx.err()) return } k.ready = make(chan bool) } }() <-k.ready fmt.printf("sarama consumer up and running!... 
\n") // 保证在系统退出时,通道里面的消息被消费 return func() { cancel() wg.wait() if err = client.close(); err != nil { fmt.printf("error closing client: %v \n", err) } } } // setup is run at the beginning of a new session, before consumeclaim func (k *kafkaconsumergroup) setup(sarama.consumergroupsession) error { // mark the consumer as ready close(k.ready) return nil } // cleanup is run at the end of a session, once all consumeclaim goroutines have exited func (k *kafkaconsumergroup) cleanup(sarama.consumergroupsession) error { return nil } // consumeclaim must start a consumer loop of consumergroupclaim's messages(). func (k *kafkaconsumergroup) consumeclaim(session sarama.consumergroupsession, claim sarama.consumergroupclaim) error { // note: // do not move the code below to a goroutine. // the `consumeclaim` itself is called within a goroutine, see: // https://github.com/shopify/sarama/blob/master/consumer_group.go#l27-l29 // 具体消费消息 for message := range claim.messages() { //msg := string(message.value) //k.logger.infof("卡夫卡: %s", msg) if ok:= k.handler(message); ok { // 更新位移 session.markmessage(message, "") } //run.run(msg) } return nil }
测试代码
func testkafkaconsumergroup_init(t *testing.t) { //pd := newkafkaproducer("test-fail",true) //pd.initsync() k := newkafkaconsumergroup([]string{constant.kafkaalisdktopic}, "group-2", func(message *sarama.consumermessage) bool { fmt.println(string(message.value)) //如果失败的处理逻辑 //if ok := pd.sendmsgsync("666666"); ok { // return true //} return false }) consumerdone := k.init() sigterm := make(chan os.signal, 1) signal.notify(sigterm, syscall.sigint, syscall.sigterm) select { case <-sigterm: fmt.println("terminating: via signal") } consumerdone() }
这里有一些补偿逻辑在里面。
以上就是logrus相关hook。
好了,这篇logrus hook输出日志到本地磁盘的操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。