欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

springboot中实现kafa指定offset消费

程序员文章站 2022-06-11 16:54:14
kafka消费过程难免会遇到需要重新消费的场景,例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者的offset到之前某一时间的数值,然后重新进行消费。 ......

kafka消费过程难免会遇到需要重新消费的场景,例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者的offset到之前某一时间的数值,然后重新进行消费。

首先创建kafka消费服务

@service
@slf4j
//实现commandlinerunner接口,在springboot启动时自动运行其run方法。
public class tsplogbookanalysisservice implements commandlinerunner {
    @override
    public void run(string... args) {
        //do something
    }
}

kafka消费模型建立

kafka server中每个主题存在多个分区(partition),每个分区自己维护一个偏移量(offset),我们的目标是实现kafka consumer指定offset消费。

在这里使用consumer-->partition一对一的消费模型,每个consumer各自管理自己的partition。

springboot中实现kafa指定offset消费

@service
@slf4j
public class tsplogbookanalysisservice implements commandlinerunner {
    //声明kafka分区数相等的消费线程数,一个分区对应一个消费线程
    private  static final int consumethreadnum = 9;
    //特殊指定每个分区开始消费的offset
    private list<long> partitionoffsets = lists.newarraylist(1111,1112,1113,1114,1115,1116,1117,1118,1119);
   
    private executorservice executorservice = executors.newfixedthreadpool(consumethreadnum);

    @override
    public void run(string... args) {
        //循环遍历创建消费线程
        intstream.range(0, consumethreadnum)
                .foreach(partitionindex -> executorservice.submit(() -> startconsume(partitionindex)));
    }
}

kafka consumer对offset的处理

声明kafka consumer的配置类

private properties buildkafkaconfig() {
    properties kafkaconfiguration = new properties();
    kafkaconfiguration.put(consumerconfig.bootstrap_servers_config, "");
    kafkaconfiguration.put(consumerconfig.group_id_config, "");
    kafkaconfiguration.put(consumerconfig.max_poll_records_config, "");
    kafkaconfiguration.put(consumerconfig.auto_commit_interval_ms_config, "");
    kafkaconfiguration.put(consumerconfig.key_deserializer_class_config, "");
    kafkaconfiguration.put(consumerconfig.value_deserializer_class_config, "");
    kafkaconfiguration.put(consumerconfig.auto_offset_reset_config,"");
    kafkaconfiguration.put(consumerconfig.enable_auto_commit_config, "");
    ...更多配置项

    return kafkaconfiguration;
}

创建kafka consumer,处理offset,开始消费数据任务

private void startconsume(int partitionindex) {
    //创建kafka consumer
    kafkaconsumer<string, byte[]> consumer = new kafkaconsumer<>(buildkafkaconfig());

    try {
        //指定该consumer对应的消费分区
        topicpartition partition = new topicpartition(kafkaproperties.getkafkatopic(), partitionindex);
        consumer.assign(lists.newarraylist(partition));

        //consumer的offset处理
        if (collectionutils.isnotempty(partitionoffsets)  &&  partitionoffsets.size() == consumethreadnum) {
            long seekoffset = partitionoffsets.get(partitionindex);
            log.info("partition:{} , offset seek from {}", partition, seekoffset);
            consumer.seek(partition, seekoffset);
        }
        
        //开始消费数据任务
        kafkarecordconsume(consumer, partition);
    } catch (exception e) {
        log.error("kafka consume error:{}", exceptionutils.getfullstacktrace(e));
    } finally {
        try {
            consumer.commitsync();
        } finally {
            consumer.close();
        }
    }
}

消费数据逻辑,offset操作

private void kafkarecordconsume(kafkaconsumer<string, byte[]> consumer, topicpartition partition) {
    while (true) {
        try {
            consumerrecords<string, byte[]> records = consumer.poll(tsplogbookconstants.poll_timeout);
            //具体的处理流程
            records.foreach((k) -> handlekafkainput(k.key(), k.value()));

            //