springboot中实现kafa指定offset消费
程序员文章站
2022-06-11 16:54:14
kafka消费过程难免会遇到需要重新消费的场景,例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者的offset到之前某一时间的数值,然后重新进行消费。 ......
kafka消费过程难免会遇到需要重新消费的场景,例如我们消费到kafka数据之后需要进行存库操作,若某一时刻数据库down了,导致kafka消费的数据无法入库,为了弥补数据库down期间的数据损失,有一种做法我们可以指定kafka消费者的offset到之前某一时间的数值,然后重新进行消费。
首先创建kafka消费服务
@service @slf4j //实现commandlinerunner接口,在springboot启动时自动运行其run方法。 public class tsplogbookanalysisservice implements commandlinerunner { @override public void run(string... args) { //do something } }
kafka消费模型建立
kafka server中每个主题存在多个分区(partition),每个分区自己维护一个偏移量(offset),我们的目标是实现kafka consumer指定offset消费。
在这里使用consumer-->partition一对一的消费模型,每个consumer各自管理自己的partition。
@service @slf4j public class tsplogbookanalysisservice implements commandlinerunner { //声明kafka分区数相等的消费线程数,一个分区对应一个消费线程 private static final int consumethreadnum = 9; //特殊指定每个分区开始消费的offset private list<long> partitionoffsets = lists.newarraylist(1111,1112,1113,1114,1115,1116,1117,1118,1119); private executorservice executorservice = executors.newfixedthreadpool(consumethreadnum); @override public void run(string... args) { //循环遍历创建消费线程 intstream.range(0, consumethreadnum) .foreach(partitionindex -> executorservice.submit(() -> startconsume(partitionindex))); } }
kafka consumer对offset的处理
声明kafka consumer的配置类
private properties buildkafkaconfig() { properties kafkaconfiguration = new properties(); kafkaconfiguration.put(consumerconfig.bootstrap_servers_config, ""); kafkaconfiguration.put(consumerconfig.group_id_config, ""); kafkaconfiguration.put(consumerconfig.max_poll_records_config, ""); kafkaconfiguration.put(consumerconfig.auto_commit_interval_ms_config, ""); kafkaconfiguration.put(consumerconfig.key_deserializer_class_config, ""); kafkaconfiguration.put(consumerconfig.value_deserializer_class_config, ""); kafkaconfiguration.put(consumerconfig.auto_offset_reset_config,""); kafkaconfiguration.put(consumerconfig.enable_auto_commit_config, ""); ...更多配置项 return kafkaconfiguration; }
创建kafka consumer,处理offset,开始消费数据任务
private void startconsume(int partitionindex) { //创建kafka consumer kafkaconsumer<string, byte[]> consumer = new kafkaconsumer<>(buildkafkaconfig()); try { //指定该consumer对应的消费分区 topicpartition partition = new topicpartition(kafkaproperties.getkafkatopic(), partitionindex); consumer.assign(lists.newarraylist(partition)); //consumer的offset处理 if (collectionutils.isnotempty(partitionoffsets) && partitionoffsets.size() == consumethreadnum) { long seekoffset = partitionoffsets.get(partitionindex); log.info("partition:{} , offset seek from {}", partition, seekoffset); consumer.seek(partition, seekoffset); } //开始消费数据任务 kafkarecordconsume(consumer, partition); } catch (exception e) { log.error("kafka consume error:{}", exceptionutils.getfullstacktrace(e)); } finally { try { consumer.commitsync(); } finally { consumer.close(); } } }
消费数据逻辑,offset操作
private void kafkarecordconsume(kafkaconsumer<string, byte[]> consumer, topicpartition partition) { while (true) { try { consumerrecords<string, byte[]> records = consumer.poll(tsplogbookconstants.poll_timeout); //具体的处理流程 records.foreach((k) -> handlekafkainput(k.key(), k.value())); //