c++使用librdkafka库实现kafka的消费实例
程序员文章站
2022-03-24 13:41:18
关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的消费
librdkafka在的基础上封装了...
关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的消费
librdkafka在的基础上封装了一层c++的api,可以实现kafka的消费操作,基本操作步骤如下
1、创建kafka 配置
rdkafka::conf *conf = nullptr; conf = rdkafka::conf::create(rdkafka::conf::conf_global);
2、设置kafka各项参数
/*设置broker list*/ conf->set("bootstrap.servers", brokers_, errstr); /*设置consumer group*/ conf->set("group.id", groupid_, errstr); /*每次从单个分区中拉取消息的最大尺寸*/ conf->set("max.partition.fetch.bytes", strfetch_num, errstr);
3、创建kafka topic配置
rdkafka::conf *tconf = nullptr; tconf = rdkafka::conf::create(rdkafka::conf::conf_topic);
4、设置kafka topic参数
if(tconf->set("auto.offset.reset", "smallest", errstr)
5、创建kafka consumer实例
kafka_consumer_ = rdkafka::consumer::create(conf, errstr);
6、创建kafka topic
rdkafka::topic::create(kafka_consumer_, topics_, tconf, errstr);
7、启动kafka consumer实例
rdkafka::errorcode resp = kafka_consumer_->start(topic_, partition_, offset_);
8、消费kafka
kafka_consumer_->consume(topic_, partition_, timeout_ms);
9、阻塞等待消息
kafka_consumer_->poll(0);
10、停止消费
kafka_consumer_->stop(topic_, partition_);
11、销毁consumer实例
rdkafka::wait_destroyed(5000);
完整代码
my_consumer.h:
#include #include #include #include #include #include #include "../src-cpp/rdkafkacpp.h" class kafka_consumer_client{ public: kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset); //kafka_consumer_client(); virtual ~kafka_consumer_client(); bool initclient(); bool consume(int timeout_ms); void finalize(); private: void consumer(rdkafka::message *msg, void *opt); std::string brokers_; std::string topics_; std::string groupid_; int64_t last_offset_ = 0; rdkafka::consumer *kafka_consumer_ = nullptr; rdkafka::topic *topic_ = nullptr; int64_t offset_ = rdkafka::topic::offset_beginning; int32_t partition_ = 0; };
my_consumer.cpp
#include "my_consumer_cpp.h" bool run_ = true; static void sigterm (int sig) { run_ = false; } kafka_consumer_client::kafka_consumer_client(const std::string& brokers, const std::string& topics, std::string groupid, int64_t offset) :brokers_(brokers), topics_(topics), groupid_(groupid), offset_(offset){ } //kafka_consumer_client::kafka_consumer_client(){} kafka_consumer_client::~kafka_consumer_client(){} bool kafka_consumer_client::initclient(){ rdkafka::conf *conf = nullptr; conf = rdkafka::conf::create(rdkafka::conf::conf_global); if(!conf){ fprintf(stderr, "rdkafka create global conf failed\n"); return false; } std::string errstr; /*设置broker list*/ if (conf->set("bootstrap.servers", brokers_, errstr) != rdkafka::conf::conf_ok){ fprintf(stderr, "rdkafka conf set brokerlist failed : %s\n", errstr.c_str()); } /*设置consumer group*/ if (conf->set("group.id", groupid_, errstr) != rdkafka::conf::conf_ok){ fprintf(stderr, "rdkafka conf set group.id failed : %s\n", errstr.c_str()); } std::string strfetch_num = "10240000"; /*每次从单个分区中拉取消息的最大尺寸*/ if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != rdkafka::conf::conf_ok){ fprintf(stderr, "rdkafka conf set max.partition failed : %s\n", errstr.c_str()); } /*创建kafka consumer实例*/ kafka_consumer_ = rdkafka::consumer::create(conf, errstr); if(!kafka_consumer_){ fprintf(stderr, "failed to ceate consumer\n"); } delete conf; rdkafka::conf *tconf = nullptr; /*创建kafka topic的配置*/ tconf = rdkafka::conf::create(rdkafka::conf::conf_topic); if(!tconf){ fprintf(stderr, "rdkafka create topic conf failed\n"); return false; } /*kafka + zookeeper,当消息被消费时,会想zk提交当前groupid的consumer消费的offset信息, 当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是 consumerconfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset), 有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupid下的消费者, 在zk中没有offset值时(比如新的groupid,或者是zk数据被清空),consumer应该从哪个offset开始 消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的 开始位置消费所有消息.*/ if(tconf->set("auto.offset.reset", "smallest", errstr) != rdkafka::conf::conf_ok){ fprintf(stderr, "rdkafka conf set auto.offset.reset failed : %s\n", errstr.c_str()); } topic_ = rdkafka::topic::create(kafka_consumer_, topics_, tconf, errstr); if(!topic_){ fprintf(stderr, "rdkafka create topic failed : %s\n", errstr.c_str()); } delete tconf; rdkafka::errorcode resp = kafka_consumer_->start(topic_, partition_, offset_); if (resp != rdkafka::err_no_error){ fprintf(stderr, "failed to start consumer : %s\n", rdkafka::err2str(resp).c_str()); } return true; } void kafka_consumer_client::consumer(rdkafka::message *message, void *opt){ switch(message->err()){ case rdkafka::err__timed_out: break; case rdkafka::err_no_error: printf("%.*s\n", static_cast(message->len()), static_cast(message->payload())); last_offset_ = message->offset(); break; case rdkafka::err__partition_eof: std::cerr << "%% reached the end of the queue, offset: " << last_offset_ << std::endl; break; case rdkafka::err__unknown_topic: case rdkafka::err__unknown_partition: std::cerr << "consume failed: " << message->errstr() << std::endl; run_ = false; break; default: std::cerr << "consume failed: " << message->errstr() << std::endl; run_ = false; break; } } bool kafka_consumer_client::consume(int timeout_ms){ rdkafka::message *msg = nullptr; while(run_){ msg = kafka_consumer_->consume(topic_, partition_, timeout_ms); consumer(msg, nullptr); kafka_consumer_->poll(0); delete msg; } kafka_consumer_->stop(topic_, partition_); if(topic_){ delete topic_; topic_ = nullptr; } if(kafka_consumer_){ delete kafka_consumer_; kafka_consumer_ = nullptr; } /*销毁kafka实例*/ rdkafka::wait_destroyed(5000); return true; } int main(int argc, char **argv){ int opt; //std::vector topics; std::string topics; std::string brokers = "localhost:9092"; std::string group = "1"; while ((opt = getopt(argc, argv, "g:b:t:qd:ex:as:do")) != -1){ switch (opt) { case 'b': brokers = optarg; break; case 'g': group = optarg; break; case 't': topics = optarg; break; default: break; } } /*for (; optind < argc ; optind++) topics.push_back(std::string(argv[optind]));*/ signal(sigint, sigterm); signal(sigterm, sigterm); std::shared_ptr kafka_consumer_client_ = std::make_shared(brokers, topics, group, 0); //std::shared_ptr kafka_consumer_client_ = std::make_shared(); if (!kafka_consumer_client_->initclient()){ fprintf(stderr, "kafka server initialize error\n"); }else{ printf("start kafka consumer\n"); kafka_consumer_client_->consume(1000); } fprintf(stderr, "kafka consume exit! \n"); return 0; }
编译:
g++ my_consumer.cpp -o my_consumer_cpp -std=c++11 -lrdkafka++ -lz -lpthread -lrt
在运行my_producer或my_consumer时可能会报错"error while loading shared librariesxxx.so", 此时需要在/etc/ld.so.conf中加入xxx.so所在的目录
在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test”的topic
启动consumer:
开启kafka 自带的producer,并发送消息“hello world”
consumer处收到的消息:
推荐阅读
-
c++使用librdkafka库实现kafka的消费实例
-
c语言使用librdkafka库实现kafka生产和消费的实例
-
使用PHP连接数据库实现留言板功能的实例讲解(推荐)
-
使用php的HTTP请求的库Requests实现美女图片墙_php实例
-
使用php的HTTP请求的库Requests实现美女图片墙_php实例
-
【原创】利用librdkafka实现的可延迟回写offset的消费者实例
-
Java语言mysql数据库的访问步骤,一个简单的实例——使用DAO(数据库操作类 Data Access Object ) 实现对mysql数据库的增删改查
-
PHP使用Mysqli类库实现分页效果的方法及实例分析
-
PHP使用Mysqli类库实现分页效果的方法及实例分析
-
c++使用librdkafka库实现kafka的消费实例