kafka01  kafka02   kafka03

2. Create topic test (3 partitions, replication factor 3)

kafka-topics.sh --create --topic 'test' --zookeeper 'hadoop01:2181,hadoop02:2181,hadoop03:2181'  --partitions 3 --replication-factor 3


3.1 Producer sends data

kafka-console-producer.sh --broker-list 'hadoop01:9092,hadoop02:9092,hadoop03:9092' --topic 'test'
	data - Cluster ID: qdP2jzDLRcautzTjQ4Lvfg

3.2 Consumer consumes data

Consumer group (group id: 2222) is consuming data from topic test

kafka-console-consumer.sh --topic 'test' --bootstrap-server   kafka01:9092,kafka02:9092,kafka03:9092  --group 2222

 2020-11-12T17:11:15,594 INFO [main] org.apache.kafka.clients.consumer.internals.Fetcher - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Resetting offset for partition flinkkafka333-2 to offset 8.


	kafka-console-producer.sh --broker-list 'hadoop01:9092,hadoop02:9092,hadoop03:9092' --topic 'test'
		data - Cluster ID: qdP2jzDLRcautzTjQ4Lvfg


2020-11-12T17:11:43,568 WARN [kafka-coordinator-heartbeat-thread | console-consumer-11985] org.apache.kafka.clients.NetworkClient - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Connection to node 2147483646 (/ could not be established. Broker may not be available.
2020-11-12T17:11:43,569 INFO [kafka-coordinator-heartbeat-thread | console-consumer-11985] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Group coordinator (id: 2147483646 rack: null) is unavailable or invalid, will attempt rediscovery
2020-11-12T17:11:43,572 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=console-consumer-11985] Discovered group coordinator (id: 2147483646 rack: null)

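4. Why does this happen?

4.1 Why can't the consumer consume data from topic test?

Kafka's internal topic __consumer_offsets has a replication factor of 1 here, and all 50 of its partitions (0-49) happen to live on kafka01. With kafka01 down, none of those partitions has a leader, so consumer group 2222 has no group coordinator and cannot read its committed offsets from __consumer_offsets.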

[root@hadoop01 kafka-logs]# ls
cdn_events-0               __consumer_offsets-21  __consumer_offsets-36  __consumer_offsets-6
cleaner-offset-checkpoint  __consumer_offsets-22  __consumer_offsets-37  __consumer_offsets-7
__consumer_offsets-0       __consumer_offsets-23  __consumer_offsets-38  __consumer_offsets-8
__consumer_offsets-1       __consumer_offsets-24  __consumer_offsets-39  __consumer_offsets-9
__consumer_offsets-10      __consumer_offsets-25  __consumer_offsets-4   kafka-test-0
__consumer_offsets-11      __consumer_offsets-26  __consumer_offsets-40  log-start-offset-checkpoint
__consumer_offsets-12      __consumer_offsets-27  __consumer_offsets-41  meta.properties
__consumer_offsets-13      __consumer_offsets-28  __consumer_offsets-42  mysqlSinkTest-0
__consumer_offsets-14      __consumer_offsets-29  __consumer_offsets-43  recovery-point-offset-checkpoint
__consumer_offsets-15      __consumer_offsets-3   __consumer_offsets-44  replication-offset-checkpoint
__consumer_offsets-16      __consumer_offsets-30  __consumer_offsets-45  test_log-0
__consumer_offsets-17      __consumer_offsets-31  __consumer_offsets-46  wordCount_input-1
__consumer_offsets-18      __consumer_offsets-32  __consumer_offsets-47  wordCount_output-1
__consumer_offsets-19      __consumer_offsets-33  __consumer_offsets-48
__consumer_offsets-2       __consumer_offsets-34  __consumer_offsets-49
__consumer_offsets-20      __consumer_offsets-35  __consumer_offsets-5
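
To see which __consumer_offsets partition a given group depends on, here is a minimal sketch. It assumes the broker maps a group to partition abs(groupId.hashCode) % 50 (50 being the partition count shown above); that mapping is my understanding of the broker's behavior, not something taken from these notes. The function names are made up for illustration.

```python
def java_string_hash(s: str) -> int:
    """Replicate Java's String.hashCode() (32-bit signed, over UTF-16 code units)."""
    data = s.encode("utf-16-be")
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    # Convert the unsigned 32-bit value to a signed Java int.
    return h - 0x100000000 if h >= 0x80000000 else h


def offsets_partition_for(group_id: str, num_partitions: int = 50) -> int:
    """Partition of __consumer_offsets assumed to hold this group's offsets."""
    h = java_string_hash(group_id)
    # Mirror an abs() that maps Integer.MIN_VALUE to 0 instead of overflowing.
    positive = 0 if h == -0x80000000 else abs(h)
    return positive % num_partitions


print(offsets_partition_for("2222"))  # 1539200 % 50 -> 0
```

Under that assumption group 2222 maps to __consumer_offsets-0, which sits on kafka01 like every other partition of the topic, so any group id would have been affected in the same way.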

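4.2 Why can the producer still send data to topic test?

Topic test has a replication factor of 3, so even though the replicas on kafka01 are gone, copies of every partition remain on kafka02 and kafka03. Sending data to topic test therefore still succeeds.

4.3 After restarting kafka01, the consumer can consume again

Once kafka01 is back up, the consumer receives data again.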
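
The difference between the two topics can be sketched as a small availability check. The replica assignments below are hypothetical (only "Replicas: 1" for __consumer_offsets is confirmed by the describe output in these notes), and the sketch assumes every surviving replica is in sync and can become leader.

```python
def available_partitions(assignment, live_brokers):
    """Return {partition: surviving replicas} for partitions with a live replica."""
    result = {}
    for partition, replicas in assignment.items():
        survivors = [b for b in replicas if b in live_brokers]
        if survivors:
            result[partition] = survivors
    return result


# Hypothetical layout: broker 1 = kafka01, 2 = kafka02, 3 = kafka03.
test_topic = {0: [1, 2, 3], 1: [2, 3, 1], 2: [3, 1, 2]}
offsets_topic = {p: [1] for p in range(50)}
live = {2, 3}  # kafka01 is down

print(len(available_partitions(test_topic, live)))     # 3
print(len(available_partitions(offsets_topic, live)))  # 0
```

All three partitions of test keep a replica on a live broker, while none of the 50 __consumer_offsets partitions do.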

2020-11-12T17:05:28,033 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] Discovered group coordinator (id: 2147483646 rack: null)
2020-11-12T17:05:28,034 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,141 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,243 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:28,250 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] (Re-)joining group
2020-11-12T17:05:38,254 INFO [main] org.apache.kafka.clients.consumer.internals.AbstractCoordinator - [Consumer clientId=consumer-1, groupId=2222] Successfully joined group with generation 4
2020-11-12T17:05:38,255 INFO [main] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator - [Consumer clientId=consumer-1, groupId=2222] Setting newly assigned partitions: flinkkafka333-1, flinkkafka333-2, flinkkafka333-0     


Default settings in Kafka's server.properties

############################# Internal Topic Settings  #############################
# The replication factor for the group metadata internal topics "__consumer_offsets" and "__transaction_state"
# For anything other than development testing, a value greater than 1 is recommended for to ensure availability such as 3.


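# The following parameters together affect the high availability of the cluster
# offsets.topic.replication.factor: replication factor of topic __consumer_offsets, 1 in the shipped server.properties
# transaction.state.log.replication.factor: replication factor of topic __transaction_state, 1 in the shipped server.properties
# transaction.state.log.min.isr: minimum number of in-sync replicas in the ISR of topic __transaction_state, broker default 2
# min.insync.replicas: minimum number of replicas that must be in the ISR, default 1. It is enforced only for writes that require acks=-1 (for offset commits, when offsets.commit.required.acks is -1). If the ISR holds fewer replicas than min.insync.replicas, the client receives the exception: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
# num.replica.fetchers: number of fetcher threads each follower uses to replicate data from a leader, default 1. A higher value increases the follower's I/O parallelism, but the leader then serves more requests per unit of time and its load grows, so weigh it against the machine's hardware resources
# default.replication.factor: replication factor for automatically created topics, default 1; it can be overridden when creating a topic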
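
The min.insync.replicas rule in the comments above can be written as a one-line check. This is only an illustration of the rule as stated, with a made-up function name, not broker code.

```python
def append_allowed(isr_size: int, min_insync_replicas: int, acks: int) -> bool:
    """True if a write would be accepted under the min.insync.replicas rule."""
    if acks != -1:
        # The minimum ISR size is only enforced for acks=-1 (all).
        return True
    return isr_size >= min_insync_replicas


print(append_allowed(isr_size=1, min_insync_replicas=2, acks=-1))  # False
print(append_allowed(isr_size=1, min_insync_replicas=2, acks=1))   # True
```

The first call corresponds to the NotEnoughReplicasException case; the second shows that the same ISR is accepted when the write does not ask for acks=-1.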


[root@hadoop01 log]# kafka-topics.sh --describe --zookeeper hadoop01:2181,hadoop02:2181,hadoop03:2181 --topic __consumer_offsets

2020-11-12T18:13:09,825 INFO [main] kafka.zookeeper.ZooKeeperClient - [ZooKeeperClient] Connected.
Topic:__consumer_offsets	PartitionCount:50	ReplicationFactor:1	Configs:segment.bytes=104857600,cleanup.policy=compact,compression.type=producer
	Topic: __consumer_offsets	Partition: 0	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 1	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 2	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 3	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 4	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 5	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 6	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 7	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 8	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 9	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 10	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 11	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 12	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 13	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 14	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 15	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 16	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 17	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 18	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 19	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 20	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 21	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 22	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 23	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 24	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 25	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 26	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 27	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 28	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 29	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 30	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 31	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 32	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 33	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 34	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 35	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 36	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 37	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 38	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 39	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 40	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 41	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 42	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 43	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 44	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 45	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 46	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 47	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 48	Leader: -1	Replicas: 1	Isr: 1
	Topic: __consumer_offsets	Partition: 49	Leader: -1	Replicas: 1	Isr: 1

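Changing these settings has no effect (the describe output still shows ReplicationFactor:1): once the __consumer_offsets topic exists, modifying the configured replication factor does not alter it. The only option is to increase the replication factor of __consumer_offsets manually.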
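
One common way to do this manually is a partition reassignment. The sketch below generates the reassignment JSON for all 50 partitions. Broker ids 1, 2 and 3 are an assumption (only id 1 appears in the describe output above), and the function name is illustrative.

```python
import json


def build_reassignment(topic, num_partitions, broker_ids):
    """Build a reassignment plan that places every partition on all given brokers."""
    partitions = []
    for p in range(num_partitions):
        # Rotate the replica list so the first replica (the preferred leader)
        # is spread across the brokers instead of always being the same one.
        shift = p % len(broker_ids)
        replicas = broker_ids[shift:] + broker_ids[:shift]
        partitions.append({"topic": topic, "partition": p, "replicas": replicas})
    return {"version": 1, "partitions": partitions}


# Assumed broker ids; check them against meta.properties on each broker.
plan = build_reassignment("__consumer_offsets", 50, [1, 2, 3])
print(json.dumps(plan["partitions"][:2]))
```

Saving json.dumps(plan) to a file and passing it to kafka-reassign-partitions.sh with --reassignment-json-file and --execute (plus --zookeeper on Kafka versions that still use it, as in the commands above) should apply the change; running the tool again with --verify reports progress. All brokers need to be up while the reassignment runs.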



