欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

golang连接kafka消费进ES操作

程序员文章站 2022-07-05 19:58:50
1.首先初始化conf配置把kafka和es的地址配置好还有一个日志方便查看配置信息如下 用到的库是github.com/astaxie/beego/config[logs]log_level = d...

1.首先初始化conf配置把kafka和es的地址配置好还有一个日志方便查看

配置信息如下 用到的库是

github.com/astaxie/beego/config
[logs]
log_level = debug
log_path = "./logs/log_transfer.log"
[kafka]
server_addr = 192.168.0.134:9092
topic = nginx_log
[es]
addr = http://192.168.0.134:9200/

2.读取conf配置存取进结构体

type logconfig struct {
  kafkaaddr string
  esaddr string
  logpath string
  loglevel string
  topic string
}
var (
   logconfig *logconfig
)

3.读取conf配置代码如下

func initconfig(conftype string,filename string)(err error) {
   conf, err := config. newconfig(conftype,filename)
   if err != nil {
    fmt. println( "new config faild,err:",err)
     return
  }
   logconfig = &logconfig{}
   logconfig.loglevel = conf. string( "logs::log_level")
   if len(logconfig.loglevel) == 0 {
     logconfig.loglevel = "debug"
  }
   logconfig.logpath = conf. string( "logs::log_path")
   if len(logconfig.logpath) == 0 {
     logconfig.logpath = "./logs"
  }
   logconfig.kafkaaddr = conf. string( "kafka::server_addr")
   if len(logconfig.kafkaaddr) == 0 {
     err = fmt. errorf( "invalid kafka addr err")
     return
  }
   logconfig.esaddr = conf. string( "es::addr")
   if len(logconfig.esaddr) == 0 {
     err = fmt. errorf( "invalid es addr err")
     return
  }
   logconfig.topic = conf. string( "kafka::topic")
   if len(logconfig.topic) == 0 {
     err = fmt. errorf( "invalid topic addr err")
     return
  }
   return
}

4.完成了initconfig的初始化

5.初始化initlogger

func convertloglevel(level string) int {
   switch(level) {
     case "debug":
       return logs.leveldebug
     case "warn":
       return logs.levelwarn
     case "info":
       return logs.levelinfo
     case "trace":
       return logs.leveltrace
  }
   return logs.leveldebug
}
func initlogger(logpath string, loglevel string) (err error) {
   config := make( map[ string] interface{})
  config[ "filename"] = logpath
  config[ "level"] = convertloglevel(loglevel)
   configstr, err := json. marshal(config)
   if err!= nil {
    fmt. println( "marshal failed,err:",err)
     return
  }
  logs. setlogger(logs.adapterfile, string(configstr))
   return
}

6.初始化kafka

type kafkaclient struct {
  client sarama.consumer
  addr string
  topic string
  wg sync.waitgroup
}
var (
   kafkaclient *kafkaclient
)
func initkafka(addr string,topic string)(err error) {
   kafkaclient = &kafkaclient{}
   consumer, err := sarama. newconsumer(strings. split(addr, ","), nil)
   if err != nil {
    logs. error( "failed to strat consumer :",err)
     return 
  }
   kafkaclient.client = consumer
   kafkaclient.addr = addr
   kafkaclient.topic = topic
   return
}

7.初始化es

gopkg.in/olivere/elastic.v2 // 这个是操作es的库

type logmessage struct {
  app string
  topic string
  message string
}
var (
   esclient *elastic.client
)
func inites(addr string)(err error) {
  
   client, err := elastic. newclient(elastic. setsniff( false),elastic. seturl(addr))
   if err != nil {
    fmt. println( "connect es error",err)
     return
  }
   esclient = client
   return
}

8.干活把kafka的数据写入es

github.com/shopify/sarama 这个是操作kafka的驱动库

func run()(err error) {
  fmt. println( "run")
   partitionlist, err := kafkaclient.client. partitions(kafkaclient.topic)
   if err != nil {
    logs. error( "ini failed ,err:%v",err)
    fmt. printf( "ini failed ,err:%v",err)
     return
  }
   for partition := range partitionlist {
    fmt. println( "for进入")
     pc, errret := kafkaclient.client. consumepartition(kafkaclient.topic, int32(partition),sarama.offsetnewest)
     if errret != nil {
       err = errret
      logs. error( "failed to start consumer for partition %d: %s \n ",partition,err)
      fmt. printf( "failed to start consumer for partition %d: %s \n ",partition,err)
       return
    }
     defer pc. asyncclose()
    fmt. println( "马上进入协程")
    kafkaclient.wg. add( 1)
     go func(pc sarama.partitionconsumer){
      fmt. println( "进来了")
      
       for msg := range pc. messages() {
        fmt. println( "func执行")
        logs. debug( "partition:%d,offset:%d,key:%s,value:%s",msg.partition,msg.offset, string(msg.key), string(msg.value))
         //fmt.println()
         err = sendtoes(kafkaclient.topic,msg.value)
         if err != nil {
          logs. warn( "send to es failed,err:%v",err)
        }
      }
      kafkaclient.wg. done()
    }(pc)    
  }
  kafkaclient.wg. wait()
  fmt. println( "协程执行完毕")
   return
}

上面代码是读kafka消费数据通过sendtoes这个函数发送至es里面

sendtoes代码如下

func sendtoes(topic string,data [] byte) (err error) {
   msg := &logmessage{}
   msg.topic = topic
   msg.message = string(data)
   _, err = esclient. index().
       index(topic).
       type(topic).
       bodyjson(msg).
       do()
     if err != nil {
      
       return
    }
   return
}

index就是索引名称

index().type().bodyjson().do()这样的写法是链式执行操作

9.写完了基本操作后 再写一个模拟写入数据进kafka的数据 代码如下

func main() {
config := sarama. newconfig()
config.producer.requiredacks = sarama.waitforall
config.producer.partitioner = sarama.newrandompartitioner
config.producer.return.successes = true
client, err := sarama. newsyncproducer([] string{ "127.0.0.1:9092"}, config)
if err != nil {
fmt. println( "producer close,err:", err)
return
}
defer client. close()
var n int= 0
for {
n++
msg := &sarama.producermessage{}
msg.topic = "nginx_log"
msg.value = sarama. stringencoder( "this is a good test,hello maomaochong!!," + strconv. itoa(n))
pid, offset, err := client. sendmessage(msg)
if err != nil {
fmt. println( "send message failed,", err)
return
}
fmt. printf( "pid:%v offset:%v \n ", pid, offset)
time. sleep(time.second * 2)
}
}

这个就是生产者往kafka里面写入数据进去消费

10.我们启动我们的kafka 注意kafka依赖于zookeeper 先启动zk然后启动kafka

我这里用的是zookeeper-3.4.12网上有下载

启动zk

golang连接kafka消费进ES操作

zk已经成功启动

11.启动kafka 我这里是kafka_2.11-1.1.0

.\bin\windows\kafka-server-start.bat .\config\server.properties

golang连接kafka消费进ES操作

kafka已经跑起来了

12.把kafka消费测试端也启动

.\bin\windows\kafka-console-consumer.bat --topic nginx_log --zookeeper 127.0.0.1 2181

golang连接kafka消费进ES操作

消费端启动成功 一直等待数据进来消费

13.然后我们把es 和kib 都启动了

golang连接kafka消费进ES操作

这是我们的es版本是5.5.1的 已经跑起来了 接着启动我们的kib

kib里面有个配置config下面的叫kibana.yml里面设置好es的地址和端口就处于监听es状态

启动kib有点慢 稍微等一下就好

golang连接kafka消费进ES操作

此时启动好了kib

14.测试kib是否启动

默认地址是http://localhost:5601

golang连接kafka消费进ES操作

进入成功 确定没问题

15.编译我们的代码 写数据进kafka

golang连接kafka消费进ES操作

从上面看我们知道一个再写 一个再消费

16.编译运行我们把kafka写入进es里面的代码

golang连接kafka消费进ES操作

运行了 这里就把kafka消费的数据 写入进es里面

17.我们看一下数据是否有 进入kib

golang连接kafka消费进ES操作

我们看到有数据了 已经成功了~

以上为个人经验,希望能给大家一个参考,也希望大家多多支持。如有错误或未考虑完全的地方,望不吝赐教。