kafka_2.11-2.0.0: Common Operations
程序员文章站
2022-04-12 10:01:37
Reference post: Kafka消费组(consumer group) (Kafka consumer groups)
Reference post: kafka 1.0 中文文档(九):操作 (Kafka 1.0 documentation in Chinese, part 9: Operations)
The operations below can be run on any one of mini01, mini02, or mini03.
1. Managing Kafka through a web UI
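Reference post: kafka集群管理工具kafka-manager部署安装 (deploying and installing the kafka-manager cluster management tool)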
2. Creating a topic
# --replication-factor 2 gives each partition 2 replicas
# --partitions 4 creates 4 partitions
[yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 2 --partitions 4 --topic test
Created topic "test".
[yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 3 --partitions 4 --topic zhang
Created topic "zhang".
[yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181   # list the topics again
zhang
test
2.1. Checking the log directories on each host
mini01
[yun@mini01 logs]$ pwd
/app/kafka/logs
[yun@mini01 logs]$ ll
total 160
………………
drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-1
drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-2
drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-3
mini02
[yun@mini02 logs]$ pwd
/app/kafka/logs
[yun@mini02 logs]$ ll
total 260
………………
drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-0
drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-2
mini03
[yun@mini03 logs]$ pwd
/app/kafka/logs
[yun@mini03 logs]$ ll
total 132
………………
drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-0
drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-1
drwxrwxr-x 2 yun yun 141 Sep 15 18:53 test-3
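The listings above are consistent with the create command: 4 partitions with 2 replicas each should produce 8 partition directories in total, spread over the three hosts (3 + 2 + 3). A minimal sketch of that check follows. The directory names are copied from the listings above, and the helper name `count_replica_dirs` is hypothetical.

```shell
# Directory names per host, copied from the listings above.
listing="mini01 test-1
mini01 test-2
mini01 test-3
mini02 test-0
mini02 test-2
mini03 test-0
mini03 test-1
mini03 test-3"

# Hypothetical helper: count the replica directories that belong to one topic.
count_replica_dirs() {
    topic=$1
    printf '%s\n' "$listing" |
        awk -v t="$topic" '$2 ~ ("^" t "-[0-9]+$") { n++ } END { print n + 0 }'
}

partitions=4
replication_factor=2
expected=$((partitions * replication_factor))
actual=$(count_replica_dirs test)
echo "expected=$expected actual=$actual"
```

Each partition number also appears on exactly two different hosts, which is what a replication factor of 2 means.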
3. Modifying a topic
3.1. Increasing the number of partitions
Note: the partition count can only be increased. Kafka currently does not support reducing the number of partitions of a topic.
[yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181
__consumer_offsets
test
test01
test02
test03
test04
zhang
[yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01
Topic:test01  PartitionCount:5  ReplicationFactor:1  Configs:
    Topic: test01  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: test01  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
    Topic: test01  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
    Topic: test01  Partition: 3  Leader: 0  Replicas: 0  Isr: 0
    Topic: test01  Partition: 4  Leader: 1  Replicas: 1  Isr: 1
[yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 2 --topic test01   # fails: the partition count cannot be reduced
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Error while executing topic command : The number of partitions for a topic can only be increased. Topic test01 currently has 5 partitions, 2 would not be an increase.
[2018-09-16 09:12:40,034] ERROR org.apache.kafka.common.errors.InvalidPartitionsException: The number of partitions for a topic can only be increased. Topic test01 currently has 5 partitions, 2 would not be an increase.
 (kafka.admin.TopicCommand$)
[yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 7 --topic test01   # increase the partition count
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
Adding partitions succeeded!
[yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01
Topic:test01  PartitionCount:7  ReplicationFactor:1  Configs:
    Topic: test01  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: test01  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
    Topic: test01  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
    Topic: test01  Partition: 3  Leader: 0  Replicas: 0  Isr: 0
    Topic: test01  Partition: 4  Leader: 1  Replicas: 1  Isr: 1
    Topic: test01  Partition: 5  Leader: 2  Replicas: 2  Isr: 2
    Topic: test01  Partition: 6  Leader: 0  Replicas: 0  Isr: 0
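The WARNING printed by `--alter` is about keyed messages: a key is mapped to a partition by hashing it and taking the result modulo the partition count, so changing the count from 5 to 7 can send the same key to a different partition. The sketch below only illustrates that effect. It assumes `cksum` as a stand-in hash (Kafka's default partitioner uses a murmur2 hash, which is not reproduced here), and the keys and the helper name `partition_for` are hypothetical.

```shell
# Toy stand-in for a key-based partitioner: hash(key) mod partition count.
# Assumption: cksum replaces the real hash purely for illustration.
partition_for() {
    key=$1
    count=$2
    hash=$(printf '%s' "$key" | cksum | cut -d ' ' -f 1)
    echo $((hash % count))
}

# Compare where each (hypothetical) key lands before and after the change.
for key in order-1001 order-1002 order-1003; do
    before=$(partition_for "$key" 5)
    after=$(partition_for "$key" 7)
    echo "$key: 5 partitions -> $before, 7 partitions -> $after"
done
```

Any key whose two numbers differ would have its newer messages written to a different partition than its older ones, which is why per-key ordering is affected.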
4. Deleting a topic
# Set delete.topic.enable=true in server.properties (already the default in this version); otherwise the topic is only marked for deletion, or a restart is needed
[yun@mini01 ~]$ kafka-topics.sh --delete --zookeeper mini01:2181 --topic test
Topic test is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
[yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181   # list again: only zhang remains, so the topic really was deleted
zhang
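The check in the last step (list the topics and confirm the name is gone) can be scripted. This is a minimal sketch that works on the text printed by `kafka-topics.sh --list`; the sample list is copied from the output above, and the helper name `topic_exists` is hypothetical.

```shell
# Topic names as printed by the --list command after the delete above.
topics_after_delete="zhang"

# Hypothetical helper: succeed if the topic name appears as a whole line in the list.
topic_exists() {
    printf '%s\n' "$2" | grep -qxF -- "$1"
}

if topic_exists test "$topics_after_delete"; then
    echo "test is still listed (possibly only marked for deletion)"
else
    echo "test is gone"
fi
```

Against a live cluster, the second argument would be the captured output of `kafka-topics.sh --list --zookeeper mini01:2181`.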
5. Listing all topics
[yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181
__consumer_offsets
test
zhang
6. Viewing the details of a topic
[yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic zhang
Topic:zhang  PartitionCount:4  ReplicationFactor:3  Configs:
    Topic: zhang  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 1,2,0
    Topic: zhang  Partition: 1  Leader: 2  Replicas: 2,0,1  Isr: 2,0,1
    Topic: zhang  Partition: 2  Leader: 0  Replicas: 0,1,2  Isr: 0,1,2
    Topic: zhang  Partition: 3  Leader: 1  Replicas: 1,0,2  Isr: 1,0,2
7. Producing messages from the shell
7.1. Entering messages one at a time
[yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic zhang
>111
>222
>333
>444
>555
>666
>777
>888
>999
7.2. Importing messages in bulk from a file
[yun@mini01 zhangliang]$ kafka-console-producer.sh --broker-list mini01:9092 --topic liang < 001.info
8. Consuming messages from the shell
# --from-beginning reads from the start of the log
# kafka-console-consumer.sh --zookeeper mini01:2181 --from-beginning --topic zhang   # older versions
[yun@mini01 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --from-beginning --topic zhang
111
555
999
333
777
444
888
222
666
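The messages come back in a different order than they were typed because topic zhang has 4 partitions and ordering is only preserved within a single partition. The output above is consistent with a keyless producer spreading the nine messages round-robin over the partitions, and the consumer then draining one partition at a time. The sketch below simulates that. It assumes the first message lands on partition 0 (the real starting partition is not visible in the output), and the read order 0, 2, 3, 1 is inferred from the output above.

```shell
# Simulate a keyless producer spreading messages round-robin over 4 partitions.
# Assumption: the first message goes to partition 0.
partitions=4
i=0
assignments=""
for msg in 111 222 333 444 555 666 777 888 999; do
    assignments="$assignments$((i % partitions)) $msg
"
    i=$((i + 1))
done

# Hypothetical helper: print the messages stored in one partition, in write order.
read_partition() {
    printf '%s' "$assignments" |
        awk -v p="$1" '$1 == p { out = out (out == "" ? "" : " ") $2 } END { print out }'
}

# Read the partitions in the order inferred from the consumer output above.
for p in 0 2 3 1; do
    echo "partition $p: $(read_partition "$p")"
done
```

Within each partition the messages stay in the order they were produced; only the interleaving across partitions changes.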
9. Consuming with a consumer group
9.1. Creating the topic
[yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order
Created topic "order".
[yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181   # list all topics
__consumer_offsets
order
test
zhang
[yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order   # show the topic details
Topic:order  PartitionCount:4  ReplicationFactor:1  Configs:
    Topic: order  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: order  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
    Topic: order  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
    Topic: order  Partition: 3  Leader: 0  Replicas: 0  Isr: 0
9.2. Producing messages
[yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic order
>111
>222
>333
>444
>555
9.3. Consuming messages as a group
Run on mini02:
# --group sets the consumer group
[yun@mini02 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group
Run on mini03:
# --group sets the consumer group
[yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group

# run this in a second terminal window
[yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group
The consumer group order-group now has 3 consumers (one on mini02, two on mini03) reading from topic order.
9.4. Viewing the group's offsets
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group   # show consumption status

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST          CLIENT-ID
order  0          4               4               0    consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20  /172.16.1.13  consumer-1
order  1          5               5               0    consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20  /172.16.1.13  consumer-1
order  2          5               5               0    consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f  /172.16.1.13  consumer-1
order  3          4               4               0    consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43  /172.16.1.12  consumer-1
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group --members --verbose

CONSUMER-ID                                      HOST          CLIENT-ID   #PARTITIONS  ASSIGNMENT
consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f  /172.16.1.13  consumer-1  1            order(2)
consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20  /172.16.1.13  consumer-1  2            order(0,1)
consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43  /172.16.1.12  consumer-1  1            order(3)
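The assignment shown above (one member with partitions 0 and 1, the other two with one partition each) is what a range-style assignment produces for 4 partitions and 3 consumers: every consumer gets floor(4/3) = 1 partition, and the first 4 mod 3 = 1 consumer in sorted member-id order gets one extra. The following is a simplified sketch of that arithmetic for a single topic, not Kafka's implementation; the function name `range_assign` and the `consumer N` labels are hypothetical.

```shell
# Sketch of range-style assignment for one topic:
# each consumer gets floor(P/C) partitions, the first P mod C consumers get one more.
range_assign() {
    partitions=$1
    consumers=$2
    base=$((partitions / consumers))
    extra=$((partitions % consumers))
    start=0
    c=0
    while [ "$c" -lt "$consumers" ]; do
        count=$base
        if [ "$c" -lt "$extra" ]; then
            count=$((base + 1))
        fi
        # Build the comma-separated list of consecutive partition numbers.
        list=""
        p=$start
        end=$((start + count))
        while [ "$p" -lt "$end" ]; do
            list="${list:+$list,}$p"
            p=$((p + 1))
        done
        echo "consumer $c: $list"
        start=$end
        c=$((c + 1))
    done
}

range_assign 4 3
```

With the member ids above sorted lexicographically (`...2e98...`, `...9e65...`, `...ee17...`), consumer 0 corresponds to the member holding order(0,1), which matches the `--verbose` output.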
10. Managing consumer groups
10.1. Listing all consumer groups
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
console-consumer-26727
console-consumer-92984
console-consumer-60755
console-consumer-11661
console-consumer-31713
console-consumer-20244
console-consumer-65733
10.2. Viewing a consumer group's consumption status (offsets)
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-26727
Consumer group 'console-consumer-26727' has no active members.

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
zhang  3          11              11              0    -            -     -
zhang  0          9               9               0    -            -     -
zhang  2          8               8               0    -            -     -
zhang  1          11              11              0    -            -     -
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733

TOPIC  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                      HOST          CLIENT-ID
zhang  0          11              11              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1
zhang  1          12              12              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1
zhang  2          10              10              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1
zhang  3          12              12              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1
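In these tables the lag of a partition is its log end offset minus the group's current offset, so a lag of 0 everywhere means the group has consumed everything written so far. A minimal sketch that sums the lag over all partitions follows. The rows are the first four columns of the output above for console-consumer-26727, and the helper name `total_lag` is hypothetical.

```shell
# Rows copied from the describe output above:
# topic, partition, current offset, log end offset.
rows="zhang 3 11 11
zhang 0 9 9
zhang 2 8 8
zhang 1 11 11"

# Hypothetical helper: sum (log end offset - current offset) over all rows.
total_lag() {
    printf '%s\n' "$1" | awk '{ lag += $4 - $3 } END { print lag + 0 }'
}

echo "total lag: $(total_lag "$rows")"
```

A group that has fallen behind shows a positive total; for example, rows with offsets 3 of 10 and 5 of 5 would add up to a lag of 7.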
--members
# --members lists all active members of the consumer group.
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members

CONSUMER-ID                                      HOST          CLIENT-ID   #PARTITIONS
consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1  4
--verbose
# --verbose additionally shows the partitions assigned to each member.
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members --verbose

CONSUMER-ID                                      HOST          CLIENT-ID   #PARTITIONS  ASSIGNMENT
consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1  /172.16.1.13  consumer-1  4            zhang(0,1,2,3)
--state
# --state shows useful group-level information.
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --state

COORDINATOR (ID)  ASSIGNMENT-STRATEGY  STATE   #MEMBERS
mini01:9092 (0)   range                Stable  1
10.3. Deleting one or more consumer groups
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
console-consumer-3826
console-consumer-92984
console-consumer-60755
console-consumer-11661
console-consumer-31713
console-consumer-20244
console-consumer-65733
# delete one or more groups
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --delete --group console-consumer-11661 --group console-consumer-31713
Deletion of requested consumer groups ('console-consumer-31713', 'console-consumer-11661') was successful.
[yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
console-consumer-3826
console-consumer-92984
console-consumer-60755
console-consumer-20244
console-consumer-65733
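As the delete command above shows, several groups are removed in one call by repeating the `--group` flag. When the group names come from a list, the flags can be assembled in a loop. This is a small dry-run sketch that only prints the command instead of running it; the helper name `group_flags` is hypothetical.

```shell
# Hypothetical helper: turn a list of group names into repeated --group flags.
group_flags() {
    flags=""
    for g in "$@"; do
        flags="${flags:+$flags }--group $g"
    done
    echo "$flags"
}

# Dry run: print the delete command for the two groups removed above.
echo "kafka-consumer-groups.sh --bootstrap-server mini01:9092 --delete $(group_flags console-consumer-11661 console-consumer-31713)"
```

Printing the command first makes it easy to review which groups would be deleted before running it for real.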
11. Increasing the replication factor
[yun@mini01 kafka_20180916]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order
Created topic "order".
[yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order
Topic:order  PartitionCount:4  ReplicationFactor:1  Configs:
    Topic: order  Partition: 0  Leader: 0  Replicas: 0  Isr: 0
    Topic: order  Partition: 1  Leader: 1  Replicas: 1  Isr: 1
    Topic: order  Partition: 2  Leader: 2  Replicas: 2  Isr: 2
    Topic: order  Partition: 3  Leader: 0  Replicas: 0  Isr: 0
Goal: raise the replica count of topic order from 1 to 2. The broker that currently holds each partition is shown in the output above.
Plan: partition 0 goes on brokers 0 and 1; partition 1 on brokers 1 and 2; partition 2 on brokers 2 and 0; partition 3 on brokers 0 and 1.
11.1. Creating the reassignment plan file
[yun@mini01 kafka_20180916]$ cat increase-replication-factor.json
{
  "version":1,
  "partitions":[
    {"topic": "order","partition": 0,"replicas": [0,1]},
    {"topic": "order","partition": 1,"replicas": [1,2]},
    {"topic": "order","partition": 2,"replicas": [2,0]},
    {"topic": "order","partition": 3,"replicas": [0,1]}
  ]
}
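For topics with more partitions, writing this file by hand gets tedious. The sketch below generates the same plan with a loop. It assumes, as is the case for topic order here, that partition p currently lives on broker p mod 3, and it adds the next broker id as the second replica. That assumption does not hold for every topic, so the current assignment should be checked with `--describe` first. The function name `make_plan` is hypothetical.

```shell
# Hypothetical helper: print a reassignment plan that keeps the current replica
# (assumed to be partition mod brokers) and adds the next broker id as a second replica.
make_plan() {
    topic=$1
    partitions=$2
    brokers=$3
    echo '{'
    echo '  "version":1,'
    echo '  "partitions":['
    p=0
    while [ "$p" -lt "$partitions" ]; do
        first=$((p % brokers))
        second=$(( (first + 1) % brokers ))
        # Every entry except the last is followed by a comma.
        sep=','
        if [ "$p" -eq "$((partitions - 1))" ]; then
            sep=''
        fi
        echo "    {\"topic\": \"$topic\",\"partition\": $p,\"replicas\": [$first,$second]}$sep"
        p=$((p + 1))
    done
    echo '  ]'
    echo '}'
}

make_plan order 4 3
```

Redirecting the output to `increase-replication-factor.json` would give a file equivalent to the one shown above.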
11.2. Executing the plan
[yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"order","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"order","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"order","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"order","partition":0,"replicas":[0],"log_dirs":["any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started reassignment of partitions.
11.3. Verifying that the reassignment succeeded
[yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --verify
Status of partition reassignment:
Reassignment of partition order-0 completed successfully
Reassignment of partition order-1 completed successfully
Reassignment of partition order-2 completed successfully
Reassignment of partition order-3 completed successfully
11.4. Viewing the topic details again
[yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order   # the output below shows the new assignment took effect
Topic:order  PartitionCount:4  ReplicationFactor:2  Configs:
    Topic: order  Partition: 0  Leader: 0  Replicas: 0,1  Isr: 0,1
    Topic: order  Partition: 1  Leader: 1  Replicas: 1,2  Isr: 1,2
    Topic: order  Partition: 2  Leader: 2  Replicas: 2,0  Isr: 2,0
    Topic: order  Partition: 3  Leader: 0  Replicas: 0,1  Isr: 0,1
12. How partitions are assigned to brokers at creation time
kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 5 --topic test01
kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 11 --topic test02
Note how the partitions are distributed across the machines:
mini01
test01-0
test01-3
test02-2
test02-5
test02-8

mini02
test01-1
test01-4
test02-0
test02-3
test02-6
test02-9

mini03
test01-2
test02-1
test02-10
test02-4
test02-7
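The distribution above follows a round-robin pattern: partition 0 is placed on some starting broker, and each following partition goes to the next broker, wrapping around. For test01 the starting broker is mini01, for test02 it is mini02. The sketch below reproduces the listing under those assumptions. It covers only replication factor 1, the start index is passed in by hand (Kafka picks it itself), and the mapping of broker id N to host mini0(N+1) is inferred from the outputs in this post. The function name `place_partitions` is hypothetical.

```shell
# Sketch of round-robin placement for replication factor 1:
# partition p goes to broker (start + p) mod broker_count.
# Assumption: broker id N runs on host mini0(N+1), as the outputs in this post suggest.
place_partitions() {
    topic=$1
    partitions=$2
    brokers=$3
    start=$4
    p=0
    while [ "$p" -lt "$partitions" ]; do
        broker=$(( (start + p) % brokers ))
        echo "mini0$((broker + 1)) $topic-$p"
        p=$((p + 1))
    done
}

place_partitions test01 5 3 0
place_partitions test02 11 3 1
```

Sorting the output by host gives the same grouping as the listing: 5 partition directories on mini01, 6 on mini02, and 5 on mini03.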