史上最详细 Kafka 命令行操作
程序员文章站
2022-06-14 09:47:52
...
史上最详细 Kafka 命令行操作
先启动 zookeeper
[xiao @hadoop102 ~]$ zk.sh start
===================== hadoop102 =======================
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
===================== hadoop103 =======================
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
===================== hadoop104 =======================
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
再启动 kafka
[xiao @hadoop102 ~]$ kafka.sh start
==================START hadoop102 KAFKA===================
==================START hadoop103 KAFKA===================
==================START hadoop104 KAFKA===================
[xiao @hadoop102 ~]$ jpsall
=============== hadoop102 ===============
2452 Kafka
1941 QuorumPeerMain
=============== hadoop103 ===============
2388 Kafka
1882 QuorumPeerMain
=============== hadoop104 ===============
1883 QuorumPeerMain
2382 Kafka
1)查看当前服务器中的所有topic
[xiao @hadoop102 ~]$ kafka-topics.sh --list --zookeeper hadoop102:2181
或
[xiao @hadoop102 ~]$ kafka-topics.sh --list --bootstrap-server hadoop102:9092
2)创建topic
第一步:尝试输入
[xiao @hadoop102 ~]$ kafka-topics.sh
Create, delete, describe, or change a topic.
Option Description
------ -----------
--alter Alter the number of partitions,
replica assignment, and/or
configuration for the topic.
--at-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
equal to the configured minimum. Not
supported with the --zookeeper
option.
--bootstrap-server <String: server to REQUIRED: The Kafka server to connect
connect to> to. In case of providing this, a
direct Zookeeper connection won't be
required.
--command-config <String: command Property file containing configs to be
config property file> passed to Admin Client. This is used
only with --bootstrap-server option
for describing and altering broker
configs.
--config <String: name=value> A topic configuration override for the
topic being created or altered.The
following is a list of valid
configurations:
cleanup.policy
compression.type
delete.retention.ms
file.delete.delay.ms
flush.messages
flush.ms
follower.replication.throttled.
replicas
index.interval.bytes
leader.replication.throttled.replicas
max.compaction.lag.ms
max.message.bytes
message.downconversion.enable
message.format.version
message.timestamp.difference.max.ms
message.timestamp.type
min.cleanable.dirty.ratio
min.compaction.lag.ms
min.insync.replicas
preallocate
retention.bytes
retention.ms
segment.bytes
segment.index.bytes
segment.jitter.ms
segment.ms
unclean.leader.election.enable
See the Kafka documentation for full
details on the topic configs.It is
supported only in combination with --
create if --bootstrap-server option
is used.
--create Create a new topic.
--delete Delete a topic
--delete-config <String: name> A topic configuration override to be
removed for an existing topic (see
the list of configurations under the
--config option). Not supported with
the --bootstrap-server option.
--describe List details for the given topics.
--disable-rack-aware Disable rack aware replica assignment
--exclude-internal exclude internal topics when running
list or describe command. The
internal topics will be listed by
default
--force Suppress console prompts
--help Print usage information.
--if-exists if set when altering or deleting or
describing topics, the action will
only execute if the topic exists.
Not supported with the --bootstrap-
server option.
--if-not-exists if set when creating topics, the
action will only execute if the
topic does not already exist. Not
supported with the --bootstrap-
server option.
--list List all available topics.
--partitions <Integer: # of partitions> The number of partitions for the topic
being created or altered (WARNING:
If partitions are increased for a
topic that has a key, the partition
logic or ordering of the messages
will be affected). If not supplied
for create, defaults to the cluster
default.
--replica-assignment <String: A list of manual partition-to-broker
broker_id_for_part1_replica1 : assignments for the topic being
broker_id_for_part1_replica2 , created or altered.
broker_id_for_part2_replica1 :
broker_id_for_part2_replica2 , ...>
--replication-factor <Integer: The replication factor for each
replication factor> partition in the topic being
created. If not supplied, defaults
to the cluster default.
--topic <String: topic> The topic to create, alter, describe
or delete. It also accepts a regular
expression, except for --create
option. Put topic name in double
quotes and use the '\' prefix to
escape regular expression symbols; e.
g. "test\.topic".
--topics-with-overrides if set when describing topics, only
show topics that have overridden
configs
--unavailable-partitions if set when describing topics, only
show partitions whose leader is not
available
--under-min-isr-partitions if set when describing topics, only
show partitions whose isr count is
less than the configured minimum.
Not supported with the --zookeeper
option.
--under-replicated-partitions if set when describing topics, only
show under replicated partitions
--version Display Kafka version.
--zookeeper <String: hosts> DEPRECATED, The connection string for
the zookeeper connection in the form
host:port. Multiple hosts can be
given to allow fail-over.
第二步:尝试加上 --list
[xiao @hadoop102 ~]$ kafka-topics.sh --list
Exception in thread "main" java.lang.IllegalArgumentException: Only one of --bootstrap-server or --zookeeper must be specified
at kafka.admin.TopicCommand$TopicCommandOptions.checkArgs(TopicCommand.scala:666)
at kafka.admin.TopicCommand$.main(TopicCommand.scala:51)
at kafka.admin.TopicCommand.main(TopicCommand.scala)
第三步:尝试加上 --bootstrap-server 或者 --zookeeper
[xiao @hadoop102 ~]$ kafka-topics.sh --list -zookeeper hadoop102:2181
[xiao @hadoop102 ~]$ kafka-topics.sh --list --bootstrap-server hadoop102:9092
均未成功创建
第四步:尝试使用 --create 方法
[xiao @hadoop102 ~]$ kafka-topics.sh --create -zookeeper hadoop102:2181
Missing required argument "[topic]"
第五步:尝试加上 --topic
[xiao @hadoop102 ~]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --topic first
Missing required argument "[partitions]"
第六步:尝试加上 --partitions
[xiao @hadoop102 ~]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --topic first --partitions 2
Missing required argument "[replication-factor]"
第七步:尝试加上 --replication-factor
[xiao @hadoop102 ~]$ kafka-topics.sh --create --zookeeper hadoop102:2181 --topic first --partitions 2 --replication-factor 3
Created topic first.
第八步:再次查看当前服务器中的所有topic
[xiao @hadoop102 ~]$ kafka-topics.sh --list --bootstrap-server hadoop102:9092
first
选项说明:
–topic 定义topic名
–replication-factor 定义副本数
–partitions 定义分区数
3)查看某个 Topic 的详情
[xiao @hadoop102 ~]$ kafka-topics.sh --describe --topic first --bootstrap-server hadoop102:9092
Topic: first PartitionCount: 2 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
4)修改分区数(只允许往大的方向改)
已有两个分区,尝试往小的方向修改,修改分区数为 1
[xiao @hadoop102 ~]$ kafka-topics.sh --alter --topic first --bootstrap-server hadoop102:9092 --partitions 1
Error while executing topic command : org.apache.kafka.common.errors.InvalidPartitionsException: Topic currently has 2 partitions, which is higher than the requested 1.
报错;尝试往大的方向修改,修改分区数为 3
[xiao @hadoop102 ~]$ kafka-topics.sh --alter --topic first --bootstrap-server hadoop102:9092 --partitions 3
修改之后再次查询此 topic 的详情
[xiao @hadoop102 ~]$ kafka-topics.sh --describe --topic first --bootstrap-server hadoop102:9092
Topic: first PartitionCount: 3 ReplicationFactor: 3 Configs: segment.bytes=1073741824
Topic: first Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
Topic: first Partition: 1 Leader: 1 Replicas: 1,2,0 Isr: 1,2,0
Topic: first Partition: 2 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
可以看到,修改后的分区数变为了 3 个(原 topic 分区数为 2 )
5)删除 topic
[xiao @hadoop102 ~]$ kafka-topics.sh --delete --topic first --bootstrap-server hadoop102:9092
Topic first is marked for deletion.
Note: This will have no impact if delete.topic.enable is not set to true.
6)生产者发送消息
//查看方法介绍
[xiao @hadoop102 ~]$ kafka-console-producer.sh
This tool helps to read data from standard input and publish it to Kafka.
//实现方法
[xiao @hadoop102 ~]$ kafka-console-producer.sh --broker-list hadoop102:9092 --topic first
>
>hello
>nihao
>123456
>
>a
>b
>c
7)消费者消费消息
//offset重置引发的,只能消费到打开之后的生产者的消息
[xiao @hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic first
a
b
c
//可以获取生产者所产生的所有消息
[xiao @hadoop102 ~]$ kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --from-beginning --topic first
hello
123456
b
a
c
nihao
//可以看见:此处的消息和我们生产者的消息的顺序是不同的,是因为我们有3个区,具体的消费顺序是不确定的;但是,如果我们只有一个分区,那么这个分区的消费信息一定是有序的,因为涉及到队列问题,一定是有序的!
下一篇: sqlserver交换数据行中的指定列