欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

c++使用librdkafka库实现kafka的消费实例

程序员文章站 2022-03-24 13:41:18
关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的消费 librdkafka在的基础上封装了...

关于librdkafka库的介绍,可以参考kafka的c/c++高性能客户端librdkafka简介,本文使用librdkafka库来进行kafka的简单的消费

librdkafka在C语言API的基础上封装了一层C++的API,可以实现kafka的消费操作,基本操作步骤如下:

1、创建kafka 配置

rdkafka::conf *conf = nullptr;
conf = rdkafka::conf::create(rdkafka::conf::conf_global);

2、设置kafka各项参数

/*设置broker list*/
conf->set("bootstrap.servers", brokers_, errstr); 

/*设置consumer group*/
conf->set("group.id", groupid_, errstr);

/*每次从单个分区中拉取消息的最大尺寸*/
conf->set("max.partition.fetch.bytes", strfetch_num, errstr);

3、创建kafka topic配置

rdkafka::conf *tconf = nullptr;
tconf = rdkafka::conf::create(rdkafka::conf::conf_topic);

4、设置kafka topic参数

tconf->set("auto.offset.reset", "smallest", errstr);

5、创建kafka consumer实例

kafka_consumer_ = rdkafka::consumer::create(conf, errstr);

6、创建kafka topic

topic_ = rdkafka::topic::create(kafka_consumer_, topics_, tconf, errstr);

7、启动kafka consumer实例

rdkafka::errorcode resp = kafka_consumer_->start(topic_, partition_, offset_);

8、消费kafka

kafka_consumer_->consume(topic_, partition_, timeout_ms);

9、轮询处理事件和回调(超时参数为0,调用不会阻塞)

kafka_consumer_->poll(0);

10、停止消费

kafka_consumer_->stop(topic_, partition_);

11、销毁consumer实例(先delete topic和consumer对象,再等待librdkafka内部资源释放完毕)

rdkafka::wait_destroyed(5000);


完整代码

my_consumer_cpp.h(文件名需与my_consumer.cpp中的#include "my_consumer_cpp.h"保持一致):

#include <csignal>
#include <cstdint>
#include <cstdio>
#include <iostream>
#include <memory>
#include <string>
#include <getopt.h>
#include "../src-cpp/rdkafkacpp.h"

/*
 * Single-partition Kafka consumer wrapper around the librdkafka C++ API.
 *
 * Intended call sequence (as used by main()): construct, initclient(),
 * then consume(), which loops until the global run_ flag is cleared and
 * releases the consumer and topic handles before returning.
 */
class kafka_consumer_client
{
public:
	/*
	 * brokers : value passed to the "bootstrap.servers" setting
	 * topics  : name of the topic to consume
	 * groupid : value passed to the "group.id" setting
	 * offset  : offset handed to start() for partition_
	 */
	kafka_consumer_client(const std::string& brokers,
	                      const std::string& topics,
	                      std::string groupid,
	                      int64_t offset);
	virtual ~kafka_consumer_client();

	/* Creates the configuration, consumer and topic handles and starts consuming. */
	bool initclient();

	/* Fetch loop; timeout_ms is forwarded to each consume() call on the handle. */
	bool consume(int timeout_ms);

	/* Declared only; no definition appears in the accompanying .cpp listing. */
	void finalize();

private:
	/* Handles one fetched message (payload, end-of-partition, or error). */
	void consumer(rdkafka::message *msg, void *opt);

	std::string brokers_;
	std::string topics_;
	std::string groupid_;

	/* offset of the most recently printed message */
	int64_t last_offset_ = 0;

	/* handles created by initclient() and deleted by consume() */
	rdkafka::consumer *kafka_consumer_ = nullptr;
	rdkafka::topic *topic_ = nullptr;

	/* start position and partition passed to start()/consume()/stop() */
	int64_t offset_ = rdkafka::topic::offset_beginning;
	int32_t partition_ = 0;
};

my_consumer.cpp
#include "my_consumer_cpp.h"


/*
 * Keep-running flag for the consume loop. It is written from the signal
 * handler below, so it is declared volatile std::sig_atomic_t: that is a
 * type the standard guarantees can be safely assigned inside a handler.
 * Non-zero means "keep running"; assignments of true/false elsewhere in
 * the file still convert to 1/0.
 */
volatile std::sig_atomic_t run_ = 1;

/* Signal handler: asks the consume loop to stop by clearing run_. */
static void sigterm (int /*sig*/) {
  run_ = 0;
}

/*
 * Records the connection settings and the starting offset. No Kafka
 * objects are created here; that happens in initclient().
 */
kafka_consumer_client::kafka_consumer_client(const std::string& brokers,
                                             const std::string& topics,
                                             std::string groupid,
                                             int64_t offset)
	: brokers_(brokers)
	, topics_(topics)
	, groupid_(groupid)
	, offset_(offset)
{
}

//kafka_consumer_client::kafka_consumer_client(){}

/*
 * Releases any handles still owned by this object. consume() deletes and
 * nulls both pointers on its way out, so this only does work when
 * consume() was never run (for example after a failed initclient()).
 */
kafka_consumer_client::~kafka_consumer_client(){
	/* same order consume() uses: topic handle first, then the consumer */
	if(topic_){
		delete topic_;
		topic_ = nullptr;
	}
	if(kafka_consumer_){
		delete kafka_consumer_;
		kafka_consumer_ = nullptr;
	}
}

bool kafka_consumer_client::initclient(){
	rdkafka::conf *conf = nullptr;
	conf = rdkafka::conf::create(rdkafka::conf::conf_global);
	if(!conf){
		fprintf(stderr, "rdkafka create global conf failed\n");
		return false;
	}

	std::string errstr;
	/*设置broker list*/
	if (conf->set("bootstrap.servers", brokers_, errstr) != rdkafka::conf::conf_ok){
		fprintf(stderr, "rdkafka conf set brokerlist failed : %s\n", errstr.c_str());
	}

	/*设置consumer group*/
	if (conf->set("group.id", groupid_, errstr) != rdkafka::conf::conf_ok){
		fprintf(stderr, "rdkafka conf set group.id failed : %s\n", errstr.c_str());
	}

	std::string strfetch_num = "10240000";
	/*每次从单个分区中拉取消息的最大尺寸*/
	if(conf->set("max.partition.fetch.bytes", strfetch_num, errstr) != rdkafka::conf::conf_ok){
		fprintf(stderr, "rdkafka conf set max.partition failed : %s\n", errstr.c_str());
	}

	/*创建kafka consumer实例*/
	kafka_consumer_ = rdkafka::consumer::create(conf, errstr);
	if(!kafka_consumer_){
		fprintf(stderr, "failed to ceate consumer\n");
	}
	delete conf;

	rdkafka::conf *tconf = nullptr;
	/*创建kafka topic的配置*/
	tconf = rdkafka::conf::create(rdkafka::conf::conf_topic);
	if(!tconf){
		fprintf(stderr, "rdkafka create topic conf failed\n");
		return false;
	}

	/*kafka + zookeeper,当消息被消费时,会想zk提交当前groupid的consumer消费的offset信息,
	当consumer再次启动将会从此offset开始继续消费.在consumter端配置文件中(或者是
	consumerconfig类参数)有个"autooffset.reset"(在kafka 0.8版本中为auto.offset.reset),
	有2个合法的值"largest"/"smallest",默认为"largest",此配置参数表示当此groupid下的消费者,
	在zk中没有offset值时(比如新的groupid,或者是zk数据被清空),consumer应该从哪个offset开始
	消费.largest表示接受接收最大的offset(即最新消息),smallest表示最小offset,即从topic的
	开始位置消费所有消息.*/
	if(tconf->set("auto.offset.reset", "smallest", errstr) != rdkafka::conf::conf_ok){
		fprintf(stderr, "rdkafka conf set auto.offset.reset failed : %s\n", errstr.c_str());
	}

	topic_ = rdkafka::topic::create(kafka_consumer_, topics_, tconf, errstr);
	if(!topic_){
		fprintf(stderr, "rdkafka create topic failed : %s\n", errstr.c_str());
	}
	delete tconf;

	rdkafka::errorcode resp = kafka_consumer_->start(topic_, partition_, offset_);
	if (resp != rdkafka::err_no_error){
		fprintf(stderr, "failed to start consumer : %s\n", rdkafka::err2str(resp).c_str());
	}

	return true;
}

/*
 * Handles one result returned by the fetch call in consume().
 *
 * message : the fetched message or error event; a null pointer is ignored
 * opt     : unused
 *
 * A payload is printed to stdout and its offset remembered in
 * last_offset_. A timeout is ignored, end-of-partition is reported on
 * stderr, and any other error is reported and clears the global run_
 * flag so the consume loop ends.
 */
void kafka_consumer_client::consumer(rdkafka::message *message, void *opt){
	(void)opt;
	if(!message){
		return;
	}

	switch(message->err()){
		case rdkafka::err__timed_out:
			break;
		case rdkafka::err_no_error:
			/* "%.*s" takes an int length followed by a char pointer; the
			   payload is not assumed to be NUL-terminated */
			printf("%.*s\n",
				static_cast<int>(message->len()),
				static_cast<const char *>(message->payload()));
			last_offset_ = message->offset();
			break;
		case rdkafka::err__partition_eof:
			std::cerr << "%% reached the end of the queue, offset: " << last_offset_ << std::endl;
			break;
		case rdkafka::err__unknown_topic:
		case rdkafka::err__unknown_partition:
		default:
			/* unknown topic/partition and every other error are fatal */
			std::cerr << "consume failed: " << message->errstr() << std::endl;
			run_ = false;
			break;
	}
}

/*
 * Fetch loop: repeatedly fetches from partition_ of topic_ (waiting up to
 * timeout_ms per fetch) and passes each result to consumer(), until the
 * global run_ flag is cleared. Afterwards it stops the partition,
 * deletes the topic and consumer handles and waits for librdkafka to
 * finish shutting down.
 *
 * Returns false without consuming when the handles were never created
 * (initclient() not called, or it failed); otherwise returns true once
 * the loop has ended and the handles have been released.
 */
bool kafka_consumer_client::consume(int timeout_ms){
	if(!kafka_consumer_ || !topic_){
		fprintf(stderr, "kafka consumer is not initialized\n");
		/* release whichever handle does exist; deleting nullptr is a no-op */
		delete topic_;
		topic_ = nullptr;
		delete kafka_consumer_;
		kafka_consumer_ = nullptr;
		return false;
	}

	while(run_){
		rdkafka::message *msg = kafka_consumer_->consume(topic_, partition_, timeout_ms);
		consumer(msg, nullptr);
		/* serve queued events/callbacks; a timeout of 0 is passed */
		kafka_consumer_->poll(0);
		delete msg;
	}

	kafka_consumer_->stop(topic_, partition_);

	/* topic handle first, then the consumer it was created from */
	delete topic_;
	topic_ = nullptr;
	delete kafka_consumer_;
	kafka_consumer_ = nullptr;

	/* NOTE(review): per the upstream docs this waits (here up to 5000 ms)
	   for librdkafka objects to be destroyed - confirm */
	rdkafka::wait_destroyed(5000);
	return true;
}

int main(int argc, char **argv){
	int opt;
	//std::vector topics;
	std::string topics;
	std::string brokers = "localhost:9092";
	std::string group = "1";

	while ((opt = getopt(argc, argv, "g:b:t:qd:ex:as:do")) != -1){
	    switch (opt) {
		    case 'b':
		      brokers = optarg;
		      break;
		    case 'g':
		      group = optarg;
		      break;
		    case 't':
		      topics = optarg;
		      break;
		    default:
		      break;
	  	}
  	}

  	/*for (; optind < argc ; optind++)
    topics.push_back(std::string(argv[optind]));*/

	signal(sigint, sigterm);
  	signal(sigterm, sigterm);

  	std::shared_ptr kafka_consumer_client_ = std::make_shared(brokers, topics, group, 0);
  	//std::shared_ptr kafka_consumer_client_ = std::make_shared();
  	if (!kafka_consumer_client_->initclient()){
  		fprintf(stderr, "kafka server initialize error\n");
  	}else{
  		printf("start kafka consumer\n");
  		kafka_consumer_client_->consume(1000);
  	}
  	
  	fprintf(stderr, "kafka consume exit! \n");

  	return 0;
}

编译:
g++ my_consumer.cpp -o my_consumer_cpp -std=c++11  -lrdkafka++ -lz -lpthread -lrt

在运行my_producer或my_consumer时可能会报错"error while loading shared libraries: xxx.so",此时需要在/etc/ld.so.conf中加入xxx.so所在的目录,并执行ldconfig使配置生效


在本地启动一个简单的kafka服务,设置broker集群为localhost:9092并创建一个叫“test”的topic

启动consumer:

c++使用librdkafka库实现kafka的消费实例

开启kafka 自带的producer,并发送消息“hello world”

c++使用librdkafka库实现kafka的消费实例

consumer处收到的消息:

c++使用librdkafka库实现kafka的消费实例