欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

Kafka consumer 消费者原理详解

程序员文章站 2022-07-14 23:35:22
...

一、消费方式

kafka采用发布订阅模式:一对多。发布订阅模式又分两种:

  • push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。

  • kafka consumer 采用 pull(拉)模式从 broker 中读取数据。pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。

二、分区分配策略

  • 一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。
  • 关于如何设置partition值需要考虑的因素。一个partition只能被同一个消费者组里的一个消费者消费(一个消费者可以同时消费多个partition),因此,如果设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。所以,推荐partition的数量一定要大于同时运行的consumer的数量。另外一方面,建议partition的数量大于集群broker的数量,这样leader partition就可以均匀的分布在各个broker中,最终使得集群负载均衡。在Cloudera,每个topic都有上百个partition。需要注意的是,kafka需要为每个partition分配一些内存来缓存消息数据,如果partition数量越大,就要为kafka分配更大的heap space。
  • Kafka 有两种分配策略,一是 RoundRobin,一是 Range。

三、offset 的维护

由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
consumer group +topic + partition 唯一确定一个offest

Kafka consumer 消费者原理详解
高版本offset不在zk上存储了,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。

Kafka consumer 消费者原理详解
Kafka consumer 消费者原理详解
Kafka consumer 消费者原理详解
你如果特别好奇,实在想看看offset什么的,也可以执行下面操作:

修改配置文件 consumer.properties

[aaa@qq.com kafka]# vim config/consumer.properties

添加如下配置
exclude.internal.topics=false

再启动一个消费者

[aaa@qq.com kafka]# bin/kafka-console-consumer.sh --topic __consumer_offsets 
--bootstrap-server localhost:9092 --formatter 
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter" 
--consumer.config config/consumer.properties --from-beginning

Kafka consumer 消费者原理详解

四、消费者组测试

我们可以验证一下同一个消费者组中的消费者,同一时刻只能有一个消费者消费一个分区的数据。

  1. 在 centos7-3、centos7-4 上修改kafka/config/consumer.properties 配置文件中的 group.id 属性为任意组名。
[aaa@qq.com kafka]# vim config/consumer.properties

修改 group.id=jh
[aaa@qq.com kafka]# vim config/consumer.properties

修改 group.id=jh
  1. 在 centos7-3、centos7-4 上分别启动消费者
[aaa@qq.com kafka]# bin/kafka-console-consumer.sh --bootstrap-server 
localhost:9092 --topic testgroup --from-beginning --consumer.config 
config/consumer.properties
[aaa@qq.com kafka]# bin/kafka-console-consumer.sh --bootstrap-server 
localhost:9092 --topic testgroup --from-beginning --consumer.config 
config/consumer.properties
  1. 在 centos7- 1上启动生产者
[aaa@qq.com kafka]# bin/kafka-console-producer.sh --broker-list 
centos7-1:9092 --topic testgroup 

4.效果展示
Kafka consumer 消费者原理详解
Kafka consumer 消费者原理详解
Kafka consumer 消费者原理详解