欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

Kafka常用命令合集

程序员文章站 2022-07-02 12:54:39
在上一篇文章《Linux安装Kafka》中,已经介绍了如何在Linux安装Kafka,以及Kafka的启动/关闭和创建发话题并产生消息和消费消息。这篇文章就介绍介绍Kafka的那些常用的命令。关于Kafka的启停/创建话题/消息的产生和消费等命令在上一篇文章《Linux安装Kafka》中已经指出,这... ......

在上一篇文章《linux安装kafka》中,已经介绍了如何在linux安装kafka,以及kafka的启动/关闭和创建发话题并产生消息和消费消息。这篇文章就介绍介绍kafka的那些常用的命令。

关于kafka的启停/创建话题/消息的产生和消费等命令在上一篇文章《linux安装kafka》中已经指出,这里就不说了。就说说其他常用命令。

♛ 1 查看消费者状态和消费详情

有时候我们需要关心消费者应用的状态,一般消费者应用会自己通过日志获知当前消费到了哪个topic的哪个partition的哪个offset,但当消费者出问题之后,或者出于监控的原因,我们需要知道消费者的状态和详情,那么需要借助kafka提供的相关命令。

命令格式:

bin/kafka-consumer-groups.sh --bootstrap-server borker_host1:port1,borker_hsot2:port2 --list

bin/kafka-consumer-groups.sh --bootstrap-server borker_host1:port1,borker_hsot2:port2 --group group_name --describe

tips:

  • broker_host是kafka server的ip地址,port是server的监听端口。多个host port之间用逗号隔开
  • 第一条命令是获取group列表,一般而言,应用是知道消费者group的,通常在应用的配置里,如果已知,该步骤可以省略
  • 第二条命令是查看具体的消费者group的详情信息,需要给出group的名称

示例:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-68682 --describe

效果图:

Kafka常用命令合集

  • topic:该group里消费的topic名称
  • partition:分区编号
  • current-offset:该分区当前消费到的offset
  • log-end-offset:该分区当前latest offset
  • lag:消费滞后区间,为log-end-offset-current-offset,具体大小需要看应用消费速度和生产者速度,一般过大则可能出现消费跟不上,需要引起应用注意
  • consumer-id:server端给该分区分配的consumer编号
  • host:消费者所在主机
  • client-id:消费者id,一般由应用指定
♛ 2 查询topic的offset的范围

用下面命令可以查询到topic:demo broker:localhost:9092的offset的最小值:

bin/kafka-run-class.sh kafka.tools.getoffsetshell --broker-list localhost:9092 -topic demo --time -2

查询offset的最大值:

bin/kafka-run-class.sh kafka.tools.getoffsetshell --broker-list localhost:9092 -topic demo --time -1

效果图:

Kafka常用命令合集

♛ 3 重置消费者offset

有些场景可能希望修改消费者消费到的offset位置,以达到重新消费,或者跳过一部分消息的目的,这时候重置offset的工具就非常实用。

命令格式:

bin/kafka-consumer-groups.sh --bootstrap-server borker_host1:port1,borker_hsot2:port2 --group group_name --reset-offsets --execute --to-offset new_offset --topic topic_name

bin/kafka-consumer-groups.sh --bootstrap-server borker_host1:port1,borker_hsot2:port2 --group group_name --reset-offsets --execute --to-earliest/--to-latest --topic topic_name

tips:

  • broker_host是kafka server的ip地址,port是server的监听端口。多个host port之间用逗号隔开
  • 第一条命令是将指定group_name和topic的offset修改到new_offset的位置,重启消费者后,消费中将从指定的offset处消费。注意这里只能new_offset只能设置一个值,也就是说,所有的分区都将使用这个值,如果分区消息负载不均衡,需要考虑是否适用。
  • 第二条命令是将指定group_name和topic的offset修改到earliest或者latest位置,使得消费者从头或者从尾部消费。

示例:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-68682 --reset-offsets --execute --to-offset 3 --topic demo

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group console-consumer-68682 --reset-offsets --execute --to-latest --topic demo

效果图:

Kafka常用命令合集

可以通过直接更换消费者group id的方式,配合消费者默认的消费策略,可以达到类似的效果,反而更加简单、高效和安全。

♛ 4 查看topic的状态和分区负载详情

当broker出现宕机,恢复之后,我们可以看下topic的leader是否负载均衡。因为kafka的所有读写消息的请求,都是发送到partition leader上的,因此在生产环境,负载均衡显得尤其重要。

命令格式:

bin/kafka-topics.sh --zookeeper zookeeper_host1:port1,zookeeper_host2:port2 --describe --topic topic_name

tips:

  • zookeeper_host是kafka所使用的zookeeper的ip地址,port是zookeeper监听的端口。多个host port之间用逗号隔开
  • 类似的,zookeeper集群不需要全部列上,给出一个可用的zk地址和端口即可

示例:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic demo

效果图:

Kafka常用命令合集

如果发现以下现象说明kafka异常:

  • 某个topic的每个分区,同步副本数量和设定的副本数量不一致
  • 某个topic的每个分区,leader的id数值是-1或者none
♛ 5 消费消息

从头开始消费:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --from-beginning

效果图:

Kafka常用命令合集

从尾部开始:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --offset latest --partition 0

效果图:

Kafka常用命令合集

指定分区:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --offset latest --partition 0

取指定个数:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic demo --offset latest --partition 0 --max-messages 2

如示例部分,取2个消息,取完自动结束回话。

♛ 自带压测工具

测试使用kafka自带的测试脚本,通过命令对kafka发起写入mq消息和kafka消费mq消息的请求。模拟不同数量级的mq消息写入和mq消息消费场景,根据kafka的处理结果,评估kafka是否满足处理亿级以上的消息的能力。

命令格式:

bin/kafka-producer-perf-test.sh --topic demo --num-records 100 --record-size 1 --throughput 100 --producer-props bootstrap.servers=localhost:9092

示例:

测试项 压测消息数(单位:w)  测试命令
写入mq消息 10 

bin/kafka-producer-perf-test.sh --topic demo --num-records 100000 --record-size 1000  --throughput 2000 --producer-props bootstrap.servers=localhost:9092

  100 

bin/kafka-producer-perf-test.sh --topic demo --num-records 1000000 --record-size 2000  --throughput 5000 --producer-props bootstrap.servers=localhost:9092

  1000

bin/kafka-producer-perf-test.sh --topic demo --num-records 10000000 --record-size 2000  --throughput 5000 --producer-props bootstrap.servers=localhost:9092

消费mq消息 10 

bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 100000 --threads 1

  100 

bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 1000000 --threads 1

  1000 

bin/kafka-consumer-perf-test.sh --broker-list localhost:9092 --topic demo --fetch-size 1048576 --messages 10000000 --threads 1

本文中部分内容翻译或借鉴于以下学习资料,特别鸣谢:


作  者:
出  处:
关于作者:专注于基础平台的项目开发。如有问题或建议,请多多赐教!
版权声明:本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文链接。
特此声明:所有评论和私信都会在第一时间回复。也欢迎园子的大大们指正错误,共同进步。或者我
声援博主:如果您觉得文章对您有帮助,可以点击文章右下角推荐一下。您的鼓励是作者坚持原创和持续写作的最大动力!