[Original] A consumer built on librdkafka that defers writing offsets back
Since Kafka 0.8.2, offsets can be committed to the broker and managed there. librdkafka likewise lets you configure where offsets are stored: 'file' - local file store (offset.store.path, et al.), 'broker' - commit store on the broker.
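For reference, the store used to be selectable through the (now-deprecated) librdkafka property offset.store.method; in recent releases broker storage is the only supported option. A minimal sketch, set here on a topic configuration:

std::string errstr;
RdKafka::Conf *tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
// 'broker' is the modern default; 'file' is the legacy local store.
if (tconf->set("offset.store.method", "broker", errstr) != RdKafka::Conf::CONF_OK)
    fprintf(stderr, "offset.store.method: %s\n", errstr.c_str());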
In practice, however, a common scenario comes up: the consumer fetches a message and then processes it, and if processing fails or the consumer exits abnormally, you want to be able to retry the failed messages. So the flow must be consume, then process, and only commit the offset once processing has succeeded. This matters especially for batch fetching: if the offsets for a whole batch are committed before processing turns out to have failed, most of that batch is lost.
After spending quite a while digging into librdkafka, here is a consumer example that defers writing offsets back:
Header file kafkaConsumer.hpp:
#ifndef KAFKA_CONSUMER_HPP
#define KAFKA_CONSUMER_HPP

#include <cstdio>
#include <cstdlib>
#include <string>
#include <vector>

#include "rdkafka/src-cpp/rdkafkacpp.h"

class Kafka_Consumer
{
public:
    Kafka_Consumer(std::string groupid, std::string broker_list, std::vector<std::string> topics)
    {
        std::string errstr;
        kafka_conf  = RdKafka::Conf::create(RdKafka::Conf::CONF_GLOBAL);
        kafka_tconf = RdKafka::Conf::create(RdKafka::Conf::CONF_TOPIC);
        if (kafka_conf->set("enable.partition.eof", "false", errstr) != RdKafka::Conf::CONF_OK) {
            fprintf(stderr, "Failed to set enable.partition.eof: %s\n", errstr.c_str());
            exit(-1);
        }
        // Disable automatic offset *store*. With nothing stored, the background
        // auto-committer has nothing to commit, so offsets are effectively
        // committed only when we do it ourselves (see store_the_offset()).
        if (kafka_conf->set("enable.auto.offset.store", "false", errstr) != RdKafka::Conf::CONF_OK) {
            fprintf(stderr, "Failed to set enable.auto.offset.store: %s\n", errstr.c_str());
            exit(-1);
        }
        // auto.offset.reset=earliest controls what happens when no offset has
        // been committed yet, or the committed offset is out of range: reset
        // to the smallest offset. In other words, partitions with a committed
        // offset resume from it; partitions without one start from the beginning.
        if (kafka_conf->set("auto.offset.reset", "earliest", errstr) != RdKafka::Conf::CONF_OK) {
            fprintf(stderr, "Failed to set auto.offset.reset: %s\n", errstr.c_str());
            exit(-1);
        }
        kafka_conf->set("group.id", groupid, errstr);
        kafka_conf->set("metadata.broker.list", broker_list, errstr);
        kafka_conf->set("default_topic_conf", kafka_tconf, errstr);
        consumer = RdKafka::KafkaConsumer::create(kafka_conf, errstr);
        if (!consumer) {
            fprintf(stderr, "Failed to create consumer: %s\n", errstr.c_str());
            exit(-1);
        }
        fprintf(stderr, "Created consumer: %s\n", consumer->name().c_str());
        RdKafka::ErrorCode err = consumer->subscribe(topics);
        if (err) {
            fprintf(stderr, "Failed to subscribe to %zu topics: %s\n",
                    topics.size(), RdKafka::err2str(err).c_str());
            exit(-1);
        }
    }

    ~Kafka_Consumer()
    {
        RdKafka::TopicPartition::destroy(partitions);
        if (consumer) {
            consumer->close();
            delete consumer;
        }
        delete kafka_conf;
        delete kafka_tconf;
        RdKafka::wait_destroyed(5000);
    }

    int msg_consume(std::string &data);
    int consume_batch(std::vector<std::string> &data, size_t batch_size, int batch_tmout);
    int store_the_offset();

private:
    RdKafka::Conf *kafka_conf;
    RdKafka::Conf *kafka_tconf;
    RdKafka::KafkaConsumer *consumer;
    std::vector<RdKafka::TopicPartition*> partitions;
};
#endif
Source file kafkaConsumer.cpp:
#include "Utils.hpp"
#include "kafkaConsumer.hpp"
#include "rdkafka/src-cpp/rdkafkacpp.h"
int Kafka_Consumer::msg_consume(std::string &data){
RdKafka::Message *message = consumer->consume(1000);
int ret = -1;
data.clear();
data="";
switch (message->err()) {
case RdKafka::ERR__TIMED_OUT:
break;
case RdKafka::ERR_NO_ERROR:
/* Real message */
data.assign(static_cast<const char *>(message->payload()), static_cast<int>(message->len()));
ret = 0;
break;
case RdKafka::ERR__PARTITION_EOF:
/* Last message */
break;
case RdKafka::ERR__UNKNOWN_TOPIC:
case RdKafka::ERR__UNKNOWN_PARTITION:
fprintf(stderr, "Consume failed: %s\n", message->errstr().c_str());
break;
default:
/* Errors */
fprintf(stderr, "Consume failed: %s\n", message->errstr().c_str());
}
delete message;
return ret;
}
// Consume up to `batch_size` messages, waiting at most `batch_tmout` ms in
// total. Returns 0 if at least one message was collected, -1 otherwise.
int Kafka_Consumer::consume_batch(std::vector<std::string> &data, size_t batch_size, int batch_tmout)
{
    data.clear();
    data.reserve(batch_size);
    int64_t end = Utils::getnowtime_ms() + batch_tmout;
    int remaining_timeout = batch_tmout;
    while (data.size() < batch_size) {
        RdKafka::Message *msg = consumer->consume(remaining_timeout);
        switch (msg->err()) {
        case RdKafka::ERR__TIMED_OUT:
            // Time window exhausted; keep whatever was already collected
            // instead of discarding a partial batch.
            delete msg;
            return data.empty() ? -1 : 0;
        case RdKafka::ERR_NO_ERROR:
            data.emplace_back(static_cast<const char *>(msg->payload()), msg->len());
            delete msg;
            break;
        default:
            fprintf(stderr, "Consumer error: %s\n", msg->errstr().c_str());
            delete msg;
            return -1;
        }
        remaining_timeout = static_cast<int>(end - Utils::getnowtime_ms());
        if (remaining_timeout < 0)
            break;
    }
    if (!data.empty())
        fprintf(stderr, "Consume batch success: %zu messages\n", data.size());
    return data.empty() ? -1 : 0;
}
// Write the current consume positions back to the broker. Call this only
// after the consumed messages have been processed successfully.
int Kafka_Consumer::store_the_offset()
{
    // Once at least one message has been consumed, assignment() returns the
    // current TopicPartition array (before any consumption it is empty).
    // The docs suggest that after the first assignment() the array can be
    // refreshed with assign(), but in my tests that did not work, so the
    // array is rebuilt here: destroy the old one, then call assignment() again.
    RdKafka::TopicPartition::destroy(partitions);
    consumer->assignment(partitions);
    // position() fills the array with the latest consume offsets.
    consumer->position(partitions);
    // Asynchronously commit the array to the broker: this is the manual,
    // deferred offset write-back.
    RdKafka::ErrorCode err = consumer->commitAsync(partitions);
    if (err) {
        fprintf(stderr, "Kafka_Consumer::store_the_offset failed: %s\n",
                RdKafka::err2str(err).c_str());
        return -1;
    }
    fprintf(stderr, "Kafka_Consumer::store_the_offset success: %zu partitions\n",
            partitions.size());
    return 0;
}
In this code, the deferred offset write-back hinges on one librdkafka structure:
std::vector<RdKafka::TopicPartition*> partitions;
position() fills this TopicPartition array with the latest consume offsets, and the array is then committed asynchronously to the Kafka cluster. commitAsync has a synchronous counterpart, commitSync; see the librdkafka sources for details, and the sketch below for a blocking variant.
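If the caller must know the commit result before continuing, a minimal sketch of a synchronous variant (the method name store_the_offset_sync is mine, not part of the original class):

int Kafka_Consumer::store_the_offset_sync()
{
    RdKafka::TopicPartition::destroy(partitions);
    consumer->assignment(partitions);
    consumer->position(partitions);
    // commitSync() blocks until the broker acknowledges (or rejects) the commit.
    RdKafka::ErrorCode err = consumer->commitSync(partitions);
    if (err) {
        fprintf(stderr, "sync commit failed: %s\n", RdKafka::err2str(err).c_str());
        return -1;
    }
    return 0;
}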
In actual use, consume the data first, do whatever extra processing is needed, and only then call store_the_offset(), as in the sketch below.
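A minimal usage sketch, assuming a broker at localhost:9092 and a topic named test (both placeholders), with a hypothetical process() standing in for the real work:

#include <string>
#include <vector>
#include "kafkaConsumer.hpp"

static bool process(const std::vector<std::string> &batch)
{
    // Placeholder for real processing; return false to skip the commit so
    // the messages are redelivered after a restart.
    return !batch.empty();
}

int main()
{
    Kafka_Consumer consumer("my_group", "localhost:9092", {"test"});
    std::vector<std::string> batch;
    while (true) {
        // 1. Consume: up to 100 messages or 5 s, whichever comes first.
        if (consumer.consume_batch(batch, 100, 5000) != 0)
            continue;
        // 2. Process: offsets are not yet committed, so a failure here means
        //    the batch will be consumed again after a restart.
        if (!process(batch))
            continue;
        // 3. Commit: write the offsets back only after success.
        consumer.store_the_offset();
    }
    return 0;
}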