golang如何使用sarama访问kafka
程序员文章站
2023-10-11 12:08:52
下面是一个访问kafka服务器的客户端代码示例,用来发送和接收消息。
使用方式
1、命令行参数
$ ./kafkaclient -h
usage of ./c...
下面是一个访问kafka服务器的客户端代码示例,用来发送和接收消息。
使用方式
1、命令行参数
$ ./kafkaclient -h
Usage of ./client:
  -ca string
        ca certificate (default "ca.pem")
  -cert string
        client certificate (default "cert.pem")
  -command string
        consumer|producer (default "consumer")
  -host string
        common separated kafka hosts (default "localhost:9093")
  -key string
        client key (default "key.pem")
  -partition int
        kafka topic partition
  -tls
        tls enable
  -topic string
        kafka topic (default "test--topic")
2、作为producer启动
$ ./kafkaclient -command producer \
    -host kafka1:9092,kafka2:9092

## tls-enabled
$ ./kafkaclient -command producer \
    -tls -cert client.pem -key client.key -ca ca.pem \
    -host kafka1:9093,kafka2:9093
producer发送消息给kafka:
> aaa
2018/12/15 07:11:21 produced message: [aaa]
> bbb
2018/12/15 07:11:30 produced message: [bbb]
> quit
3、作为consumer启动
$ ./kafkaclient -command consumer \
    -host kafka1:9092,kafka2:9092

## tls-enabled
$ ./kafkaclient -command consumer \
    -tls -cert client.pem -key client.key -ca ca.pem \
    -host kafka1:9093,kafka2:9093
consumer从kafka接收消息:
2018/12/15 07:11:21 consumed message: [aaa], offset: [4]
2018/12/15 07:11:30 consumed message: [bbb], offset: [5]
完整源代码如下
这段代码用到了 Shopify/sarama 库,编译前请先自行下载(例如 go get github.com/Shopify/sarama)。
$ cat kafkaclient.go package main import ( "flag" "fmt" "log" "os" "io/ioutil" "bufio" "strings" "crypto/tls" "crypto/x509" "github.com/shopify/sarama" ) var ( command string tlsenable bool hosts string topic string partition int clientcert string clientkey string cacert string ) func main() { flag.stringvar(&command, "command", "consumer", "consumer|producer") flag.boolvar(&tlsenable, "tls", false, "tls enable") flag.stringvar(&hosts, "host", "localhost:9093", "common separated kafka hosts") flag.stringvar(&topic, "topic", "test--topic", "kafka topic") flag.intvar(&partition, "partition", 0, "kafka topic partition") flag.stringvar(&clientcert, "cert", "cert.pem", "client certificate") flag.stringvar(&clientkey, "key", "key.pem", "client key") flag.stringvar(&cacert, "ca", "ca.pem", "ca certificate") flag.parse() config := sarama.newconfig() if tlsenable { //sarama.logger = log.new(os.stdout, "[sarama] ", log.lstdflags) tlsconfig, err := gentlsconfig(clientcert, clientkey, cacert) if err != nil { log.fatal(err) } config.net.tls.enable = true config.net.tls.config = tlsconfig } client, err := sarama.newclient(strings.split(hosts, ","), config) if err != nil { log.fatalf("unable to create kafka client: %q", err) } if command == "consumer" { consumer, err := sarama.newconsumerfromclient(client) if err != nil { log.fatal(err) } defer consumer.close() loopconsumer(consumer, topic, partition) } else { producer, err := sarama.newasyncproducerfromclient(client) if err != nil { log.fatal(err) } defer producer.close() loopproducer(producer, topic, partition) } } func gentlsconfig(clientcertfile, clientkeyfile, cacertfile string) (*tls.config, error) { // load client cert clientcert, err := tls.loadx509keypair(clientcertfile, clientkeyfile) if err != nil { return nil, err } // load ca cert pool cacert, err := ioutil.readfile(cacertfile) if err != nil { return nil, err } cacertpool := x509.newcertpool() cacertpool.appendcertsfrompem(cacert) // generate tlcconfig tlsconfig := 
tls.config{} tlsconfig.rootcas = cacertpool tlsconfig.certificates = []tls.certificate{clientcert} tlsconfig.buildnametocertificate() // tlsconfig.insecureskipverify = true // this can be used on test server if domain does not match cert: return &tlsconfig, err } func loopproducer(producer sarama.asyncproducer, topic string, partition int) { scanner := bufio.newscanner(os.stdin) fmt.print("> ") for scanner.scan() { text := scanner.text() if text == "" { } else if text == "exit" || text == "quit" { break } else { producer.input() <- &sarama.producermessage{topic: topic, key: nil, value: sarama.stringencoder(text)} log.printf("produced message: [%s]\n",text) } fmt.print("> ") } } func loopconsumer(consumer sarama.consumer, topic string, partition int) { partitionconsumer, err := consumer.consumepartition(topic, int32(partition), sarama.offsetnewest) if err != nil { log.println(err) return } defer partitionconsumer.close() for { msg := <-partitionconsumer.messages() log.printf("consumed message: [%s], offset: [%d]\n", msg.value, msg.offset) } }
编译:
$ go build kafkaclient.go
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持。