
In a highly available Kafka cluster, why can consumers no longer consume data after one node goes down?

程序员文章站 2022-06-27 20:47:28

1. Assume a Kafka cluster with 3 brokers

     Kafka cluster
kafka01  kafka02   kafka03

2. Create the topic test (3 partitions, replication factor 3)

kafka-topics.sh --create --topic 'test' --zookeeper 'hadoop01:2181,hadoop02:2181,hadoop03:2181'  --partitions 3 --replication-factor 3

3. Scenario

3.1 The producer produces data

kafka-console-producer.sh --broker-list 'hadoop01:9092,hadoop02:9092,hadoop03:9092' --topic 'test'
   .......
	data - Cluster ID: qdP2jzDLRcautzTjQ4Lvfg
	12
	>22

3.2 The consumer consumes data

Consumer group groupid:2222 is consuming data from topic:test

kafka-console-consumer.sh --topic 'test' --bootstrap-server   kafka01:9092,kafka02:9092,kafka03:9092  --group 2222

 2020-11-12T17:11:15,594 INFO [main] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Resetting offset for partition flinkkafka333-2 to offset 8.
12
22

At this point kafka01 goes down, and the producer keeps producing data

	kafka-console-producer.sh --broker-list 'hadoop01:9092,hadoop02:9092,hadoop03:9092' --topic 'test'
	   .......
		data - Cluster ID: qdP2jzDLRcautzTjQ4Lvfg
		33
		44

The consumer can no longer consume data and logs the following warnings

2020-11-12T17:11:43,568 WARN [kafka-coordinator-heartbeat-thread | console-consumer-11985] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Connection to node 2147483646 (/192.168.70.115:9092) could not be established. Broker may not be available.
2020-11-12T17:11:43,569 INFO [kafka-coordinator-heartbeat-thread | console-consumer-11985] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Group coordinator 192.168.70.115:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2020-11-12T17:11:43,572 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Discovered group coordinator 192.168.70.115:9092 (id: 2147483646 rack: null)

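4. Why does this happen?

4.1 Why can't the consumer consume data from topic:test?

The Kafka internal topic __consumer_offsets has a replication factor of 1 under the stock server.properties, and all 50 of its partitions (0-49) happened to live on kafka01. The group coordinator for a consumer group is the broker that leads the group's __consumer_offsets partition, so once kafka01 is down, consumer group groupid:2222 can reach neither its coordinator nor its committed offsets in __consumer_offsets. That matches the "Group coordinator ... is unavailable or invalid" messages in the log above.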

[root@hadoop01 kafka-logs]# ls
cdn_events-0               __consumer_offsets-21  __consumer_offsets-36  __consumer_offsets-6
cleaner-offset-checkpoint  __consumer_offsets-22  __consumer_offsets-37  __consumer_offsets-7
__consumer_offsets-0       __consumer_offsets-23  __consumer_offsets-38  __consumer_offsets-8
__consumer_offsets-1       __consumer_offsets-24  __consumer_offsets-39  __consumer_offsets-9
__consumer_offsets-10      __consumer_offsets-25  __consumer_offsets-4   kafka-test-0
__consumer_offsets-11      __consumer_offsets-26  __consumer_offsets-40  log-start-offset-checkpoint
__consumer_offsets-12      __consumer_offsets-27  __consumer_offsets-41  meta.properties
__consumer_offsets-13      __consumer_offsets-28  __consumer_offsets-42  mysqlSinkTest-0
__consumer_offsets-14      __consumer_offsets-29  __consumer_offsets-43  recovery-point-offset-checkpoint
__consumer_offsets-15      __consumer_offsets-3   __consumer_offsets-44  replication-offset-checkpoint
__consumer_offsets-16      __consumer_offsets-30  __consumer_offsets-45  test_log-0
__consumer_offsets-17      __consumer_offsets-31  __consumer_offsets-46  wordCount_input-1
__consumer_offsets-18      __consumer_offsets-32  __consumer_offsets-47  wordCount_output-1
__consumer_offsets-19      __consumer_offsets-33  __consumer_offsets-48
__consumer_offsets-2       __consumer_offsets-34  __consumer_offsets-49
__consumer_offsets-20      __consumer_offsets-35  __consumer_offsets-5
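
The coordinator lookup behind this can be sketched in a few lines. This is a minimal illustration, not Kafka's code: it assumes the broker maps a group to partition abs(groupId.hashCode()) % 50 of __consumer_offsets, and that the consumer labels its coordinator connection with Integer.MAX_VALUE minus the broker id. The function names are hypothetical, and the hash helper assumes group ids made of basic (BMP) characters.

```python
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE


def java_string_hashcode(s: str) -> int:
    """Replicate Java's String.hashCode() as a signed 32-bit integer."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF  # wrap like Java int arithmetic
    return h - 2**32 if h >= 2**31 else h


def offsets_partition_for(group_id: str, num_partitions: int = 50) -> int:
    """Assumed mapping from a group id to its __consumer_offsets partition."""
    h = java_string_hashcode(group_id)
    h = 0 if h == -2**31 else abs(h)  # Integer.MIN_VALUE has no positive abs
    return h % num_partitions


def broker_id_from_coordinator_node(node_id: int) -> int:
    """Recover the broker id from the coordinator node id seen in the logs."""
    return INT_MAX - node_id


print(offsets_partition_for("2222"))                # 0
print(broker_id_from_coordinator_node(2147483646))  # 1
```

Under these assumptions, group 2222 depends on __consumer_offsets-0, and the node id 2147483646 in the warnings points at broker 1, the only replica of every __consumer_offsets partition.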

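4.2 Why can the producer still send data to topic:test?

topic:test has a replication factor of 3, so even though the partition replicas on kafka01 are gone, every partition still has replicas on kafka02 and kafka03. Leadership for the affected partitions moves to one of those replicas, and sending data to topic:test keeps succeeding.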
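
The difference between the two topics can be shown with a toy model of leader failover. This is a deliberately simplified sketch, not the controller's real election logic: it assumes every assigned replica was in sync before the failure, and it uses hypothetical broker ids 1, 2, 3 for kafka01, kafka02, kafka03.

```python
def elect_leader(replicas, live_brokers):
    """Return the first assigned replica that is still alive, or -1 if none is."""
    for broker in replicas:
        if broker in live_brokers:
            return broker
    return -1


live = {2, 3}  # kafka01 (assumed to be broker 1) is down

# A partition of topic:test, replication factor 3: another replica takes over.
print(elect_leader([1, 2, 3], live))  # 2

# A partition of __consumer_offsets, replication factor 1: no leader is left.
print(elect_leader([1], live))        # -1
```

A leader of -1 means the partition is offline, which is why producing to topic:test keeps working while the offsets topic becomes unavailable.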

4.3 Restart kafka01 and the consumer can consume data again

After kafka01 is restarted, the consumer rejoins the group and receives the data

2020-11-12T17:05:28,033 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] Discovered group coordinator 192.168.70.115:9092 (id: 2147483646 rack: null)
2020-11-12T17:05:28,034 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,141 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,243 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,250 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:38,254 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] Successfully joined group with generation 4
2020-11-12T17:05:38,255 INFO [main] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=2222] Setting newly assigned partitions: flinkkafka333-1, flinkkafka333-2, flinkkafka333-0     
33
44

5. Solution

Default configuration in Kafka's server.properties

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1

Change the configuration to

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
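# Together, the following parameters determine the cluster's high availability
# Replication factor of topic:__consumer_offsets (1 in the stock server.properties; broker default 3)
offsets.topic.replication.factor=3
# Replication factor of topic:__transaction_state (1 in the stock server.properties; broker default 3)
transaction.state.log.replication.factor=3
# Minimum number of in-sync replicas (ISR) for topic:__transaction_state (1 in the stock server.properties; broker default 2)
transaction.state.log.min.isr=2
# min.insync.replicas sets the minimum number of replicas that must be in the ISR, default 1. It is enforced only for writes that require acknowledgement from all in-sync replicas: producers using acks=all (-1), and offset commits when offsets.commit.required.acks is -1. If the ISR holds fewer replicas than min.insync.replicas, the client receives the exception: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
#min.insync.replicas=2
# Number of fetcher threads a follower uses to replicate messages from each leader broker, default 1. A higher value increases follower I/O parallelism, but the leader then serves more requests per unit of time and its load grows, so weigh it against the machine's hardware resources
num.replica.fetchers=2
# Default replication factor for newly created topics, default 1; can be overridden when creating a topic
default.replication.factor=2
# Default number of partitions for newly created topics, default 1; can be overridden when creating a topic
num.partitions=2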
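
How the replication factor and the minimum ISR size interact comes down to simple arithmetic. The sketch below assumes writes that wait for all in-sync replicas (acks=all) and that the surviving replicas stay in sync; the function name is hypothetical.

```python
def tolerated_broker_failures(replication_factor: int, min_isr: int) -> int:
    """Replica-holding brokers that can fail while acks=all writes still succeed."""
    if min_isr > replication_factor:
        raise ValueError("min ISR larger than replication factor: writes can never succeed")
    return replication_factor - min_isr


# Stock settings: replication factor 1, min ISR 1.
print(tolerated_broker_failures(1, 1))  # 0

# Settings above for __transaction_state: replication factor 3, min ISR 2.
print(tolerated_broker_failures(3, 2))  # 1
```

With the modified values, one of the three brokers can go down without blocking writes to the internal topics, whereas the stock values tolerate none.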

6. Restart Kafka

[root@hadoop01 log]# kafka-topics.sh --describe --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic __consumer_offsets

2020-11-12T18:13:09,825 INFO [main] kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Connected.
Topic:__consumer_offsets	PartitionCount:50	ReplicationFactor:1	Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
	Topic: __consumer_offsets	Partition: 0	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 1	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 2	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 3	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 4	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 5	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 6	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 7	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 8	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 9	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 10	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 11	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 12	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 13	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 14	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 15	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 16	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 17	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 18	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 19	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 20	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 21	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 22	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 23	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 24	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 25	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 26	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 27	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 28	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 29	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 30	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 31	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 32	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 33	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 34	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 35	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 36	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 37	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 38	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 39	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 40	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 41	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 42	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 43	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 44	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 45	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 46	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 47	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 48	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 49	Leader: -1	Replicas: 1	Isr: 1

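The change has no effect: because the __consumer_offsets topic already exists, modifying offsets.topic.replication.factor does not alter its replica count. The only option at this point is to increase the replicas of __consumer_offsets manually.
For details, see: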

Dynamically increasing the replicas of a Kafka topic
https://www.cnblogs.com/xiao987334176/p/10315176.html
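
Increasing the replicas manually is usually done with a partition reassignment plan in JSON. As a rough sketch, the script below generates such a plan for all 50 partitions of __consumer_offsets. The broker ids 1, 2, 3 are an assumption (only broker 1 appears in the describe output above), and the function name is hypothetical.

```python
import json


def build_reassignment(topic, num_partitions, broker_ids):
    """Build a reassignment plan that places every partition on all given brokers."""
    n = len(broker_ids)
    partitions = []
    for p in range(num_partitions):
        # Rotate the broker list so preferred leaders are spread across brokers.
        replicas = [broker_ids[(p + i) % n] for i in range(n)]
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}


# Assumed broker ids; check the real ids in your cluster before using the plan.
plan = build_reassignment("__consumer_offsets", 50, [1, 2, 3])
print(json.dumps(plan))
```

The printed JSON can be saved to a file and passed to kafka-reassign-partitions.sh with --reassignment-json-file and --execute; afterwards, kafka-topics.sh --describe should list three replicas per partition.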

Original article: https://blog.csdn.net/lihuazaizheli/article/details/109648517
