Why can't a consumer read data after one node goes down in a highly available Kafka cluster?
2022-06-27 20:47:28
1. Assume a Kafka cluster with 3 brokers
Kafka cluster
kafka01 kafka02 kafka03
2. Create topic test (3 partitions, 3 replicas)
kafka-topics.sh --create --topic 'test' --zookeeper 'hadoop01:2181,hadoop02:2181,hadoop03:2181' --partitions 3 --replication-factor 3
3. Scenario
3.1 The producer produces data
kafka-console-producer.sh --broker-list 'hadoop01:9092,hadoop02:9092,hadoop03:9092' --topic 'test'
.......
data - Cluster ID: qdP2jzDLRcautzTjQ4Lvfg
12
>22
3.2 The consumer consumes data
Consumer group 2222 (group id) is consuming data from topic test
kafka-console-consumer.sh --topic 'test' --bootstrap-server kafka01:9092,kafka02:9092,kafka03:9092 --group 2222
2020-11-12T17:11:15,594 INFO [main] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Resetting offset for partition flinkkafka333-2 to offset 8.
12
22
At this point kafka01 goes down, and the producer keeps producing data
kafka-console-producer.sh --broker-list 'hadoop01:9092,hadoop02:9092,hadoop03:9092' --topic 'test'
.......
data - Cluster ID: qdP2jzDLRcautzTjQ4Lvfg
33
44
The consumer receives no data and logs the following warnings
2020-11-12T17:11:43,568 WARN [kafka-coordinator-heartbeat-thread | console-consumer-11985] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Connection to node 2147483646 (/192.168.70.115:9092) could not be established. Broker may not be available.
2020-11-12T17:11:43,569 INFO [kafka-coordinator-heartbeat-thread | console-consumer-11985] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Group coordinator 192.168.70.115:9092 (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2020-11-12T17:11:43,572 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Discovered group coordinator 192.168.70.115:9092 (id: 2147483646 rack: null)
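The node id 2147483646 in these log lines is not a real broker id. To my knowledge, the Java client opens a separate connection to the group coordinator and labels it with Integer.MAX_VALUE minus the broker id, so the value can be decoded. A minimal sketch under that assumption (the function name is made up for illustration):

```python
# Decode the synthetic coordinator node id that appears in consumer logs.
# Assumption: the Java client derives it as Integer.MAX_VALUE - broker_id.
INT_MAX = 2**31 - 1  # Java's Integer.MAX_VALUE

def coordinator_broker_id(coordinator_node_id: int) -> int:
    """Return the broker id behind a coordinator connection id."""
    return INT_MAX - coordinator_node_id

print(coordinator_broker_id(2147483646))  # prints 1
```

Broker id 1 is the same id that appears as the only replica of every __consumer_offsets partition in the describe output of step 6.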
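4. Why does this happen?
4.1 Why can't the consumer consume data from topic test?
In this cluster the internal topic __consumer_offsets has a replication factor of 1 (the value set in the sample server.properties), and all 50 of its partitions (0-49) happen to sit on kafka01. Once kafka01 is down, those partitions have no leader, so consumer group 2222 can neither reach its group coordinator nor read its committed offsets from __consumer_offsets.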
[root@hadoop01 kafka-logs]# ls
cdn_events-0 __consumer_offsets-21 __consumer_offsets-36 __consumer_offsets-6
cleaner-offset-checkpoint __consumer_offsets-22 __consumer_offsets-37 __consumer_offsets-7
__consumer_offsets-0 __consumer_offsets-23 __consumer_offsets-38 __consumer_offsets-8
__consumer_offsets-1 __consumer_offsets-24 __consumer_offsets-39 __consumer_offsets-9
__consumer_offsets-10 __consumer_offsets-25 __consumer_offsets-4 kafka-test-0
__consumer_offsets-11 __consumer_offsets-26 __consumer_offsets-40 log-start-offset-checkpoint
__consumer_offsets-12 __consumer_offsets-27 __consumer_offsets-41 meta.properties
__consumer_offsets-13 __consumer_offsets-28 __consumer_offsets-42 mysqlSinkTest-0
__consumer_offsets-14 __consumer_offsets-29 __consumer_offsets-43 recovery-point-offset-checkpoint
__consumer_offsets-15 __consumer_offsets-3 __consumer_offsets-44 replication-offset-checkpoint
__consumer_offsets-16 __consumer_offsets-30 __consumer_offsets-45 test_log-0
__consumer_offsets-17 __consumer_offsets-31 __consumer_offsets-46 wordCount_input-1
__consumer_offsets-18 __consumer_offsets-32 __consumer_offsets-47 wordCount_output-1
__consumer_offsets-19 __consumer_offsets-33 __consumer_offsets-48
__consumer_offsets-2 __consumer_offsets-34 __consumer_offsets-49
__consumer_offsets-20 __consumer_offsets-35 __consumer_offsets-5
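A consumer group depends on exactly one of these partitions: the broker that leads it acts as the group's coordinator. The sketch below shows how a group id maps to a partition, assuming the broker computes abs(groupId.hashCode()) modulo the number of __consumer_offsets partitions (50 here). The helper names are hypothetical, and the hash only matches Java for group ids made of basic (non-surrogate) characters.

```python
def java_string_hashcode(s: str) -> int:
    """Reproduce Java's String.hashCode() as a signed 32-bit int."""
    h = 0
    for ch in s:
        h = (31 * h + ord(ch)) & 0xFFFFFFFF
    # Convert the unsigned 32-bit value into Java's signed int range.
    return h - 0x100000000 if h >= 0x80000000 else h

def offsets_partition_for(group_id: str, num_partitions: int = 50) -> int:
    """Partition of __consumer_offsets that stores this group's offsets."""
    h = java_string_hashcode(group_id)
    # Assumption: like Kafka's Utils.abs, map Integer.MIN_VALUE to 0.
    h = 0 if h == -2**31 else abs(h)
    return h % num_partitions

print(offsets_partition_for("2222"))  # prints 0
```

Under these assumptions group 2222 lives in __consumer_offsets-0. Since every partition is on kafka01 anyway, any group id would have hit the same problem.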
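4.2 Why can the producer still send data to topic test?
Topic test has a replication factor of 3, so even though the replicas on kafka01 are gone, copies remain on kafka02 and kafka03, and producing to topic test still succeeds.
4.3 After restarting kafka01, the consumer can consume again
Once kafka01 is back up, the consumer receives the data again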
2020-11-12T17:05:28,033 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] Discovered group coordinator 192.168.70.115:9092 (id: 2147483646 rack: null)
2020-11-12T17:05:28,034 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,141 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,243 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,250 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:38,254 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] Successfully joined group with generation 4
2020-11-12T17:05:38,255 INFO [main] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=2222] Setting newly assigned partitions: flinkkafka333-1, flinkkafka333-2, flinkkafka333-0
33
44
5. Solution
Default settings in Kafka's server.properties
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
Change the configuration to
############################# Internal Topic Settings #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.
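# The following parameters together affect the availability of the cluster
# Replication factor of topic __consumer_offsets (1 in the sample server.properties; the broker's built-in default is 3)
offsets.topic.replication.factor=3
# Replication factor of topic __transaction_state (1 in the sample server.properties; the broker's built-in default is 3)
transaction.state.log.replication.factor=3
# Minimum number of in-sync replicas (ISR) for topic __transaction_state, built-in default 2
transaction.state.log.min.isr=2
# min.insync.replicas sets the minimum number of replicas that must be in the ISR, default 1. It is only enforced for writes that require acks=-1 (all); for offset commits the matching setting is offsets.commit.required.acks=-1. If the ISR has fewer replicas than min.insync.replicas, the client gets the exception org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
#min.insync.replicas=2
# Number of threads each follower uses to fetch messages from a leader for replication, default 1. A higher value raises the follower's I/O parallelism, but the leader then serves more requests per unit of time and its load increases, so weigh it against the machine's hardware resources
num.replica.fetchers=2
# Default replication factor for new topics, default 1; can be set explicitly when creating a topic
default.replication.factor=2
# Default number of partitions for new topics, default 1; can be set explicitly when creating a topic
num.partitions=2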
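How min.insync.replicas interacts with acks can be sketched as a small rule. This is a rough model only, not broker code: it assumes the partition leader is alive whenever the ISR is non-empty and ignores unclean leader election. The function name is hypothetical.

```python
def write_accepted(isr_size: int, min_insync_replicas: int, acks: str) -> bool:
    """Rough model: does a produce request to this partition succeed?"""
    if isr_size < 1:
        return False  # no in-sync replica left, so no leader to write to
    if acks in ("all", "-1"):
        # Only acks=all is checked against min.insync.replicas.
        return isr_size >= min_insync_replicas
    return True  # acks=0 and acks=1 only need the leader

# Topic test with 3 replicas and min.insync.replicas=2:
print(write_accepted(2, 2, "all"))  # one broker down  -> True
print(write_accepted(1, 2, "all"))  # two brokers down -> False
print(write_accepted(1, 2, "1"))    # acks=1 still goes through -> True
```

With 3 brokers, a replication factor of 3 and min.insync.replicas=2 tolerate the loss of one broker for acks=all writes, which is the scenario in section 3.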
6. Restart Kafka
[root@hadoop01 log]# kafka-topics.sh --describe --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic __consumer_offsets
2020-11-12T18:13:09,825 INFO [main] kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Connected.
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 1 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 2 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 3 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 4 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 5 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 6 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 7 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 8 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 9 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 10 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 11 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 12 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 13 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 14 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 15 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 16 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 17 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 18 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 19 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 20 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 21 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 22 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 23 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 24 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 25 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 26 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 27 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 28 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 29 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 30 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 31 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 32 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 33 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 34 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 35 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 36 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 37 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 38 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 39 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 40 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 41 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 42 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 43 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 44 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 45 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 46 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 47 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 48 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 49 Leader: -1 Replicas: 1 Isr: 1
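The change has no effect: ReplicationFactor is still 1. If the __consumer_offsets topic already exists, changing the configured replication factor does not alter it, so the replicas of __consumer_offsets have to be increased manually.
For details, see:
Dynamically increasing the replicas of a Kafka topic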
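The describe output can also be checked with a short script rather than by eye. The sketch below parses lines in the format printed above and lists partitions that have a single replica or no leader. The regular expression is written against this output only and may need adjusting for other Kafka versions.

```python
import re

# Matches per-partition lines of `kafka-topics.sh --describe` as shown above.
LINE = re.compile(
    r"Topic:\s*(?P<topic>\S+)\s+Partition:\s*(?P<partition>\d+)\s+"
    r"Leader:\s*(?P<leader>-?\d+)\s+Replicas:\s*(?P<replicas>[\d,]+)\s+"
    r"Isr:\s*(?P<isr>[\d,]*)"
)

def parse_describe(text):
    """Return one dict per partition line; the summary line is skipped."""
    rows = []
    for line in text.splitlines():
        m = LINE.search(line)
        if m:
            rows.append({
                "partition": int(m["partition"]),
                "leader": int(m["leader"]),
                "replicas": [int(x) for x in m["replicas"].split(",")],
                "isr": [int(x) for x in m["isr"].split(",") if x],
            })
    return rows

sample = """\
Topic:__consumer_offsets PartitionCount:50 ReplicationFactor:1 Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
Topic: __consumer_offsets Partition: 0 Leader: -1 Replicas: 1 Isr: 1
Topic: __consumer_offsets Partition: 1 Leader: -1 Replicas: 1 Isr: 1
"""

rows = parse_describe(sample)
single_replica = [r["partition"] for r in rows if len(r["replicas"]) < 2]
leaderless = [r["partition"] for r in rows if r["leader"] == -1]
print("single replica:", single_replica)
print("no leader:", leaderless)
```

Any partition reported in either list is a single point of failure for the consumer groups mapped to it.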
https://www.cnblogs.com/xiao987334176/p/10315176.html
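The manual increase is normally done with a partition reassignment. The sketch below only builds the JSON plan that kafka-reassign-partitions.sh accepts through --reassignment-json-file together with --execute. Broker ids 1, 2 and 3 are an assumption (only id 1 appears in the output above), as is the rotation scheme used to spread replicas.

```python
import json

def build_reassignment(topic, num_partitions, broker_ids):
    """Build a reassignment plan that places every partition on all given brokers."""
    n = len(broker_ids)
    partitions = []
    for p in range(num_partitions):
        # Rotate the broker list so the first (preferred) replica differs
        # from partition to partition and leadership is spread out.
        replicas = [broker_ids[(p + i) % n] for i in range(n)]
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}

# Assumed broker ids; check the real ones before using the plan.
plan = build_reassignment("__consumer_offsets", 50, [1, 2, 3])
print(json.dumps(plan["partitions"][:3], indent=2))
```

Saving json.dumps(plan) to a file gives the input for the reassignment tool. Running the tool with --verify afterwards reports whether the reassignment has completed.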
Original article: https://blog.csdn.net/lihuazaizheli/article/details/109648517