
[Original] A librdkafka-based consumer example with deferred offset write-back


Kafka 0.8.2 and later support committing offsets to the broker, which then manages them. librdkafka likewise lets you configure how offsets are stored: 'file' for local file storage (offset.store.path, et al.) or 'broker' for committing them to the broker.
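As a reference point, here is a minimal sketch of selecting the storage backend through the configuration API. Note that offset.store.method is a legacy librdkafka property, and recent versions default to broker-based storage:

// Minimal sketch: choose where offsets are stored.
// offset.store.method is a legacy property; modern librdkafka
// defaults to broker-based storage.
std::string errstr;
RdKafka::Conf *conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
if (conf->set("offset.store.method", "broker", errstr) != RdKafka::Conf::CONF_OK)
	fprintf(stderr, "Failed to set offset.store.method: %s\n", errstr.c_str());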

In practice, however, a common scenario looks like this: the consumer consumes a message and then processes it; if processing fails or the consumer exits abnormally, the failed messages should be retryable. The message must therefore be consumed and processed first, and its offset committed only after processing succeeds. This matters especially when fetching in batches: if the offsets of a whole batch have already been committed and processing fails afterwards, most of that data is lost.

After studying librdkafka for quite a while, here is the consumer example that defers writing back offsets:

Header file kafkaConsumer.hpp:

#ifndef KAFKA_CONSUMER_HPP
#define KAFKA_CONSUMER_HPP

#include <string>
#include <vector>
#include "rdkafka/src-cpp/rdkafkacpp.h"

class Kafka_Consumer
{
	public:
		Kafka_Consumer(std::string groupid, std::string broker_list, std::vector<std::string> topics)
		{
			std::string errstr;
			kafka_conf = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
			kafka_tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
			
			if (kafka_conf->set("enable.partition.eof", "false", errstr) != RdKafka::Conf::CONF_OK) {
				fprintf(stderr, "Failed to set enable.partition.eof: %s\n",errstr.c_str());
				exit(-1);
			}
  
			// Disable the automatic offset store, so nothing is auto-committed; offsets will be written back manually
			if (kafka_conf->set("enable.auto.offset.store", "false", errstr) != RdKafka::Conf::CONF_OK) {
				fprintf(stderr, "Failed to set enable.auto.offset.store: %s\n",errstr.c_str());
				exit(-1);
			}
			
			// Set auto.offset.reset to earliest: if the offset store is uninitialized or the offset is out of range, reset to the smallest offset.
			// That is, when a partition has a committed offset, consumption resumes from it; otherwise consumption starts from the beginning.
			if (kafka_conf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {
				fprintf(stderr, "Failed to set auto.offset.reset: %s\n",errstr.c_str());
				exit(-1);
			}
  
			kafka_conf->set("group.id", groupid, errstr);
			kafka_conf->set("metadata.broker.list", broker_list, errstr);
			kafka_conf->set("default_topic_conf", kafka_tconf, errstr);
			
			consumer = RdKafka::KafkaConsumer::create(kafka_conf, errstr);
			if (!consumer) {
				fprintf(stderr, "Failed to create consumer: %s\n", errstr.c_str());
				exit(-1);
			}
			
			fprintf(stderr, "Created consumer:%s \n", consumer->name().c_str());
			
			RdKafka::ErrorCode err = consumer->subscribe(topics);
			if (err) {
				fprintf(stderr,"Failed to subscribe to [%d] topics: %s\n", topics.size(),RdKafka::err2str(err).c_str());
				exit(-1);
			}
		}
		~Kafka_Consumer(){
			RdKafka::TopicPartition::destroy(partitions);
			if(consumer)
			{
			    consumer->close();
				delete consumer;
			}
			delete kafka_conf;
			delete kafka_tconf;
			RdKafka::wait_destroyed(5000);
			
		}
		int msg_consume(std::string &data);
		int consume_batch(std::vector<std::string> &data, size_t batch_size, int batch_tmout); 
		int store_the_offset();
	private:
		RdKafka::Conf *kafka_conf;
		RdKafka::Conf *kafka_tconf;
		RdKafka::KafkaConsumer *consumer;
		std::vector<RdKafka::TopicPartition*> partitions;
};
#endif

Source file kafkaConsumer.cpp:

#include "Utils.hpp"
#include "kafkaConsumer.hpp"
#include "rdkafka/src-cpp/rdkafkacpp.h"


int Kafka_Consumer::msg_consume(std::string &data){
	RdKafka::Message *message = consumer->consume(1000);
	int ret = -1;
	data.clear();
	switch (message->err()) {
		case RdKafka::ERR__TIMED_OUT:
			break;

		case RdKafka::ERR_NO_ERROR:
			/* Real message */
			data.assign(static_cast<const char *>(message->payload()), static_cast<int>(message->len()));
			ret = 0;
			break;

		case RdKafka::ERR__PARTITION_EOF:
			/* Last message */
			break;

		case RdKafka::ERR__UNKNOWN_TOPIC:
		case RdKafka::ERR__UNKNOWN_PARTITION:
			fprintf(stderr, "Consume failed: %s\n", message->errstr().c_str());
			break;

		default:
			/* Errors */
			fprintf(stderr, "Consume failed: %s\n", message->errstr().c_str());
	}
	delete message;
	return ret;
}

int Kafka_Consumer::consume_batch(std::vector<std::string> &data, size_t batch_size, int batch_tmout)
{
	data.clear();
	data.reserve(batch_size);

	std::string tmp;
	int64_t end = Utils::getnowtime_ms() + batch_tmout;
	int remaining_timeout = batch_tmout;
	
	while (data.size() < batch_size) {
		RdKafka::Message *msg = consumer->consume(remaining_timeout);
		switch (msg->err()) {
			case RdKafka::ERR__TIMED_OUT:
				delete msg;
				return -1;

			case RdKafka::ERR_NO_ERROR:
				tmp.clear();
				tmp.assign(static_cast<const char *>(msg->payload()), static_cast<int>(msg->len()));
				data.push_back(tmp);
				delete msg;
				break;

			default:
				fprintf(stderr, "Consumer error: %s\n", msg->errstr().c_str());
				delete msg;
				return -1;
		}
		// Stop once the overall batch-timeout budget is exhausted.
		remaining_timeout = end - Utils::getnowtime_ms();
		if (remaining_timeout <= 0)
			break;
	}
	
	fprintf(stderr, "Consumer success\n");
	return 0;
}

int Kafka_Consumer::store_the_offset()
{
	// After messages have been consumed, assignment() returns the current TopicPartition array (if nothing has been consumed yet, the array is empty).
	// Although the docs say the array can be refreshed with assign() after the first assignment(), testing showed otherwise, so the array is refreshed by destroying it and calling assignment() again.
	RdKafka::TopicPartition::destroy(partitions);
	consumer->assignment(partitions);
	
	// position() fills the TopicPartition array with the latest consumed offsets.
	consumer->position(partitions);
	
	// Asynchronously commit the TopicPartition array to the broker; this is the manual offset write-back.
	RdKafka::ErrorCode err = consumer->commitAsync(partitions);
	if (err)
	{
		fprintf(stderr, "Kafka_Consumer::store_the_offset fail\n");
		return -1;
	}

	fprintf(stderr, "Kafka_Consumer::store_the_offset success:%d\n",partitions.size());
	return 0;
}

In this code, the deferred offset write-back hinges on one particular librdkafka structure:

std::vector<RdKafka::TopicPartition*> partitions;

position() retrieves the latest offsets into the TopicPartition array, which is then committed asynchronously to the Kafka cluster. commitAsync also has a synchronous counterpart, commitSync; see the librdkafka sources for details.
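If a blocking commit is preferred, here is a minimal sketch of the synchronous variant, reusing the same partitions vector as in store_the_offset():

// Synchronous commit: blocks until the broker acknowledges the offsets.
RdKafka::ErrorCode err = consumer->commitSync(partitions);
if (err)
	fprintf(stderr, "commitSync failed: %s\n", RdKafka::err2str(err).c_str());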

In actual use, consume the data first, perform whatever additional processing is needed, and only then call store_the_offset().
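A minimal usage sketch of that flow; the broker address, group id, topic name, batch parameters, and the process_batch() helper below are all hypothetical placeholders:

#include <cstdio>
#include "kafkaConsumer.hpp"

// Hypothetical processing step; returns 0 on success.
static int process_batch(const std::vector<std::string> &msgs)
{
	for (const auto &m : msgs)
		fprintf(stderr, "got %zu bytes\n", m.size());
	return 0;
}

int main()
{
	// Placeholder group id, broker list and topic.
	Kafka_Consumer consumer("my_group", "localhost:9092", {"my_topic"});

	std::vector<std::string> batch;
	while (true) {
		// Fetch up to 100 messages, waiting at most 1000 ms.
		if (consumer.consume_batch(batch, 100, 1000) != 0)
			continue;
		// Write back offsets only after the batch was processed
		// successfully, so a crash before this point lets the
		// messages be consumed again on restart.
		if (process_batch(batch) == 0)
			consumer.store_the_offset();
	}
	return 0;
}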
