欢迎您访问程序员文章站本站旨在为大家提供分享程序员计算机编程知识!
您现在的位置是: 首页

史上最详细 Kafka 命令行操作

程序员文章站 2022-06-14 09:47:52
...

史上最详细 Kafka 命令行操作

先启动 zookeeper

[xiao @hadoop102 ~]$ zk.sh start
=====================  hadoop102  =======================
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
=====================  hadoop103  =======================
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
=====================  hadoop104  =======================
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

再启动 kafka

[xiao @hadoop102 ~]$ kafka.sh start
==================START hadoop102 KAFKA===================
==================START hadoop103 KAFKA===================
==================START hadoop104 KAFKA===================
[xiao @hadoop102 ~]$ jpsall
=============== hadoop102 ===============
2452 Kafka
1941 QuorumPeerMain
=============== hadoop103 ===============
2388 Kafka
1882 QuorumPeerMain
=============== hadoop104 ===============
1883 QuorumPeerMain
2382 Kafka

1)查看当前服务器中的所有topic

[xiao @hadoop102 ~]$ kafka-topics.sh --list --zookeeper hadoop102:2181

[xiao @hadoop102 ~]$ kafka-topics.sh --list --bootstrap-server hadoop102:9092

2)创建topic

第一步:尝试输入

[xiao @hadoop102 ~]$ kafka-topics.sh
Create, delete, describe, or change a topic.
Option                                   Description                            
------                                   ----------- 
 --alter                                  Alter the number of partitions,        
                                           replica assignment, and/or           
                                           configuration for the topic.         
--at-min-isr-partitions                  if set when describing topics, only    
                                           show partitions whose isr count is   
                                           equal to the configured minimum. Not 
                                           supported with the --zookeeper       
                                           option.                              
--bootstrap-server <String: server to    REQUIRED: The Kafka server to connect  
  connect to>                              to. In case of providing this, a     
                                           direct Zookeeper connection won't be 
                                           required.                            
--command-config <String: command        Property file containing configs to be 
  config property file>                    passed to Admin Client. This is used 
                                           only with --bootstrap-server option  
                                           for describing and altering broker   
                                           configs.                             
--config <String: name=value>            A topic configuration override for the 
                                           topic being created or altered.The   
                                           following is a list of valid         
                                           configurations:                      
                                         	cleanup.policy                        
                                         	compression.type                      
                                         	delete.retention.ms                   
                                         	file.delete.delay.ms                  
                                         	flush.messages                        
                                         	flush.ms                              
                                         	follower.replication.throttled.       
                                           replicas                             
                                         	index.interval.bytes                  
                                         	leader.replication.throttled.replicas 
                                         	max.compaction.lag.ms                 
                                         	max.message.bytes                     
                                         	message.downconversion.enable         
                                         	message.format.version                
                                         	message.timestamp.difference.max.ms   
                                         	message.timestamp.type                
                                         	min.cleanable.dirty.ratio             
                                         	min.compaction.lag.ms                 
                                         	min.insync.replicas                   
                                         	preallocate                           
                                         	retention.bytes                       
                                         	retention.ms                          
                                         	segment.bytes                         
                                         	segment.index.bytes                   
                                         	segment.jitter.ms                     
                                         	segment.ms                            
                                         	unclean.leader.election.enable        
                                         See the Kafka documentation for full   
                                           details on the topic configs.It is   
                                           supported only in combination with --
                                           create if --bootstrap-server option  
                                           is used.                             
--create                                 Create a new topic.                    
--delete                                 Delete a topic                         
--delete-config <String: name>           A topic configuration override to be   
                                           removed for an existing topic (see   
                                           the list of configurations under the 
                                           --config option). Not supported with 
                                           the --bootstrap-server option.       
--describe                               List details for the given topics.     
--disable-rack-aware                     Disable rack aware replica assignment  
--exclude-internal                       exclude internal topics when running   
                                           list or describe command. The        
                                           internal topics will be listed by    
                                           default                              
--force                                  Suppress console prompts               
--help                                   Print usage information.               
--if-exists                              if set when altering or deleting or    
                                           describing topics, the action will   
                                           only execute if the topic exists.    
                                           Not supported with the --bootstrap-  
                                           server option.                       
--if-not-exists                          if set when creating topics, the       
                                           action will only execute if the      
                                           topic does not already exist. Not    
                                           supported with the --bootstrap-      
                                           server option.                       
--list                                   List all available topics.             
--partitions <Integer: # of partitions>  The number of partitions for the topic 
                                           being created or altered (WARNING:   
                                           If partitions are increased for a    
                                           topic that has a key, the partition  
                                           logic or ordering of the messages    
                                           will be affected). If not supplied   
                                           for create, defaults to the cluster  
                                           default.                             
--replica-assignment <String:            A list of manual partition-to-broker   
  broker_id_for_part1_replica1 :           assignments for the topic being      
  broker_id_for_part1_replica2 ,           created or altered.                  
  broker_id_for_part2_replica1 :                                                
  broker_id_for_part2_replica2 , ...>                                           
--replication-factor <Integer:           The replication factor for each        
  replication factor>                      partition in the topic being         
                                           created. If not supplied, defaults   
                                           to the cluster default.              
--topic <String: topic>                  The topic to create, alter, describe   
                                           or delete. It also accepts a regular 
                                           expression, except for --create      
                                           option. Put topic name in double     
                                           quotes and use the '\' prefix to     
                                           escape regular expression symbols; e.
                                           g. "test\.topic".                    
--topics-with-overrides                  if set when describing topics, only    
                                           show topics that have overridden     
                                           configs                              
--unavailable-partitions                 if set when describing topics, only    
                                           show partitions whose leader is not  
                                           available                            
--under-min-isr-partitions               if set when describing topics, only    
                                           show partitions whose isr count is   
                                           less than the configured minimum.    
                                           Not supported with the --zookeeper   
                                           option.                              
--under-replicated-partitions            if set when describing topics, only    
                                           show under replicated partitions     
--version                                Display Kafka version.                 
--zookeeper <String: hosts>              DEPRECATED, The connection string for  
                                           the zookeeper connection in the form 
                                           host:port. Multiple hosts can be     
                                           given to allow fail-over.          

第二步:尝试加上 --list

[xiao @hadoop102 ~]$ kafka-topics.sh --list
Exception in thread "main" java.lang.IllegalArgumentException: Only one of --bootstrap-server or --zookeeper must be specified
	at kafka.admin.TopicCommand$TopicCommandOptions.checkArgs(TopicCommand.scala:666)
	at kafka.admin.TopicCommand$.main(TopicCommand.scala:51)
	at kafka.admin.TopicCommand.main(TopicCommand.scala)

第三步:尝试加上 --bootstrap-server 或者 --zookeeper

[xiao @hadoop102 ~]$ kafka-topics.sh --list -zookeeper hadoop102:2181
[xiao @hadoop102 ~]$ kafka-topics.sh --list --bootstrap-server  hadoop102:9092
    

​ 均未成功创建

第四步:尝试使用 --create 方法

[xiao @hadoop102 ~]$ kafka-topics.sh --create -zookeeper hadoop102:2181
Missing required argument "[topic]"

第五步:尝试加上 --topic

[xiao @hadoop102 ~]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --topic first
Missing required argument "[partitions]"

第六步:尝试加上 --partitions

[xiao @hadoop102 ~]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --topic first --partitions 2
Missing required argument "[replication-factor]"

第七步:尝试加上 --replication-factor

[xiao @hadoop102 ~]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --topic first --partitions 2 --replication-factor 3
Created topic first.

第八步:再次查看当前服务器中的所有topic

[xiao @hadoop102 ~]$ kafka-topics.sh --list --bootstrap-server hadoop102:9092
first

选项说明

–topic 定义topic名

–replication-factor 定义副本数

–partitions 定义分区数

3)查看某个 Topic 的详情

[xiao @hadoop102 ~]$ kafka-topics.sh --describe --topic first --bootstrap-server hadoop102:9092
Topic: first	PartitionCount: 2	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: first	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0

4)修改分区数(只允许往大的方向改)

​ 已有两个分区,尝试往小的方向修改,修改分区数为 1

[xiao @hadoop102 ~]$ kafka-topics.sh --alter --topic first --bootstrap-server hadoop102:9092 --partitions 1
Error while executing topic command : org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.

​ 报错;尝试往大的方向修改,修改分区数为 3

[xiao @hadoop102 ~]$ kafka-topics.sh --alter --topic first --bootstrap-server hadoop102:9092 --partitions 3

​ 修改之后再次查询此 topic 的详情

[xiao @hadoop102 ~]$ kafka-topics.sh --describe --topic first --bootstrap-server hadoop102:9092
Topic: first	PartitionCount: 3	ReplicationFactor: 3	Configs: segment.bytes=1073741824
	Topic: first	Partition: 0	Leader: 0	Replicas: 0,1,2	Isr: 0,1,2
	Topic: first	Partition: 1	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
	Topic: first	Partition: 2	Leader: 2	Replicas: 2,0,1	Isr: 2,0,1

​ 可以看到,修改后的分区数变为了 3 个(原 topic 分区数为 2 )

5)删除 topic

[xiao @hadoop102 ~]$ kafka-topics.sh --delete --topic first --bootstrap-server hadoop102:9092
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.

6)生产者发送消息

//查看方法介绍
[xiao @hadoop102 ~]$ kafka-console-producer.sh
This tool helps to read data from standard input and publish it to Kafka.
//实现方法
[xiao @hadoop102 ~]$ kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>
>hello
>nihao
>123456
>
>a
>b
>c

7)消费者消费消息

//offset重置引发的,只能消费到打开之后的生产者的消息
[xiao @hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
a
b
c

//可以获取生产者所产生的所有消息
[xiao @hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
hello
123456
b
a
c
nihao
//可以看见:此处的消息和我们生产者的消息的顺序是不同的,是因为我们有3个区,具体的消费顺序是不确定的;但是,如果我们只有一个分区,那么这个分区的消费信息一定是有序的,因为涉及到队列问题,一定是有序的!
相关标签: kafka kafka