欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

记一次线上kafka consumer不消费造成积压排查录

程序员文章站 2024-03-19 11:29:52
...

问题:

BU的同学在使用kafka的过程中,发现自己的数据日志不符合预期,按照正常的处理流程应该会出现预期中的数据,由于代码没有变动,之前一直在线上运行,如下:

try {
    Properties props = new Properties();
    props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_CLUSTER);
    props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
    props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            "org.apache.kafka.common.serialization.StringDeserializer");
    props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");


    log.info("start kafka: {}", props);
    consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Collections.singletonList(KafkaUtils.getTopic(TOPIC)));
    ExecutorService executorService = getExecutorService();
    new Thread(() -> {
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                executorService.submit(() -> {

                    handle(record);

                });
            }
        }
    }, "status-frame-base").start();
} catch (Exception e) {
    log.error("ctr compute error", e);
}

分析:

首先想到的就是kafka是不是除了什么问题,通过matrix的监控图没有看到有什么异常

然后又查看了km上的topic相关监控信息,也没有相关的异常

所有的consumer监控指标正常,这下就懵逼中。。。

接下来,难道是consumer的poll、offset提交阻塞?

最后想consumer是不是阻塞了,然后赶紧查看系统的线程栈信息

发现没有blocked的线程

然后查看kafka的相关线程

在查找“status-frame-base”这个线程信息的时候(这个时候就知道为什么要让你们在线上起线程名字的时候为啥要规范了),竟然没有找到????

记一次线上kafka consumer不消费造成积压排查录记一次线上kafka consumer不消费造成积压排查录记一次线上kafka consumer不消费造成积压排查录

对于如此诡异的现象能做的就是查看warn和error日志 

无奈没有相关的信息,再加上项目的权限等相关的问题,我拿不到相关数据,因此只能让BU的同学重启试一下了

结果:

果然重启能够解决90%的问题

回到上面的排查过程,其实通过km的监控去看consumer的信息是乏力的,因为没有相关的server到consumer心跳监控信息。我们索能看到的都是写静态的监控信息,这就是为啥在server端的监控啥也看不到

后面将kafka的相关的线程通过jmx监控在matrix里,后续排查就方便多了

记一次线上kafka consumer不消费造成积压排查录

后续继续跟进。。。

对了上面的代码还有一个要注意的地方,就是有些同学喜欢在使用try-catch的时候不自觉的遵守java编码规范的中的精简之道,估计是没有深入的理解这个try-catch-resource的作用

这个表达式作用在所有实现Closeable接口的资源上

错误代码示例:

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList(TOPIC));

                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(100);
                    for (ConsumerRecord<String, String> record : records) {

                        System.out.println(record);
                    }
                }

        }
    } catch 。。。

下期分享一下线上的kafka consumer和producer的监控

相关标签: kafka