解决kafka消息堆积及分区不均匀的问题
kafka消息堆积及分区不均匀的解决
我在环境中发现代码里面的kafka有所延迟,查看kafka消息发现堆积严重,经过检查发现是kafka消息分区不均匀造成的,消费速度过慢。这里由自己在虚拟机上演示相关问题,给大家提供相应问题的参考思路。
这篇文章有点遗憾并没重现分区不均衡的样例和warning: consumer group ‘testgroup1' is rebalancing. 这里仅将正确的方式展示,等后续重现了在进行补充。
主要有两个要点:
- 1、一个消费者组只消费一个topic.
- 2、factory.setconcurrency(concurrency);这里设置监听并发数为 部署单元节点*concurrency=分区数量
1、先在kafka消息中创建
对应分区数目的topic(testtopic2,testtopic3)testtopic1由代码创建
./kafka-topics.sh --create --zookeeper 192.168.25.128:2181 --replication-factor 1 --partitions 2 --topic testtopic2
2、添加配置文件application.properties
kafka.test.topic1=testtopic1 kafka.test.topic2=testtopic2 kafka.test.topic3=testtopic3 kafka.broker=192.168.25.128:9092 auto.commit.interval.time=60000 #kafka.test.group=customer-test kafka.test.group1=testgroup1 kafka.test.group2=testgroup2 kafka.test.group3=testgroup3 kafka.offset=earliest kafka.auto.commit=false session.timeout.time=10000 kafka.concurrency=2
3、创建kafka工厂
package com.yin.customer.config; import org.apache.kafka.clients.consumer.consumerconfig; import org.apache.kafka.common.serialization.stringdeserializer; import org.springframework.beans.factory.annotation.value; import org.springframework.context.annotation.bean; import org.springframework.context.annotation.configuration; import org.springframework.kafka.config.concurrentkafkalistenercontainerfactory; import org.springframework.kafka.config.kafkalistenercontainerfactory; import org.springframework.kafka.core.consumerfactory; import org.springframework.kafka.core.defaultkafkaconsumerfactory; import org.springframework.kafka.listener.abstractmessagelistenercontainer; import org.springframework.kafka.listener.concurrentmessagelistenercontainer; import org.springframework.kafka.listener.containerproperties; import org.springframework.stereotype.component; import java.util.hashmap; import java.util.map; /** * @author yin * @date 2019/11/24 15:54 * @method */ @configuration @component public class kafkaconfig { @value("${kafka.broker}") private string broker; @value("${kafka.auto.commit}") private string autocommit; // @value("${kafka.test.group}") //private string testgroup; @value("${session.timeout.time}") private string sessionouttime; @value("${auto.commit.interval.time}") private string autocommittime; @value("${kafka.offset}") private string offset; @value("${kafka.concurrency}") private integer concurrency; @bean kafkalistenercontainerfactory<concurrentmessagelistenercontainer<string, string>> kafkalistenercontainerfactory(){ concurrentkafkalistenercontainerfactory<string, string> factory = new concurrentkafkalistenercontainerfactory<>(); factory.setconsumerfactory(consumerfactory()); //监听设置两个个分区 factory.setconcurrency(concurrency); //打开批量拉取数据 factory.setbatchlistener(true); //这里设置的是心跳时间也是拉的时间,也就说每间隔max.poll.interval.ms我们就调用一次poll,kafka默认是300s,心跳只能在poll的时候发出,如果连续两次poll的时候超过 //max.poll.interval.ms 值就会导致rebalance //心跳导致groupcoordinator以为本地consumer节点挂掉了,引发了partition在consumergroup里的rebalance。 // 当rebalance后,之前该consumer拥有的分区和offset信息就失效了,同时导致不断的报auto offset commit failed。 factory.getcontainerproperties().setpolltimeout(3000); factory.getcontainerproperties().setackmode(containerproperties.ackmode.manual_immediate); return factory; } private consumerfactory<string,string> consumerfactory() { return new defaultkafkaconsumerfactory<string, string>(consumerconfigs()); } @bean public map<string, object> consumerconfigs() { map<string, object> propsmap = new hashmap<>(); //kafka的地址 propsmap.put(consumerconfig.bootstrap_servers_config, broker); //是否自动提交 offset propsmap.put(consumerconfig.enable_auto_commit_config, autocommit); // enable.auto.commit 设置成 false,那么 auto.commit.interval.ms 也就不被再考虑 //默认5秒钟,一个 consumer 将会提交它的 offset 给 kafka propsmap.put(consumerconfig.auto_commit_interval_ms_config, 5000); //这个值必须设置在broker configuration中的group.min.session.timeout.ms 与 group.max.session.timeout.ms之间。 //zookeeper.session.timeout.ms 默认值:6000 //zookeeper的session的超时时间,如果在这段时间内没有收到zk的心跳,则会被认为该kafka server挂掉了。 // 如果把这个值设置得过低可能被误认为挂掉,如果设置得过高,如果真的挂了,则需要很长时间才能被server得知。 propsmap.put(consumerconfig.session_timeout_ms_config, sessionouttime); propsmap.put(consumerconfig.key_deserializer_class_config, stringdeserializer.class); propsmap.put(consumerconfig.value_deserializer_class_config, stringdeserializer.class); //组与组间的消费者是没有关系的。 //topic中已有分组消费数据,新建其他分组id的消费者时,之前分组提交的offset对新建的分组消费不起作用。 //propsmap.put(consumerconfig.group_id_config, testgroup); //当创建一个新分组的消费者时,auto.offset.reset值为latest时, // 表示消费新的数据(从consumer创建开始,后生产的数据),之前产生的数据不消费。 // https://blog.csdn.net/u012129558/article/details/80427016 //earliest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费。 // latest 当分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据。 propsmap.put(consumerconfig.auto_offset_reset_config, offset); //不是指每次都拉50条数据,而是一次最多拉50条数据() propsmap.put(consumerconfig.max_poll_records_config, 5); return propsmap; } }
4、展示kafka消费者
@component public class kafkaconsumer { private static final logger logger = loggerfactory.getlogger(kafkaconsumer.class); @kafkalistener(topics = "${kafka.test.topic1}",groupid = "${kafka.test.group1}",containerfactory = "kafkalistenercontainerfactory") public void listenpartition1(list<consumerrecord<?, ?>> records,acknowledgment ack) { logger.info("testtopic1 recevice a message size :{}" , records.size()); try { for (consumerrecord<?, ?> record : records) { optional<?> kafkamessage = optional.ofnullable(record.value()); logger.info("received:{} " , record); if (kafkamessage.ispresent()) { object message = record.value(); string topic = record.topic(); thread.sleep(300); logger.info("p1 topic is:{} received message={}",topic, message); } } } catch (exception e) { e.printstacktrace(); } finally { ack.acknowledge(); } } @kafkalistener(topics = "${kafka.test.topic2}",groupid = "${kafka.test.group2}",containerfactory = "kafkalistenercontainerfactory") public void listenpartition2(list<consumerrecord<?, ?>> records,acknowledgment ack) { logger.info("testtopic2 recevice a message size :{}" , records.size()); try { for (consumerrecord<?, ?> record : records) { optional<?> kafkamessage = optional.ofnullable(record.value()); logger.info("received:{} " , record); if (kafkamessage.ispresent()) { object message = record.value(); string topic = record.topic(); thread.sleep(300); logger.info("p2 topic :{},received message={}",topic, message); } } } catch (exception e) { e.printstacktrace(); } finally { ack.acknowledge(); } } @kafkalistener(topics = "${kafka.test.topic3}",groupid = "${kafka.test.group3}",containerfactory = "kafkalistenercontainerfactory") public void listenpartition3(list<consumerrecord<?, ?>> records, acknowledgment ack) { logger.info("testtopic3 recevice a message size :{}" , records.size()); try { for (consumerrecord<?, ?> record : records) { optional<?> kafkamessage = optional.ofnullable(record.value()); logger.info("received:{} " , record); if (kafkamessage.ispresent()) { object message = record.value(); string topic = record.topic(); logger.info("p3 topic :{},received message={}",topic, message); thread.sleep(300); } } } catch (exception e) { e.printstacktrace(); } finally { ack.acknowledge(); } } }
查看分区消费情况:
kafka出现若干分区不消费的现象
近日,有用户反馈kafka有topic出现某个消费组消费的时候,有几个分区一直不消费消息,消息一直积压(图1)。除了一直积压外,还有一个现象就是消费组一直在重均衡,大约每5分钟就会重均衡一次。具体表现为消费分区的owner一直在改变(图2)。
(图1)
(图2)
定位过程
业务侧没有报错,同时kafka服务端日志也一切正常,同事先将消费组的机器滚动重启,仍然还是那几个分区没有消费,之后将这几个不消费的分区迁移至别的broker上,依然没有消费。
还有一个奇怪的地方,就是每次重均衡后,不消费的那几个分区的消费owner所在机器的网络都有流量变化。按理说不消费应该就是拉取不到分区不会有流量的。于是让运维去拉了下不消费的consumer的jstack日志。一看果然发现了问题所在。
从堆栈看,consumer已经拉取到消息,然后就一直卡在处理消息的业务逻辑上。这说明kafka是没有问题的,用户的业务逻辑有问题。
consumer在拉取完一批消息后,就一直在处理这批消息,但是这批消息中有若干条消息无法处理,而业务又没有超时操作或者异常处理导致进程一直处于消费中,无法去poll下一批数据。
又由于业务采用的是autocommit的offset提交方式,而根据源码可知,consumer只有在下一次poll中才会自动提交上次poll的offset,所以业务一直在拉取同一批消息而无法更新offset。反映的现象就是该consumer对应的分区的offset一直没有变,所以有积压的现象。
至于为什么会一直在重均衡消费组的原因也很明了了,就是因为有消费者一直卡在处理消息的业务逻辑上,超过了max.poll.interval.ms(默认5min),消费组就会将该消费者踢出消费组,从而发生重均衡。
验证
让业务方去查证业务日志,验证了积压的这几个分区,总是在循环的拉取同一批消息。
解决方法
临时解决方法就是跳过有问题的消息,将offset重置到有问题的消息之后。本质上还是要业务侧修改业务逻辑,增加超时或者异常处理机制,最好不要采用自动提交offset的方式,可以手动管理。
以上为个人经验,希望能给大家一个参考,也希望大家多多支持。
上一篇: 用R语言实现霍夫曼编码的示例代码