欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页  >  IT编程

kafka_2.11-2.0.0_常用操作

程序员文章站 2022-08-30 22:29:51
参考博文:Kafka消费组(consumer group) 参考博文:kafka 1.0 中文文档(九):操作 参考博文:kafka集群管理工具kafka-manager部署安装 以下操作可以在mini01、mini02、mini03任意一台操作即可。 1. kafka通过网页管理 参考博文:kaf ......

 

参考博文:kafka消费组(consumer group)

参考博文:

参考博文:

 

 

       以下操作可以在mini01、mini02、mini03任意一台操作即可

 

1. kafka通过网页管理

参考博文:

kafka_2.11-2.0.0_常用操作

 

 

2. 创建topic

1 # 参数说明 --replication-factor 2 表示有2个副本
2 # --partitions 4 表示有4个分区
3 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 2 --partitions 4 --topic test
4 created topic "test".
5 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 3 --partitions 4 --topic zhang
6 created topic "zhang".
7 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181  # 再次查看
8 zhang
9 test

 

2.1. 各主机信息查看

mini01

1 [yun@mini01 logs]$ pwd
2 /app/kafka/logs
3 [yun@mini01 logs]$ ll 
4 total 160
5 ………………
6 drwxrwxr-x 2 yun yun   141 sep 15 18:53 test-1
7 drwxrwxr-x 2 yun yun   141 sep 15 18:53 test-2
8 drwxrwxr-x 2 yun yun   141 sep 15 18:53 test-3

 

mini02

1 [yun@mini02 logs]$ pwd
2 /app/kafka/logs
3 [yun@mini02 logs]$ ll
4 total 260
5 ………………
6 drwxrwxr-x 2 yun yun    141 sep 15 18:53 test-0
7 drwxrwxr-x 2 yun yun    141 sep 15 18:53 test-2

 

mini03

1 [yun@mini03 logs]$ pwd
2 /app/kafka/logs
3 [yun@mini03 logs]$ ll
4 total 132
5 ………………
6 drwxrwxr-x 2 yun yun   141 sep 15 18:53 test-0
7 drwxrwxr-x 2 yun yun   141 sep 15 18:53 test-1
8 drwxrwxr-x 2 yun yun   141 sep 15 18:53 test-3

 

 

3. 修改topic

3.1. 增加分区数

       注意:分区数不能减少

  kafka目前不支持减少主题的分区数量。

 1 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 
 2 __consumer_offsets
 3 test
 4 test01
 5 test02
 6 test03
 7 test04
 8 zhang
 9 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01
10 topic:test01    partitioncount:5    replicationfactor:1    configs:
11     topic: test01    partition: 0    leader: 0    replicas: 0    isr: 0
12     topic: test01    partition: 1    leader: 1    replicas: 1    isr: 1
13     topic: test01    partition: 2    leader: 2    replicas: 2    isr: 2
14     topic: test01    partition: 3    leader: 0    replicas: 0    isr: 0
15     topic: test01    partition: 4    leader: 1    replicas: 1    isr: 1
16 [yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 2 --topic test01 # 失败,分区数不能减少 
17 warning: if partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
18 error while executing topic command : the number of partitions for a topic can only be increased. topic test01 currently has 5 partitions, 2 would not be an increase.
19 [2018-09-16 09:12:40,034] error org.apache.kafka.common.errors.invalidpartitionsexception: the number of partitions for a topic can only be increased. topic test01 currently has 5 partitions, 2 would not be an increase.
20  (kafka.admin.topiccommand$)
21 [yun@mini01 ~]$ kafka-topics.sh --alter --zookeeper mini01:2181 --partitions 7 --topic test01  # 增加分区数 
22 warning: if partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected
23 adding partitions succeeded!
24 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic test01
25 topic:test01    partitioncount:7    replicationfactor:1    configs:
26     topic: test01    partition: 0    leader: 0    replicas: 0    isr: 0
27     topic: test01    partition: 1    leader: 1    replicas: 1    isr: 1
28     topic: test01    partition: 2    leader: 2    replicas: 2    isr: 2
29     topic: test01    partition: 3    leader: 0    replicas: 0    isr: 0
30     topic: test01    partition: 4    leader: 1    replicas: 1    isr: 1
31     topic: test01    partition: 5    leader: 2    replicas: 2    isr: 2
32     topic: test01    partition: 6    leader: 0    replicas: 0    isr: 0

 

 

4. 删除topic

1 # server.properties中设置delete.topic.enable=true 【当前版本默认就是true】否则只是标记删除或者直接重启
2 [yun@mini01 ~]$ kafka-topics.sh --delete --zookeeper mini01:2181 --topic test
3 topic test is marked for deletion.
4 note: this will have no impact if delete.topic.enable is not set to true.
5 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181  # 再次查看   只有 zhang,则表示真的删除了
6 zhang

 

 

5. 查看所有topic

1 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181 
2 __consumer_offsets
3 test
4 zhang

 

 

6. 查看某个topic的详情

1 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic zhang 
2 topic:zhang    partitioncount:4    replicationfactor:3    configs:
3     topic: zhang    partition: 0    leader: 1    replicas: 1,2,0    isr: 1,2,0
4     topic: zhang    partition: 1    leader: 2    replicas: 2,0,1    isr: 2,0,1
5     topic: zhang    partition: 2    leader: 0    replicas: 0,1,2    isr: 0,1,2
6     topic: zhang    partition: 3    leader: 1    replicas: 1,0,2    isr: 1,0,2

 

 

7. 通过shell命令生产消息

7.1. 输入单条数据

 1 [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic zhang
 2 >111
 3 >222
 4 >333
 5 >444
 6 >555
 7 >666
 8 >777
 9 >888
10 >999

 

7.2. 批量导入数据

1 [yun@mini01 zhangliang]$ kafka-console-producer.sh --broker-list mini01:9092 --topic liang < 001.info

 

 

8. 通过shell命令消费消息

 1 # --from-beginning 从最开始读取
 2 # kafka-console-consumer.sh --zookeeper mini01:2181 --from-beginning --topic zhang  # 老版本
 3 [yun@mini01 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --from-beginning --topic zhang
 4 111
 5 555
 6 999
 7 333
 8 777
 9 444
10 888
11 222
12 666

 

 

9. 消费组消费

9.1. 创建topic

 1 [yun@mini01 ~]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order
 2 created topic "order".
 3 [yun@mini01 ~]$ kafka-topics.sh --list --zookeeper mini01:2181  # 查看所有topic列表
 4 __consumer_offsets
 5 order
 6 test
 7 zhang
 8 [yun@mini01 ~]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order  # 查看topic详情
 9 topic:order    partitioncount:4    replicationfactor:1    configs:
10     topic: order    partition: 0    leader: 0    replicas: 0    isr: 0
11     topic: order    partition: 1    leader: 1    replicas: 1    isr: 1
12     topic: order    partition: 2    leader: 2    replicas: 2    isr: 2
13     topic: order    partition: 3    leader: 0    replicas: 0    isr: 0

 

9.2. 生产消息

1 [yun@mini01 ~]$ kafka-console-producer.sh --broker-list mini01:9092 --topic order
2 >111
3 >222
4 >333
5 >444
6 >555

 

9.3. 消费组消费消息

mini02机器上运行

1 # --group  指定组
2 [yun@mini02 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group

 

mini03机器上运行

1 # --group  指定组
2 [yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group
3 
4 # 新开一个窗口执行
5 [yun@mini03 ~]$ kafka-console-consumer.sh --bootstrap-server mini01:9092 --topic order --group order-group

 

  表示order-group消费组有3个消费者,消费topic order的信息。

 

9.4. 消费组消费位置信息查看

 1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group  # 查看消费情况
 2 
 3 topic    partition  current-offset  log-end-offset  lag   consumer-id                                     host            client-id
 4 order    0          4               4               0     consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13    consumer-1
 5 order    1          5               5               0     consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13    consumer-1
 6 order    2          5               5               0     consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f /172.16.1.13    consumer-1
 7 order    3          4               4               0     consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43 /172.16.1.12    consumer-1
 8 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group order-group --members --verbose
 9 
10 consumer-id                                     host            client-id       #partitions     assignment
11 consumer-1-9e65dcfb-246f-4043-aaf7-3ee83532237f /172.16.1.13    consumer-1      1               order(2)
12 consumer-1-2e9805db-e021-4595-8c62-92f8691fbf20 /172.16.1.13    consumer-1      2               order(0,1)
13 consumer-1-ee17939d-1ffe-42c7-8261-b19be8acea43 /172.16.1.12    consumer-1      1               order(3)

 

 

10. 管理消费组

10.1. 查看所有消费组

1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
2 console-consumer-26727
3 console-consumer-92984
4 console-consumer-60755
5 console-consumer-11661
6 console-consumer-31713
7 console-consumer-20244
8 console-consumer-65733

 

10.2. 查看消费组消费情况【消费位置】

 1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-26727  
 2 consumer group 'console-consumer-26727' has no active members.
 3 
 4 topic           partition  current-offset  log-end-offset  lag             consumer-id     host            client-id
 5 zhang           3          11              11              0               -               -               -
 6 zhang           0          9               9               0               -               -               -
 7 zhang           2          8               8               0               -               -               -
 8 zhang           1          11              11              0               -               -               -
 9 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733  
10 
11 topic  partition  current-offset  log-end-offset  lag  consumer-id                                     host            client-id
12 zhang  0          11              11              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13    consumer-1
13 zhang  1          12              12              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13    consumer-1
14 zhang  2          10              10              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13    consumer-1
15 zhang  3          12              12              0    consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13    consumer-1

 

--members

1 # --members 此选项提供使用者组中所有活动成员的列表。
2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members  
3 
4 consumer-id                                     host            client-id       #partitions     
5 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13    consumer-1      4      

 

--verbose

1 # --verbose 这个选项还提供了分配给每个成员的分区。
2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --members --verbose
3 
4 consumer-id                                     host            client-id       #partitions     assignment
5 consumer-1-17c812f0-116b-42a9-88d8-90d1a85949e1 /172.16.1.13    consumer-1      4               zhang(0,1,2,3)

 

--state

1 # --state  这个选项提供了有用的组级信息。
2 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --describe --group console-consumer-65733 --state
3 
4 coordinator (id)          assignment-strategy       state                #members
5 mini01:9092 (0)           range                     stable               1

 

10.3. 删除一个或多个用户组

 1 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
 2 console-consumer-3826
 3 console-consumer-92984
 4 console-consumer-60755
 5 console-consumer-11661
 6 console-consumer-31713
 7 console-consumer-20244
 8 console-consumer-65733
 9 # 删除一个或多个组
10 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --delete --group console-consumer-11661 --group console-consumer-31713
11 deletion of requested consumer groups ('console-consumer-31713', 'console-consumer-11661') was successful.
12 [yun@mini01 ~]$ kafka-consumer-groups.sh --bootstrap-server mini01:9092 --list
13 console-consumer-3826
14 console-consumer-92984
15 console-consumer-60755
16 console-consumer-20244
17 console-consumer-65733

 

 

11. 增加副本因子

1 [yun@mini01 kafka_20180916]$ kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 4 --topic order
2 created topic "order".
3 [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order
4 topic:order    partitioncount:4    replicationfactor:1    configs:
5     topic: order    partition: 0    leader: 0    replicas: 0    isr: 0
6     topic: order    partition: 1    leader: 1    replicas: 1    isr: 1
7     topic: order    partition: 2    leader: 2    replicas: 2    isr: 2
8     topic: order    partition: 3    leader: 0    replicas: 0    isr: 0

 

       要求:topic order 的副本数由1变为2, 之前每个分区在哪台机器上在上面已给出。

       说明:part 0分布在群集0,1; part 1分布在集群1,2;part 2 分布在集群2,0;part 3分布在集群0,1。

 

11.1. 创建一个重新调整计划文件

 1 [yun@mini01 kafka_20180916]$ cat increase-replication-factor.json
 2 {
 3   "version":1,
 4   "partitions":[
 5     {"topic": "order","partition": 0,"replicas": [0,1]},
 6     {"topic": "order","partition": 1,"replicas": [1,2]},
 7     {"topic": "order","partition": 2,"replicas": [2,0]},
 8     {"topic": "order","partition": 3,"replicas": [0,1]}
 9   ]
10 }

 

11.2. 语句执行

1 [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --execute 
2 current partition replica assignment
3 
4 {"version":1,"partitions":[{"topic":"order","partition":2,"replicas":[2],"log_dirs":["any"]},{"topic":"order","partition":1,"replicas":[1],"log_dirs":["any"]},{"topic":"order","partition":3,"replicas":[0],"log_dirs":["any"]},{"topic":"order","partition":0,"replicas":[0],"log_dirs":["any"]}]}
5 
6 save this to use as the --reassignment-json-file option during rollback
7 successfully started reassignment of partitions.

 

11.3. 查看是否执行成功

1 [yun@mini01 kafka_20180916]$ kafka-reassign-partitions.sh --zookeeper mini01:2181 --reassignment-json-file increase-replication-factor.json --verify
2 status of partition reassignment: 
3 reassignment of partition order-0 completed successfully
4 reassignment of partition order-1 completed successfully
5 reassignment of partition order-2 completed successfully
6 reassignment of partition order-3 completed successfully

  

11.4. 再次查看该topic详情

1 [yun@mini01 kafka_20180916]$ kafka-topics.sh --describe --zookeeper mini01:2181 --topic order  # 由下可见分配成功
2 topic:order    partitioncount:4    replicationfactor:2    configs:
3     topic: order    partition: 0    leader: 0    replicas: 0,1    isr: 0,1
4     topic: order    partition: 1    leader: 1    replicas: 1,2    isr: 1,2
5     topic: order    partition: 2    leader: 2    replicas: 2,0    isr: 2,0
6     topic: order    partition: 3    leader: 0    replicas: 0,1    isr: 0,1

 

 

12. 创建partitions时在broker的分配机制

1 kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 5 --topic test01
2 kafka-topics.sh --create --zookeeper mini01:2181 --replication-factor 1 --partitions 11 --topic test02

 

注意在各机器上partitions的分布

 1 mini01
 2     test01-0
 3     test01-3
 4     test02-2
 5     test02-5
 6     test02-8
 7 
 8 mini02
 9     test01-1
10     test01-4
11     test02-0
12     test02-3
13     test02-6
14     test02-9
15 
16 mini03
17     test01-2
18     test02-1
19     test02-10
20     test02-4
21     test02-7