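Manually Resetting/Setting Consumer Group Offsets
A recent problem in our project flooded the Kafka cluster with useless messages on a topic, so I had to manually set the offset of that topic's consumer.
I looked up how to do this online and am recording the steps here.
Common Kafka commands
1) Create a topic: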
./bin/kafka-topics.sh \
--create \
--zookeeper localhost:2181 \
--replication-factor 1 \
--partitions 1 \
--topic test
2) List topics:
./bin/kafka-topics.sh --list --zookeeper localhost:2181
3) Start a console producer:
./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
4) Start a console consumer:
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
5) Delete a topic:
./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test
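Note: delete.topic.enable=true must be set in server.properties for the deletion to take effect. On newer Kafka releases the --zookeeper option has been removed from these tools in favor of --bootstrap-server.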
# List all group ids
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Show the consumption status of a group
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group course-cell-dinc-test
# Delete the given group
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group course-cell-dinc-test
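If all you want from --describe is how far behind a group is, the output can be summed with a small filter. This is only a sketch of my own, not part of Kafka: total_lag is a hypothetical helper name, and it assumes the output has a header row with a LAG column. The column is found by name because its position differs between Kafka versions.

```shell
# Sum the LAG column of `kafka-consumer-groups.sh --describe` output read from stdin.
# total_lag is a hypothetical helper; it assumes a header row that contains "LAG".
total_lag() {
  awk '
    # Until the header is found, look for the field named LAG and remember its index.
    lag_col == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i; next }
    # Data rows: add numeric lag values; "-" (no committed offset) is skipped.
    $lag_col ~ /^[0-9]+$/ { sum += $lag_col }
    END { print sum + 0 }
  '
}
```

It would be used as `./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group course-cell-dinc-test | total_lag`.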
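Resetting a group's offsets
The target group must not be active when you reset its offsets, that is, no consumer in the group may be consuming.
--reset-offsets resets the offsets of the given group. It is combined with one of two options, --dry-run and --execute; --dry-run is the default.
--dry-run mode
In --dry-run mode the reset is not actually performed; the command only previews the offsets the reset would produce.
This mode exists to make the user act carefully, since resetting offsets directly can leave the consumers re-reading or skipping messages.
# --shift-by -1 moves the group's offset to the current offset minus 1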
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--reset-offsets \
--shift-by -1 \
--topic global-biz-log \
--group course-cell-dinc-test \
--dry-run
Output:
TOPIC PARTITION NEW-OFFSET
test 0 797054
If you query the group's offset at this point, you will find that it is still 797055 and has not changed.
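The arithmetic behind --shift-by is simple enough to sketch. shifted_offset below is a hypothetical helper, not something the tool provides. The clamping to the earliest and log end offsets reflects my understanding of how the tool treats out-of-range targets, so treat it as an assumption, and the bounds 0 and 797055 are made up for illustration.

```shell
# Compute the offset that --shift-by would target, clamped to the partition's valid range.
# Arguments: current offset, shift (may be negative), earliest offset, log end offset.
# The clamping is an assumption about the tool's behavior, not taken from its documentation.
shifted_offset() {
  target=$(( $1 + $2 ))
  if [ "$target" -lt "$3" ]; then target=$3; fi
  if [ "$target" -gt "$4" ]; then target=$4; fi
  echo "$target"
}

shifted_offset 797055 -1 0 797055   # prints 797054
```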
--execute mode
The --execute option performs the reset for real.
kafka-consumer-groups \
--bootstrap-server localhost:9092 \
--reset-offsets \
--shift-by -1 \
--topic global-biz-log \
--group course-cell-dinc-test \
--execute
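A cautious habit is to always preview a reset before executing it. The following is a minimal sketch of that workflow, not an official script: reset_with_preview, KAFKA_CG and BOOTSTRAP are names I made up, and the defaults simply mirror the commands used in this post.

```shell
# Assumed defaults; point KAFKA_CG at kafka-consumer-groups.sh if that is what you have.
KAFKA_CG="${KAFKA_CG:-kafka-consumer-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Hypothetical helper: preview a reset with --dry-run, then run it with --execute
# only after the user answers "y".
# Usage: reset_with_preview GROUP TOPIC STRATEGY...   e.g. --shift-by -1
reset_with_preview() {
  group="$1"; topic="$2"
  shift 2   # the remaining arguments are the reset strategy
  # Preview first; stop if the preview itself fails.
  "$KAFKA_CG" --bootstrap-server "$BOOTSTRAP" --reset-offsets \
    --group "$group" --topic "$topic" "$@" --dry-run || return 1
  printf 'Apply this reset? [y/N] ' >&2
  read -r answer
  if [ "$answer" != "y" ]; then echo "aborted"; return 0; fi
  "$KAFKA_CG" --bootstrap-server "$BOOTSTRAP" --reset-offsets \
    --group "$group" --topic "$topic" "$@" --execute
}
```

For example, `reset_with_preview course-cell-dinc-test global-biz-log --shift-by -1` would show the dry-run table and then wait for confirmation.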
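Offset reset strategies
The command offers several strategies for choosing where the offsets are reset to:
--to-current resets to the group's current committed offset (use --to-latest to move to the log end offset, LEO)
--to-datetime <String: datetime> resets to the offset at the given time
--to-earliest resets to the earliest available offset
--to-offset <Long: offset> resets to the given offset
--shift-by <Long: n> shifts the current offset by n, where n may be positive or negative
--from-file <String: path to CSV file> resets according to a plan described in an external CSV file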
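Since --from-file takes a hand-editable CSV, it can be worth checking the file before handing it to the tool. The sketch below assumes the three-column layout topic,partition,offset; as far as I know some versions also put the group in a first column, so adjust accordingly. validate_reset_plan is a hypothetical helper of my own.

```shell
# Check that every line of a reset plan looks like: topic,partition,offset
# (assumed layout). Prints each offending line and returns non-zero if any is found.
validate_reset_plan() {
  awk -F, '
    NF != 3 || $1 == "" || $2 !~ /^[0-9]+$/ || $3 !~ /^[0-9]+$/ {
      printf "line %d is not topic,partition,offset: %s\n", NR, $0
      bad = 1
    }
    END { exit bad }
  ' "$1"
}
```

A plan that passes can then be applied with --reset-offsets --from-file, first with --dry-run and then with --execute.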
./kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--reset-offsets \
--to-earliest \
--topic global-biz-log \
--group debug-01 \
--dry-run

./kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--reset-offsets \
--to-earliest \
--topic global-biz-log \
--group debug-01 \
--execute

./kafka-consumer-groups.sh \
--bootstrap-server localhost:9092 \
--reset-offsets \
--to-datetime 2019-11-22T06:00:00.000 \
--topic global-biz-log \
--group debug-01 \
--execute
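The --to-datetime value has to be typed in the YYYY-MM-DDTHH:mm:SS.sss shape, which is easy to get wrong by hand. Here is a small sketch that formats a Unix timestamp that way. to_kafka_datetime is a hypothetical helper, it relies on GNU date, and it always prints UTC; how the tool interprets a value without a zone offset may depend on the version, so check that against your installation.

```shell
# Format a Unix epoch (seconds) as YYYY-MM-DDTHH:mm:SS.sss in UTC.
# Relies on GNU date's "-d @EPOCH" syntax; BSD/macOS date uses "-r EPOCH" instead.
to_kafka_datetime() {
  date -u -d "@$1" +%Y-%m-%dT%H:%M:%S.000
}

to_kafka_datetime 1574402400   # prints 2019-11-22T06:00:00.000
```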
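Pulling a batch of messages with kafka-verifiable-consumer
kafka-verifiable-consumer can pull a batch of messages and works much like kafka-console-consumer.
Its output is richer, though: it includes the offset and other metadata, and you can limit how many messages it reads.
kafka-console-consumer by default simply reads everything available.
# --max-messages 5 pulls only 5 messages
# --verbose prints the content of every message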
./kafka-verifiable-consumer.sh \
--broker-list localhost:9092 \
--max-messages 5 \
--group-id course-cell-dinc-test \
--topic global-biz-log \
--verbose

./kafka-verifiable-consumer.sh \
--broker-list localhost:9092 \
--max-messages 1 \
--reset-policy earliest \
--group-id course-cell-dinc-test \
--topic global-biz-log \
--verbose
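kafka-verifiable-consumer also supports the following options:
--session-timeout the consumer's session timeout
--enable-autocommit whether to enable automatic offset commits; off by default
--reset-policy which offset to start from when the group has no committed offsets: 'earliest', 'latest' or 'none'; the default is earliest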
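--assignment-strategy the consumer's partition assignment strategy, for example RoundRobinAssignor (in the tool's source the default appears to be RangeAssignor)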
--consumer.config a properties file with the consumer configuration
Using kafka-consumer-groups.sh
With the setup done, you should be able to list the currently active groups with:
kafka-consumer-groups.sh \
--bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
--command-config consumer.properties \
--list
To view details for a single group:
kafka-consumer-groups.sh \
--bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
--command-config consumer.properties \
--group test-group \
--describe
To list current members of a group:
kafka-consumer-groups.sh \
--bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
--command-config consumer.properties \
--group test-group \
--describe \
--members
Resetting consumer group offset
To reset an offset for a group, you can issue:
kafka-consumer-groups.sh \
--bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
--command-config consumer.properties \
--group test-group \
--topic test-topic \
--reset-offsets \
--to-earliest \
--execute
You can reset the offset either to the beginning of the data with --to-earliest, or to the end of the topic with --to-latest. If you know a fixed offset, you can use --to-offset directly.
Relative changes are possible with the --shift-by argument. You can skip forward with a positive number or move backwards with a negative one.
You can reset offsets based on a timestamp by using the --to-datetime <YYYY-MM-DDTHH:mm:SS.sss> selector.
--topic by default covers all partitions. You can use the format --topic <topicname>:<partition> to address a specific partition, e.g. --topic test-topic:0.
Please do note that the consumer group must be inactive when offset adjustments are issued.
Example 1
Since Kafka 0.11.0.0 you can use the script kafka-consumer-groups.sh. Example from this answer:
kafka-consumer-groups.sh \
--bootstrap-server kafka-host:9092 \
--group my-group \
--reset-offsets \
--to-earliest \
--all-topics \
--execute
Other options listed in the KIP-122: Add Reset Consumer Group Offsets tooling
.----------------------.-----------------------------------------------.----------------------------------------------------------------------------------------------------------------------------------------------.
| Scenario | Arguments | Example |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Datetime | --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm | Reset to first offset since 01 January 2017, 00:00:00 hrs: --reset-offsets –group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset by Duration | --by-duration PnDTnHnMnS | Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Earliest | --to-earliest | Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Latest | --to-latest | Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Offset | --to-offset | Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1 |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Shift Offset by 'n' | --shift-by n | Reset to current offset plus 5 positions: --reset-offsets --group test.group –topic foo --shift-by 5 |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset from File | --from-file PATH_TO_FILE | Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv |
'----------------------'-----------------------------------------------'----------------------------------------------------------------------------------------------------------------------------------------------'
You can also define the partition you want to reset, for example:
- Reset offset of topic foo partition 0 to 1:
--reset-offsets --group test.group --topic foo:0 --to-offset 1
- Reset offset of topic foo partitions 0, 1 and 2 to earliest:
--reset-offsets --group test.group --topic foo:0,1,2 --to-earliest
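When the partition list comes from a script, the topic:partitions value can be assembled rather than typed. A minimal sketch, assuming only the topic:p1,p2,... syntax shown above; topic_arg is a hypothetical helper name.

```shell
# Build the value for --topic from a topic name and an optional list of partitions.
topic_arg() {
  topic="$1"
  shift
  if [ "$#" -eq 0 ]; then
    echo "$topic"               # no partitions given: the tool covers all partitions
  else
    (IFS=,; echo "$topic:$*")   # "$*" joins the partitions with the first IFS character
  fi
}

topic_arg foo 0 1 2   # prints foo:0,1,2
```

The result can be passed straight through, e.g. `--topic "$(topic_arg foo 0 1 2)"`.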
Reminder: don't forget the --execute flag (see the execution options in the KIP). Without this flag the script only prints the result of the scenario by scope, for example:
TOPIC PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
foo 0 90 10 100 - - -
Credits to this answer. Table created with ascii tables
Example 2
You can use the --export option of kafka-consumer-groups together with --dry-run to create the CSV file from existing information without changing anything. For example:
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--export --group $GROUP_NAME --topic $TOPIC \
--reset-offsets --to-current \
--dry-run
The option --to-current can be replaced with other reset options, such as --to-datetime, --by-duration, etc.
The output of that command is the CSV file required by --from-file.
One very useful use case for this is to copy the offsets from one consumer group to another consumer group, for example:
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--export --group $FROM_GROUP_NAME --topic $TOPIC \
--reset-offsets --to-current \
--dry-run > offsets.txt
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--execute --group $TO_GROUP_NAME \
--reset-offsets --from-file offsets.txt
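The two steps can be wrapped into one function so that an empty export never gets applied. This is only a sketch built from the two commands above: copy_group_offsets and KAFKA_CG are hypothetical names, and the target group still has to be inactive when the second step runs.

```shell
# Assumed default; adjust to the path of your kafka-consumer-groups script.
KAFKA_CG="${KAFKA_CG:-bin/kafka-consumer-groups}"

# Copy the committed offsets of one group to another for a single topic.
# Arguments: bootstrap server, source group, target group, topic, plan file path.
copy_group_offsets() {
  # Export the source group's current offsets; --dry-run leaves that group untouched.
  "$KAFKA_CG" --bootstrap-server "$1" --export --group "$2" --topic "$4" \
    --reset-offsets --to-current --dry-run > "$5" || return 1
  # Refuse to continue with an empty plan.
  [ -s "$5" ] || { echo "empty reset plan: $5" >&2; return 1; }
  # Apply the exported plan to the target group.
  "$KAFKA_CG" --bootstrap-server "$1" --execute --group "$3" \
    --reset-offsets --from-file "$5"
}
```

For example: `copy_group_offsets "$KAFKA" "$FROM_GROUP_NAME" "$TO_GROUP_NAME" "$TOPIC" offsets.txt`.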
References
https://blog.csdn.net/qq_30868737/article/details/103597073
https://help.aiven.io/en/articles/2661525-viewing-and-resetting-consumer-group-offsets
https://*.com/questions/29791268/how-to-change-start-offset-for-topic
https://*.com/questions/55477753/how-to-reset-offsets-to-arbitrary-value-in-kafka-consumer-group