Kafka consumer 消费者原理详解
程序员文章站
2022-07-14 23:35:22
...
一、消费方式
kafka采用发布订阅模式:一对多。发布订阅模式又分两种:
-
push(推)模式很难适应消费速率不同的消费者,因为消息发送速率是由 broker 决定的。它的目标是尽可能以最快速度传递消息,但是这样很容易造成 consumer 来不及处理消息,典型的表现就是拒绝服务以及网络拥塞。而 pull 模式则可以根据 consumer 的消费能力以适当的速率消费消息。
-
kafka consumer 采用 pull(拉)模式从 broker 中读取数据。pull 模式不足之处是,如果 kafka 没有数据,消费者可能会陷入循环中,一直返回空数据。针对这一点,Kafka 的消费者在消费数据时会传入一个时长参数 timeout,如果当前没有数据可供消费,consumer 会等待一段时间之后再返回,这段时长即为 timeout。
二、分区分配策略
- 一个 consumer group 中有多个 consumer,一个 topic 有多个 partition,所以必然会涉及到 partition 的分配问题,即确定哪个 partition 由哪个 consumer 来消费。
- 关于如何设置partition值需要考虑的因素。一个partition只能被同一个消费者组里的一个消费者消费(一个消费者可以同时消费多个partition),因此,如果设置的partition的数量小于consumer的数量,就会有消费者消费不到数据。所以,推荐partition的数量一定要大于同时运行的consumer的数量。另外一方面,建议partition的数量大于集群broker的数量,这样leader partition就可以均匀的分布在各个broker中,最终使得集群负载均衡。在Cloudera,每个topic都有上百个partition。需要注意的是,kafka需要为每个partition分配一些内存来缓存消息数据,如果partition数量越大,就要为kafka分配更大的heap space。
- Kafka 有两种分配策略,一是 RoundRobin,一是 Range。
三、offset 的维护
由于 consumer 在消费过程中可能会出现断电宕机等故障,consumer 恢复后,需要从故障前的位置的继续消费,所以 consumer 需要实时记录自己消费到了哪个 offset,以便故障恢复后继续消费。
consumer group +topic + partition 唯一确定一个offest
高版本offset不在zk上存储了,从 0.9 版本开始,consumer 默认将 offset 保存在 Kafka 一个内置的 topic 中,该 topic 为__consumer_offsets。
你如果特别好奇,实在想看看offset什么的,也可以执行下面操作:
修改配置文件 consumer.properties
[aaa@qq.com kafka]# vim config/consumer.properties
添加如下配置
exclude.internal.topics=false
再启动一个消费者
[aaa@qq.com kafka]# bin/kafka-console-consumer.sh --topic __consumer_offsets
--bootstrap-server localhost:9092 --formatter
"kafka.coordinator.group.GroupMetadataManager\$OffsetsMessageFormatter"
--consumer.config config/consumer.properties --from-beginning
四、消费者组测试
我们可以验证一下同一个消费者组中的消费者,同一时刻只能有一个消费者消费一个分区的数据。
- 在 centos7-3、centos7-4 上修改kafka/config/consumer.properties 配置文件中的 group.id 属性为任意组名。
[aaa@qq.com kafka]# vim config/consumer.properties
修改 group.id=jh
[aaa@qq.com kafka]# vim config/consumer.properties
修改 group.id=jh
- 在 centos7-3、centos7-4 上分别启动消费者
[aaa@qq.com kafka]# bin/kafka-console-consumer.sh --bootstrap-server
localhost:9092 --topic testgroup --from-beginning --consumer.config
config/consumer.properties
[aaa@qq.com kafka]# bin/kafka-console-consumer.sh --bootstrap-server
localhost:9092 --topic testgroup --from-beginning --consumer.config
config/consumer.properties
- 在 centos7- 1上启动生产者
[aaa@qq.com kafka]# bin/kafka-console-producer.sh --broker-list
centos7-1:9092 --topic testgroup
4.效果展示