欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

logrus hook输出日志到本地磁盘的操作

程序员文章站 2022-03-21 11:35:14
logrus是go的一个日志框架,它最让人激动的应该是hook机制,可以在初始化时为logrus添加hook,logrus可以实现各种扩展功能,可以将日志输出到elasticsearch和active...

logrus是go的一个日志框架,它最让人激动的应该是hook机制,可以在初始化时为logrus添加hook,logrus可以实现各种扩展功能,可以将日志输出到elasticsearch和activemq等中间件去,甚至可以输出到你的email和叮叮中去,不要问为为什么可以发现可以输入到叮叮中去,都是泪,手动笑哭!

言归正传,这里就简单的通过hook机制将文件输出到本地磁盘。

首先

go get github.com/sirupsen/logrus

然后

logrus和go lib里面一样有6个等级,可以直接调用

logrus.debug("useful debugging information.")
logrus.info("something noteworthy happened!")
logrus.warn("you should probably take a look at this.")
logrus.error("something failed but i'm not quitting.")
logrus.fatal("bye.")  //log之后会调用os.exit(1)
logrus.panic("i'm bailing.")  //log之后会panic()

项目例子结构

logrus hook输出日志到本地磁盘的操作

main.go

package main

import (
 "fmt"
 "github.com/sirupsen/logrus"
 "logt/logs"
)
func main() {
  //创建一个hook,将日志存储路径输入进去
 hook := logs.newhook("d:/log/golog.log")
 //加载hook之前打印日志
 logrus.withfield("file", "d:/log/golog.log").info("new logrus hook err.")
 logrus.addhook(hook)
 //加载hook之后打印日志
 logrus.withfields(logrus.fields{
 "animal": "walrus",
 }).info("a walrus appears")
}

hook.go

不要看下面三个go文件代码很长,其实大多数都是固定代码,也就newhook函数自己扩展定义就好

package logs

import (
 "fmt"
 "github.com/sirupsen/logrus"
 "os"
 "strings"
)

// hook 写文件的logrus hook
type hook struct {
 w loggerinterface
}

func newhook(file string) (f *hook) {
 w := newfilewriter()
 config := fmt.sprintf(`{"filename":"%s","maxdays":7}`, file)
 err := w.init(config)
 if err != nil {
 return nil
 }

 return &hook{w}
}

// fire 实现hook的fire接口
func (hook *hook) fire(entry *logrus.entry) (err error) {
 message, err := getmessage(entry)
 if err != nil {
 fmt.fprintf(os.stderr, "unable to read entry, %v", err)
 return err
 }
 switch entry.level {
 case logrus.paniclevel:
 fallthrough
 case logrus.fatallevel:
 fallthrough
 case logrus.errorlevel:
 return hook.w.writemsg(fmt.sprintf("[error] %s", message), levelerror)
 case logrus.warnlevel:
 return hook.w.writemsg(fmt.sprintf("[warn] %s", message), levelwarn)
 case logrus.infolevel:
 return hook.w.writemsg(fmt.sprintf("[info] %s", message), levelinfo)
 case logrus.debuglevel:
 return hook.w.writemsg(fmt.sprintf("[debug] %s", message), leveldebug)
 default:
 return nil
 }
}

// levels 实现hook的levels接口
func (hook *hook) levels() []logrus.level {
 return []logrus.level{
 logrus.paniclevel,
 logrus.fatallevel,
 logrus.errorlevel,
 logrus.warnlevel,
 logrus.infolevel,
 logrus.debuglevel,
 }
}

func getmessage(entry *logrus.entry) (message string, err error) {
 message = message + fmt.sprintf("%s ", entry.message)
 file, linenumber := getcallerignoringlogmulti(2)
 if file != "" {
 sep := fmt.sprintf("%s/src/", os.getenv("gopath"))
 filename := strings.split(file, sep)
 if len(filename) >= 2 {
  file = filename[1]
 }
 }
 message = fmt.sprintf("%s:%d ", file, linenumber) + message

 for k, v := range entry.data {
 message = message + fmt.sprintf("%v:%v ", k, v)
 }
 return
}

caller.go

package logs

import (
 "runtime"
 "strings"
)

func getcaller(calldepth int, suffixestoignore ...string) (file string, line int) {
 // bump by 1 to ignore the getcaller (this) stackframe
 calldepth++
outer:
 for {
 var ok bool
 _, file, line, ok = runtime.caller(calldepth)
 if !ok {
  file = "???"
  line = 0
  break
 }

 for _, s := range suffixestoignore {
  if strings.hassuffix(file, s) {
  calldepth++
  continue outer
  }
 }
 break
 }
 return
}

// getcallerignoringlogmulti todo
func getcallerignoringlogmulti(calldepth int) (string, int) {
 // the +1 is to ignore this (getcallerignoringlogmulti) frame
 return getcaller(calldepth+1, "logrus/hooks.go", "logrus/entry.go", "logrus/logger.go", "logrus/exported.go", "asm_amd64.s")
}

file.go

package logs

import (
 "encoding/json"
 "errors"
 "fmt"
 "io/ioutil"
 "log"
 "os"
 "path/filepath"
 "strings"
 "sync"
 "time"
)

// rfc5424 log message levels.
const (
 levelerror = iota
 levelwarn
 levelinfo
 leveldebug
)

// loggerinterface logger接口
type loggerinterface interface {
 init(config string) error
 writemsg(msg string, level int) error
 destroy()
 flush()
}

// logwriter implements loggerinterface.
// it writes messages by lines limit, file size limit, or time frequency.
type logwriter struct {
 *log.logger
 mw *muxwriter
 // the opened file
 filename string `json:"filename"`

 maxlines     int `json:"maxlines"`
 maxlinescurlines int

 // rotate at size
 maxsize    int `json:"maxsize"`
 maxsizecursize int

 // rotate daily
 daily     bool `json:"daily"`
 maxdays    int64 `json:"maxdays"`
 dailyopendate int

 rotate bool `json:"rotate"`

 startlock sync.mutex // only one log can write to the file

 level int `json:"level"`
}

// muxwriter an *os.file writer with locker.
type muxwriter struct {
 sync.mutex
 fd *os.file
}

// write to os.file.
func (l *muxwriter) write(b []byte) (int, error) {
 l.lock()
 defer l.unlock()
 return l.fd.write(b)
}

// setfd set os.file in writer.
func (l *muxwriter) setfd(fd *os.file) {
 if l.fd != nil {
 _ = l.fd.close()
 }
 l.fd = fd
}

// newfilewriter create a filelogwriter returning as loggerinterface.
func newfilewriter() loggerinterface {
 w := &logwriter{
 filename: "",
 maxlines: 1000000,
 maxsize: 1 << 28, //256 mb
 daily:  true,
 maxdays: 7,
 rotate:  true,
 level:  leveldebug,
 }
 // use muxwriter instead direct use os.file for lock write when rotate
 w.mw = new(muxwriter)
 // set muxwriter as logger's io.writer
 w.logger = log.new(w.mw, "", log.ldate|log.ltime)
 return w
}

// init file logger with json config.
// jsonconfig like:
// {
// "filename":"logs/sample.log",
// "maxlines":10000,
// "maxsize":1<<30,
// "daily":true,
// "maxdays":15,
// "rotate":true
// }
func (w *logwriter) init(jsonconfig string) error {
 err := json.unmarshal([]byte(jsonconfig), w)
 if err != nil {
 return err
 }
 if len(w.filename) == 0 {
 return errors.new("jsonconfig must have filename")
 }
 err = w.startlogger()
 return err
}

// start file logger. create log file and set to locker-inside file writer.
func (w *logwriter) startlogger() error {
 fd, err := w.createlogfile()
 if err != nil {
 return err
 }
 w.mw.setfd(fd)
 err = w.initfd()
 if err != nil {
 return err
 }
 return nil
}

func (w *logwriter) docheck(size int) {
 w.startlock.lock()
 defer w.startlock.unlock()
 if w.rotate && ((w.maxlines > 0 && w.maxlinescurlines >= w.maxlines) ||
 (w.maxsize > 0 && w.maxsizecursize >= w.maxsize) ||
 (w.daily && time.now().day() != w.dailyopendate)) {
 if err := w.dorotate(); err != nil {
  fmt.fprintf(os.stderr, "filelogwriter(%q): %s\n", w.filename, err)
  return
 }
 }
 w.maxlinescurlines++
 w.maxsizecursize += size
}

// writemsg write logger message into file.
func (w *logwriter) writemsg(msg string, level int) error {
 if level > w.level {
 return nil
 }
 n := 24 + len(msg) // 24 stand for the length "2013/06/23 21:00:22 [t] "
 w.docheck(n)
 w.logger.print(msg)
 return nil
}

func (w *logwriter) createlogfile() (*os.file, error) {
 // open the log file
 fd, err := os.openfile(w.filename, os.o_wronly|os.o_append|os.o_create, 0660)
 return fd, err
}

func (w *logwriter) initfd() error {
 fd := w.mw.fd
 finfo, err := fd.stat()
 if err != nil {
 return fmt.errorf("get stat err: %s", err)
 }
 w.maxsizecursize = int(finfo.size())
 w.dailyopendate = time.now().day()
 if finfo.size() > 0 {
 content, err := ioutil.readfile(w.filename)
 if err != nil {
  return err
 }
 w.maxlinescurlines = len(strings.split(string(content), "\n"))
 } else {
 w.maxlinescurlines = 0
 }
 return nil
}

// dorotate means it need to write file in new file.
// new file name like xx.log.2013-01-01.2
func (w *logwriter) dorotate() error {
 _, err := os.lstat(w.filename)
 if err == nil { // file exists
 // find the next available number
 num := 1
 fname := ""
 for ; err == nil && num <= 999; num++ {
  fname = w.filename + fmt.sprintf(".%s.%03d", time.now().format("2006-01-02"), num)
  _, err = os.lstat(fname)
 }
 // return error if the last file checked still existed
 if err == nil {
  return fmt.errorf("rotate: cannot find free log number to rename %s", w.filename)
 }

 // block logger's io.writer
 w.mw.lock()
 defer w.mw.unlock()

 fd := w.mw.fd
 _ = fd.close()

 // close fd before rename
 // rename the file to its newfound home
 err = os.rename(w.filename, fname)
 if err != nil {
  return fmt.errorf("rotate: %s", err)
 }

 // re-start logger
 err = w.startlogger()
 if err != nil {
  return fmt.errorf("rotate startlogger: %s", err)
 }

 go w.deleteoldlog()
 }

 return nil
}

func (w *logwriter) deleteoldlog() {
 dir := filepath.dir(w.filename)
 _ = filepath.walk(dir, func(path string, info os.fileinfo, err error) (returnerr error) {
 defer func() {
  if r := recover(); r != nil {
  returnerr = fmt.errorf("unable to delete old log '%s', error: %+v", path, r)
  fmt.println(returnerr)
  }
 }()

 if !info.isdir() && info.modtime().unix() < (time.now().unix()-60*60*24*w.maxdays) {
  if strings.hasprefix(filepath.base(path), filepath.base(w.filename)) {
  _ = os.remove(path)
  }
 }
 return
 })
}

// destroy destroy file logger, close file writer.
func (w *logwriter) destroy() {
 _ = w.mw.fd.close()
}

// flush file logger.
// there are no buffering messages in file logger in memory.
// flush file means sync file from disk.
func (w *logwriter) flush() {
 _ = w.mw.fd.sync()
}

补充知识:golang logrus自定义hook:日志切片hook、邮件警报hook、kafkahook

logrus hook 分析

logrus hook 接口定义很简单。如下

package logrus

// a hook to be fired when logging on the logging levels returned from
// `levels()` on your implementation of the interface. note that this is not
// fired in a goroutine or a channel with workers, you should handle such
// functionality yourself if your call is non-blocking and you don't wish for
// the logging calls for levels returned from `levels()` to block.
type hook interface {
 levels() []level
 fire(*entry) error
}

// internal type for storing the hooks on a logger instance.
type levelhooks map[level][]hook

// add a hook to an instance of logger. this is called with
// `log.hooks.add(new(myhook))` where `myhook` implements the `hook` interface.
func (hooks levelhooks) add(hook hook) {
 for _, level := range hook.levels() {
 hooks[level] = append(hooks[level], hook)
 }
}

// fire all the hooks for the passed level. used by `entry.log` to fire
// appropriate hooks for a log entry.
func (hooks levelhooks) fire(level level, entry *entry) error {
 for _, hook := range hooks[level] {
 if err := hook.fire(entry); err != nil {
  return err
 }
 }
 return nil
}

只需实现 该结构的接口。

type hook interface {
 levels() []level
 fire(*entry) error
}

就会被logrus框架遍历调用已注册的 hook 的 fire 方法

获取日志实例

// log_hook.go
package logger

import (
 "fmt"
 "github.com/sirupsen/logrus"
 "library/util/constant"
 "os"
)

//自实现 logrus hook
func getlogger(module string) *logrus.logger {
 //实例化
 logger := logrus.new()
 //设置输出
 logger.out = os.stdout
 //设置日志级别
 logger.setlevel(logrus.debuglevel)
 //设置日志格式
 //自定writer就行, hook 交给 lfshook
 logger.addhook(newlogrushook(constant.getlogpath(), module))
 
 logger.setformatter(&logrus.jsonformatter{
 timestampformat:"2006-01-02 15:04:05",
 })
 return logger
}

//确保每次调用使用的文件都是唯一的。
func getnewfieldloggercontext(module,appfield string) *logrus.entry {
 logger:= getlogger(module)
 return logger.withfields(logrus.fields{
 "app": appfield,
 })
}

//订阅 警告日志
func subscribelog(entry *logrus.entry, submap subscribemap) {
 logger := entry.logger
 logger.addhook(newsubscribehook(submap))
 fmt.println("日志订阅成功")
}

constant.getlogpath() 可以替换为自己的日志文件输出目录地址,比如我的mac上则是:/usr/local/log ,直接替换即可。

日志切片hook

代码

// writer.go
package logger

import (
 "fmt"
 "github.com/pkg/errors"
 "io"
 "library/util"
 "os"
 "path/filepath"
 "sync"
 "time"
)

type logwriter struct {
 logdir       string //日志根目录地址。
 module       string //模块 名
  curfilename   string //当前被指定的filename
 curbasefilename   string //在使用中的file
 turncateduration  time.duration
 mutex      sync.rwmutex
 outfh      *os.file
}

func (w *logwriter) write(p []byte) (n int, err error) {
 w.mutex.lock()
 defer w.mutex.unlock()
 if out, err:= w.getwriter(); err!=nil {
 return 0, errors.new("failed to fetch target io.writer")
 }else{
 return out.write(p)
 }
}

func (w *logwriter) getfilename() string {
 base := time.now().truncate(w.turncateduration)
 return fmt.sprintf("%s/%s/%s_%s", w.logdir, base.format("2006-01-02"), w.module, base.format("15"))
}

func (w *logwriter) getwriter()(io.writer, error) {
 filename := w.curbasefilename
 //判断是否有新的文件名
 //会出现新的文件名
 basefilename := w.getfilename()
 if basefilename != filename {
 filename = basefilename
 }

 dirname := filepath.dir(filename)
 if err := os.mkdirall(dirname, 0755); err != nil {
 return nil, errors.wrapf(err, "failed to create directory %s", dirname)
 }

 filehandler, err := os.openfile(filename, os.o_create|os.o_append|os.o_wronly, 0644)
 if err != nil {
 return nil, errors.errorf("failed to open file %s", err)
 }
 w.outfh.close()
 w.outfh = filehandler
 w.curbasefilename = filename
 w.curfilename = filename

 return filehandler, nil
}

func new(logpath, module string, duration time.duration) *logwriter {
 return &logwriter{
 logdir: logpath,
 module: module,
 turncateduration:duration,
 curfilename: "",
 curbasefilename: "",
 }
}
// hook.go
package logger

import (
 "github.com/rifflock/lfshook"
 "github.com/sirupsen/logrus"
 "time"
)
func newlogrushook(logpath, moduel string) logrus.hook {
 logrus.setlevel(logrus.warnlevel)

 writer := new(logpath, moduel, time.hour * 2)

 lfshook := lfshook.newhook(lfshook.writermap{
 logrus.debuglevel: writer,
 logrus.infolevel: writer,
 logrus.warnlevel: writer,
 logrus.errorlevel: writer,
 logrus.fatallevel: writer,
 logrus.paniclevel: writer,
 }, &logrus.textformatter{disablecolors: true})

 // writer 生成新的log文件类型 writer 在通过new hook函数 消费 fire 函数
 // writer 是实现了writer 接口的库,在日志调用write是做预处理
 return lfshook
}

测试代码

func testgetlogger(t *testing.t) {
 lg := getnewfieldloggercontext("test","d")
 lg.logger.info("????")
}

解析

logger实例持有了 自定义的 io.writer 结构体,在消费fire函数时,会调用write方法,此时通过truncate时间切片函数逻辑判断需要写入的文件。或创建新的文件。

注: 文章提供的代码是按天切分文件夹的,文件夹内模块日志再按2小时切分。可自行替换成按模块切分。

邮件警报hook

代码

// subscribehook.go
package logger

import (
 "fmt"
 "github.com/sirupsen/logrus"
 "library/email"
 "strings"
)

type subscribemap map[logrus.level][]*email.receiver
type subscribehook struct {
 submap subscribemap
}
//此处可以自实现hook 目前使用三方hook
func(h *subscribehook)levels() []logrus.level{
 return logrus.alllevels
}

func(h *subscribehook)fire(entry *logrus.entry) error{
 for level, receivers := range h.submap {
 //命中 准备消费
 if level == entry.level {
  if len(receivers) > 0 {
  email.sendemail(receivers, fmt.sprintf("%s:[系统日志警报]", entry.level.string()),
   fmt.sprintf("错误内容: %s",entry.message))
  }
 }
 }
 return nil
}
func newsubscribemap(level logrus.level, receiverstr string) subscribemap{
 submap := subscribemap{}
 addresslist := strings.split(receiverstr,";")
 var receivers []*email.receiver
 for _, address := range addresslist {
 receivers = append(receivers, &email.receiver{email: address})
 }
 submap[level] = receivers
 return submap
}
func newsubscribehook(submap subscribemap) *subscribehook {
 return &subscribehook{submap}
// email.go
package email

import (
 "fmt"
 "gopkg.in/gomail.v2"
 "regexp"
 "strconv"
)

type sender struct {
 user   string
 password string
 host   string
 port   int
 mailto  []string
 subject  string
 content  string
}

type receiver struct {
 email  string
}

func (r *receiver) check() bool {
 pattern := `\w+([-+.]\w+)*@\w+([-.]\w+)*\.\w+([-.]\w+)*` //匹配电子邮箱
 reg := regexp.mustcompile(pattern)
 return reg.matchstring(r.email)
}

func (s *sender) clean (){

}

//检查 邮箱正确性
func (s *sender)newreceiver(email string) *receiver {
 rec := &receiver{email:email}
 if rec.check() {
 m.mailto = []string{email}
 return rec
 }else{
 fmt.printf("email check fail 【%s】\n", email)
 return nil
 }
}
func (s *sender)newreceivers(receivers []*receiver) {
 for _, rec := range receivers {
 if rec.check() {
  m.mailto = append(m.mailto, rec.email)
 }else{
  fmt.printf("email check fail 【%s】\n", rec.email)
 }
 }
}
// 163邮箱 password 为开启smtp后给的秘钥
var m = sender{user:"6666666@163.com", password:"666666666", host: "smtp.163.com", port: 465}

func sendemail(receivers []*receiver,subject, content string){
 m.newreceivers(receivers)
 m.subject = subject
 m.content = content

 e := gomail.newmessage()
 e.setheader("from", e.formataddress(m.user, "hengsheng"))
 e.setheader("to", m.mailto...)  //发送给多个用户
 e.setheader("subject", m.subject) //设置邮件主题
 e.setbody("text/html", m.content)  //设置邮件正文
 d := gomail.newdialer(m.host, m.port, m.user, m.password)
 err := d.dialandsend(e)
 if err != nil {
 fmt.printf("error 邮件发送错误! %s \n", err.error())
 }
}

使用

同理在writer时 如果是错误日志则发送邮件。

o.logger = logger.getnewfieldloggercontext("test", "666")
if subscribesocket {
 logger.subscribelog(o.logger, logger.newsubscribemap(logrus.errorlevel, "a@163.com;b@163.com"))
 }
 // o 为实际结构体实例

kafkahook

// kafka hook
package logger

import (
 "github.com/sirupsen/logrus"
 "library/kafka"
 "library/util/constant"
)

type kafkahook struct {
 kafkaproducer  *kafka.kafkaproducer
}


func(h *kafkahook)levels() []logrus.level{
 return logrus.alllevels
}

func(h *kafkahook)fire(entry *logrus.entry) error{
 h.kafkaproducer.sendmsgsync(entry.message)
 return nil
}

func newkafkahook() *kafkahook{
 producer := kafka.newkafkaproducer(constant.kafkalogelktopic,true)
 return &kafkahook{kafkaproducer: producer}
}

使用时logger.addhook(newkafkahook()) 即可

kafka模块

生产者

// kafkaproducer.go
package kafka

import (
 "errors"
 "fmt"
 "github.com/shopify/sarama"
 "library/util/constant"
 "log"
 "time"
)

func getkafkaaddress()[]string{
 return "127.0.0.1:9092"
}

//同步消息模式
func syncproducer(topic, message string) error {
 config := sarama.newconfig()
 config.producer.return.successes = true
 config.producer.timeout = 5 * time.second
 p, err := sarama.newsyncproducer(getkafkaaddress(), config)
 if err != nil {
 return errors.new(fmt.sprintf("sarama.newsyncproducer err, message=%s \n", err))
 }
 defer p.close()
 msg := &sarama.producermessage{
 topic: topic,
 value: sarama.byteencoder(message),
 }
 part, offset, err := p.sendmessage(msg)
 if err != nil {
 return errors.new(fmt.sprintf("send sdsds err=%s \n", err))
 } else {
 fmt.printf("发送成功,partition=%d, offset=%d \n", part, offset)
 return nil
 }
}

//async 异步生产者
type kafkaproducer struct {
 topic    string
 asyncproducer  *sarama.asyncproducer
 syncproducer  *sarama.syncproducer
 sync    bool
}

func newkafkaproducer(topic string, sync bool) *kafkaproducer {
 k := &kafkaproducer{
 topic:   topic,
 sync:   sync,
 }
 if sync {
 k.initsync()
 }else{
 k.initasync()
 }
 return k
}

func (k *kafkaproducer) initasync() bool {
 if k.sync {
 fmt.printf("sync producer cant call async func !\n")
 return false
 }
 config := sarama.newconfig()
 //等待服务器所有副本都保存成功后的响应
 config.producer.requiredacks = sarama.waitforall
 //随机向partition发送消息
 config.producer.partitioner = sarama.newrandompartitioner
 //是否等待成功和失败后的响应,只有上面的requireacks设置不是noreponse这里才有用.
 config.producer.return.successes = true
 config.producer.return.errors = true
 //设置使用的kafka版本,如果低于v0_10_0_0版本,消息中的timestrap没有作用.需要消费和生产同时配置
 //注意,版本设置不对的话,kafka会返回很奇怪的错误,并且无法成功发送消息
 config.version = sarama.v0_10_0_1

 producer, e := sarama.newasyncproducer(getkafkaaddress(), config)
 if e != nil {
 fmt.println(e)
 return false
 }
 k.asyncproducer = &producer
 defer producer.asyncclose()
 pd := *k.asyncproducer
 go func() {
 for{
  select {
  case <-pd.successes():
  //fmt.println("offset: ", suc.offset, "timestamp: ", suc.timestamp.string(), "partitions: ", suc.partition)
  case fail := <-pd.errors():
  fmt.printf("err: %s \n", fail.err.error())
  }
 }
 }()

 return true
}

func (k *kafkaproducer) initsync() bool {
 if !k.sync {
 fmt.println("async producer cant call sync func !")
 return false
 }

 config := sarama.newconfig()
 config.producer.return.successes = true
 config.producer.timeout = 5 * time.second
 p, err := sarama.newsyncproducer(getkafkaaddress(), config)
 k.syncproducer = &p
 if err != nil {
 log.printf("sarama.newsyncproducer err, message=%s \n", err)
 return false
 }
 return true
}

func (k *kafkaproducer) sendmsgasync(sendstr string) {

 msg := &sarama.producermessage{
 topic: k.topic,
 }

 //将字符串转化为字节数组
 msg.value = sarama.byteencoder(sendstr)
 //fmt.println(value)

 //使用通道发送
 pd := *k.asyncproducer
 pd.input() <- msg
}

func (k *kafkaproducer) sendmsgsync(sendstr string) bool {
 msg := &sarama.producermessage{
 topic: k.topic,
 value: sarama.byteencoder(sendstr),
 }
 pd := *k.syncproducer
 part, offset, err := pd.sendmessage(msg)
 if err != nil {
 fmt.printf("发送失败 send message(%s) err=%s \n", sendstr, err)
 return false
 } else {
 fmt.printf("发送成功 partition=%d, offset=%d \n", part, offset)
 return true
 }
}

调用 sendmsgsync 或 sendmsgasync 生产消息,注意初始化时的参数要保证一致!

消费者组

// kafkaconsumergroup.go

package kafka

import (
 "context"
 "fmt"
 "github.com/shopify/sarama"
 "log"
 "sync"
)

func newkafkaconsumergroup(topics []string, group string, businesscall func(message *sarama.consumermessage) bool) *kafkaconsumergroup {
 k := &kafkaconsumergroup{
 brokers:  getkafkaaddress(),
 topics:  topics,
 group:       group,
 channelbuffersize: 2,
 ready:       make(chan bool),
 version:  "1.1.1",
 handler:  businesscall,
 }
 k.init()
 return k
}

// 消费者组(consumer group): 相同的group.id的消费者将视为同一个消费者组,
// 每个消费者都需要设置一个组id, 每条消息只能被 consumer group 中的一个
// consumer 消费,但可以被多个 consumer group 消费
type kafkaconsumergroup struct {
 //代理(broker): 一台kafka服务器称之为一个broker
 brokers   []string
 //主题(topic): 消息的一种逻辑分组,用于对消息分门别类,每一类消息称之为一个主题,相同主题的消息放在一个队列中
 topics    []string
 version   string
 ready       chan bool
 group       string
 channelbuffersize  int
 //业务调用
 handler     func(message *sarama.consumermessage) bool
}

func (k *kafkaconsumergroup)init() func() {

 version,err := sarama.parsekafkaversion(k.version)
 if err!=nil{
 fmt.printf("error parsing kafka version: %v", err)
 }
 cfg := sarama.newconfig()
 cfg.version = version
 // 分区分配策略
 cfg.consumer.group.rebalance.strategy = sarama.balancestrategyrange
 // 未找到组消费位移的时候从哪边开始消费
 cfg.consumer.offsets.initial = -2
 // channel长度
 cfg.channelbuffersize = k.channelbuffersize
 ctx, cancel := context.withcancel(context.background())
 client, err := sarama.newconsumergroup(k.brokers, k.group, cfg)
 if err != nil {
  fmt.printf("error creating consumer group client: %v", err)
 }

 wg := &sync.waitgroup{}
 wg.add(1)
 go func() {
  defer func() {
  wg.done()
  //util.handlepanic("client.consume panic", log.standardlogger())
  }()
  for {
  if err := client.consume(ctx, k.topics, k); err != nil {
   log.printf("error from consumer: %v", err)
  }
  // check if context was cancelled, signaling that the consumer should stop
  if ctx.err() != nil {
   log.println(ctx.err())
   return
  }
  k.ready = make(chan bool)
  }
 }()

 <-k.ready
 fmt.printf("sarama consumer up and running!... \n")
 // 保证在系统退出时,通道里面的消息被消费
 return func() {
  cancel()
  wg.wait()
  if err = client.close(); err != nil {
  fmt.printf("error closing client: %v \n", err)
  }
 }

}

// setup is run at the beginning of a new session, before consumeclaim
func (k *kafkaconsumergroup) setup(sarama.consumergroupsession) error {
 // mark the consumer as ready
 close(k.ready)
 return nil
}

// cleanup is run at the end of a session, once all consumeclaim goroutines have exited
func (k *kafkaconsumergroup) cleanup(sarama.consumergroupsession) error {
 return nil
}

// consumeclaim must start a consumer loop of consumergroupclaim's messages().
func (k *kafkaconsumergroup) consumeclaim(session sarama.consumergroupsession, claim sarama.consumergroupclaim) error {

 // note:
 // do not move the code below to a goroutine.
 // the `consumeclaim` itself is called within a goroutine, see:
 // https://github.com/shopify/sarama/blob/master/consumer_group.go#l27-l29
 // 具体消费消息
 for message := range claim.messages() {
 //msg := string(message.value)
 //k.logger.infof("卡夫卡: %s", msg)

 if ok:= k.handler(message); ok {
  // 更新位移
  session.markmessage(message, "")
 }
 //run.run(msg)
 }
 return nil
}

测试代码

func testkafkaconsumergroup_init(t *testing.t) {
 //pd := newkafkaproducer("test-fail",true)
 //pd.initsync()
 k := newkafkaconsumergroup([]string{constant.kafkaalisdktopic}, "group-2", func(message *sarama.consumermessage) bool {
 fmt.println(string(message.value))
 //如果失败的处理逻辑
 //if ok := pd.sendmsgsync("666666"); ok {
 // return true
 //}
 return false

 })
 consumerdone := k.init()

 sigterm := make(chan os.signal, 1)
 signal.notify(sigterm, syscall.sigint, syscall.sigterm)
 select {
 case <-sigterm:
 fmt.println("terminating: via signal")
 }
 consumerdone()
}

这里有一些补偿逻辑在里面。

以上就是logrus相关hook。

好了,这篇logrus hook输出日志到本地磁盘的操作就是小编分享给大家的全部内容了,希望能给大家一个参考,也希望大家多多支持。