How to access Kafka from Go using sarama

程序员文章站 2023-10-11 12:08:52

The example client below connects to a Kafka server to send and receive messages.

Usage

1. Command-line arguments

$ ./kafkaclient -h
Usage of ./client:
 -ca string
  ca certificate (default "ca.pem")
 -cert string
  client certificate (default "cert.pem")
 -command string
  consumer|producer (default "consumer")
 -host string
  common separated kafka hosts (default "localhost:9093")
 -key string
  client key (default "key.pem")
 -partition int
  kafka topic partition
 -tls
  tls enable
 -topic string
  kafka topic (default "test--topic")
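The -host flag takes several broker addresses in one comma-separated string, which the client later splits with strings.Split. The sketch below is a slightly more forgiving variant that also trims spaces and drops empty entries; the helper name splitHosts is hypothetical and not part of the original client.

```go
package main

import (
	"fmt"
	"strings"
)

// splitHosts turns a comma-separated -host value into a list of broker
// addresses, trimming surrounding spaces and skipping empty entries.
func splitHosts(s string) []string {
	var hosts []string
	for _, h := range strings.Split(s, ",") {
		if h = strings.TrimSpace(h); h != "" {
			hosts = append(hosts, h)
		}
	}
	return hosts
}

func main() {
	// A trailing comma and a stray space are tolerated.
	fmt.Println(splitHosts("kafka1:9092, kafka2:9092,"))
}
```

Its result could be passed to sarama.NewClient in place of the plain strings.Split call, so that a trailing comma does not produce an empty broker address.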

2. Starting as a producer

$ ./kafkaclient -command producer \
 -host kafka1:9092,kafka2:9092

## tls-enabled
$ ./kafkaclient -command producer \
 -tls -cert client.pem -key client.key -ca ca.pem \
 -host kafka1:9093,kafka2:9093

The producer sends messages to Kafka:

> aaa
2018/12/15 07:11:21 produced message: [aaa]
> bbb
2018/12/15 07:11:30 produced message: [bbb]
> quit

3. Starting as a consumer

$ ./kafkaclient -command consumer \
 -host kafka1:9092,kafka2:9092

## tls-enabled
$ ./kafkaclient -command consumer \
 -tls -cert client.pem -key client.key -ca ca.pem \
 -host kafka1:9093,kafka2:9093

The consumer receives messages from Kafka:

2018/12/15 07:11:21 consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 consumed message: [bbb], offset: [5]
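The offsets 4 and 5 show that the partition already held earlier messages: the consumer in this article starts at sarama.OffsetNewest, so it only sees messages produced after it starts. As a rough sketch, a starting position could be made selectable through a hypothetical -offset flag (not part of the original client). The two constants below mirror the values of sarama.OffsetNewest and sarama.OffsetOldest so that the example runs without the library.

```go
package main

import (
	"fmt"
	"strconv"
)

// Local copies of sarama.OffsetNewest and sarama.OffsetOldest, declared
// here only so the sketch has no external dependency.
const (
	offsetNewest int64 = -1
	offsetOldest int64 = -2
)

// parseOffset maps the value of a hypothetical -offset flag to a starting
// offset: "newest", "oldest", or a non-negative absolute offset.
func parseOffset(s string) (int64, error) {
	switch s {
	case "newest":
		return offsetNewest, nil
	case "oldest":
		return offsetOldest, nil
	}
	n, err := strconv.ParseInt(s, 10, 64)
	if err != nil || n < 0 {
		return 0, fmt.Errorf("invalid offset %q", s)
	}
	return n, nil
}

func main() {
	for _, s := range []string{"newest", "oldest", "4", "-7"} {
		off, err := parseOffset(s)
		fmt.Println(s, off, err)
	}
}
```

The returned value would take the place of sarama.OffsetNewest in the ConsumePartition call.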

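Complete source code

This code uses the Shopify/sarama library, which you need to install yourself before building (for example with go get github.com/Shopify/sarama).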

$ cat kafkaclient.go
package main

import (
	"bufio"
	"crypto/tls"
	"crypto/x509"
	"flag"
	"fmt"
	"io/ioutil"
	"log"
	"os"
	"strings"

	"github.com/Shopify/sarama"
)

var (
	command    string // "consumer" or "producer"
	tlsenable  bool
	hosts      string // comma-separated broker addresses
	topic      string
	partition  int
	clientcert string
	clientkey  string
	cacert     string
)

func main() {
	flag.StringVar(&command, "command", "consumer", "consumer|producer")
	flag.BoolVar(&tlsenable, "tls", false, "tls enable")
	flag.StringVar(&hosts, "host", "localhost:9093", "common separated kafka hosts")
	flag.StringVar(&topic, "topic", "test--topic", "kafka topic")
	flag.IntVar(&partition, "partition", 0, "kafka topic partition")
	flag.StringVar(&clientcert, "cert", "cert.pem", "client certificate")
	flag.StringVar(&clientkey, "key", "key.pem", "client key")
	flag.StringVar(&cacert, "ca", "ca.pem", "ca certificate")
	flag.Parse()

	config := sarama.NewConfig()
	if tlsenable {
		// Uncomment to see sarama's internal logging while debugging TLS:
		//sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags)
		tlsconfig, err := gentlsconfig(clientcert, clientkey, cacert)
		if err != nil {
			log.Fatal(err)
		}

		config.Net.TLS.Enable = true
		config.Net.TLS.Config = tlsconfig
	}
	client, err := sarama.NewClient(strings.Split(hosts, ","), config)
	if err != nil {
		log.Fatalf("unable to create kafka client: %v", err)
	}
	// A consumer or producer built from a client does not close the client itself.
	defer client.Close()

	switch command {
	case "consumer":
		consumer, err := sarama.NewConsumerFromClient(client)
		if err != nil {
			log.Fatal(err)
		}
		defer consumer.Close()
		loopconsumer(consumer, topic, partition)
	case "producer":
		producer, err := sarama.NewAsyncProducerFromClient(client)
		if err != nil {
			log.Fatal(err)
		}
		defer producer.Close()
		loopproducer(producer, topic, partition)
	default:
		log.Fatalf("unknown command %q, expected consumer or producer", command)
	}
}

func gentlsconfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.Config, error) {
	// Load the client certificate and its private key.
	clientcert, err := tls.LoadX509KeyPair(clientcertfile, clientkeyfile)
	if err != nil {
		return nil, err
	}

	// Load the CA certificate into the pool used to verify the broker.
	cacert, err := ioutil.ReadFile(cacertfile)
	if err != nil {
		return nil, err
	}
	cacertpool := x509.NewCertPool()
	if !cacertpool.AppendCertsFromPEM(cacert) {
		return nil, fmt.Errorf("no valid certificate found in %s", cacertfile)
	}

	// Build the TLS config.
	tlsconfig := tls.Config{}
	tlsconfig.RootCAs = cacertpool
	tlsconfig.Certificates = []tls.Certificate{clientcert}
	// Deprecated since Go 1.14 (certificate selection is automatic); kept from the original.
	tlsconfig.BuildNameToCertificate()
	// tlsconfig.InsecureSkipVerify = true // test servers only: use when the host name does not match the certificate
	return &tlsconfig, nil
}

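// loopproducer reads lines from stdin and queues each one as a message.
// partition is accepted but not used: the producer's configured partitioner
// chooses the partition.
func loopproducer(producer sarama.AsyncProducer, topic string, partition int) {
	// Drain the error channel: with sarama's default config the async
	// producer reports failures here and stalls if nobody reads them.
	go func() {
		for err := range producer.Errors() {
			log.Printf("failed to produce message: %v", err)
		}
	}()

	scanner := bufio.NewScanner(os.Stdin)
	fmt.Print("> ")
	for scanner.Scan() {
		text := scanner.Text()
		if text == "exit" || text == "quit" {
			break
		}
		if text != "" {
			// The message is only queued here; delivery happens asynchronously.
			producer.Input() <- &sarama.ProducerMessage{Topic: topic, Key: nil, Value: sarama.StringEncoder(text)}
			log.Printf("produced message: [%s]\n", text)
		}
		fmt.Print("> ")
	}
}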

func loopconsumer(consumer sarama.Consumer, topic string, partition int) {
	// Start at the newest offset, so only messages produced after startup are shown.
	partitionconsumer, err := consumer.ConsumePartition(topic, int32(partition), sarama.OffsetNewest)
	if err != nil {
		log.Println(err)
		return
	}
	defer partitionconsumer.Close()

	// Ranging over the channel ends cleanly if the partition consumer is closed.
	for msg := range partitionconsumer.Messages() {
		log.Printf("consumed message: [%s], offset: [%d]\n", msg.Value, msg.Offset)
	}
}
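The consumer loop above never returns, so the deferred Close calls do not run when the process is interrupted with Ctrl-C. One possible way to stop cleanly is to select on a second channel that signals shutdown. The sketch below shows only that pattern with plain channels: msgs stands in for partitionconsumer.Messages(), done for a channel closed by a signal handler (for example one built with os/signal), and the name consumeUntil is hypothetical.

```go
package main

import "fmt"

// consumeUntil passes each message from msgs to handle until either msgs
// or done is closed, and returns the number of messages handled.
func consumeUntil(msgs <-chan string, done <-chan struct{}, handle func(string)) int {
	n := 0
	for {
		select {
		case m, ok := <-msgs:
			if !ok {
				return n // message channel closed
			}
			handle(m)
			n++
		case <-done:
			return n // shutdown requested
		}
	}
}

func main() {
	msgs := make(chan string, 2)
	done := make(chan struct{})
	msgs <- "aaa"
	msgs <- "bbb"
	close(msgs)

	n := consumeUntil(msgs, done, func(m string) {
		fmt.Printf("consumed message: [%s]\n", m)
	})
	fmt.Println("handled", n)
}
```

Once the loop returns, the deferred Close calls in the caller run as usual.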

Build:

$ go build kafkaclient.go
