
Manually Resetting/Setting Consumer Group Offsets

Published on 程序员文章站, 2022-06-08 18:11:43

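A recent problem in our project left a large number of useless messages in a topic on our Kafka cluster, so I needed to manually set the consumer group offsets for that topic.

I looked up some material online and am recording it here.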

 

Common Kafka commands

1) Create a topic:

./bin/kafka-topics.sh \
    --create \
    --zookeeper localhost:2181 \
    --replication-factor 1 \
    --partitions 1 \
    --topic test

2) List topics:

./bin/kafka-topics.sh --list --zookeeper localhost:2181

3) Start a console producer:

./bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

4) Start a console consumer:

./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning

5) Delete a topic:

./bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic test

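Note: topic deletion requires delete.topic.enable=true in server.properties (this is the default since Kafka 1.0).

Note: the --zookeeper options above apply to older Kafka releases. kafka-console-consumer.sh no longer accepts --zookeeper as of Kafka 2.0, and kafka-topics.sh dropped it in Kafka 3.0; on newer versions use --bootstrap-server localhost:9092 instead.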

# List all consumer group IDs
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list
# Show a group's offsets and lag
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group course-cell-dinc-test
# Delete the specified group
./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --delete --group course-cell-dinc-test
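The --describe output is a whitespace-separated table. As a rough sketch for checking a group at a glance, the bash function below sums its LAG column. It assumes the header row contains CURRENT-OFFSET and LAG; because the column order differs between Kafka versions (older releases have no GROUP column), it finds columns by header name. total_lag is a hypothetical helper name, and partitions whose LAG is shown as - are skipped.

```shell
#!/usr/bin/env bash
# Hypothetical helper: sum the LAG column of
# `kafka-consumer-groups.sh --describe` output read from stdin.
total_lag() {
  awk '
    # Header row: remember the position of every column by its name
    /CURRENT-OFFSET/ { for (i = 1; i <= NF; i++) col[$i] = i; next }
    # Data rows: add numeric LAG values; "-" (no committed offset) is skipped
    ("LAG" in col) && $(col["LAG"]) ~ /^[0-9]+$/ { sum += $(col["LAG"]) }
    # %.0f keeps large totals printed as plain integers
    END { printf "%.0f\n", sum }'
}
```

For example: ./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group course-cell-dinc-test | total_lag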

Resetting a group's offsets

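When resetting offsets, the target group must not be active; that is, no consumer in the group may be consuming.
The --reset-offsets option resets the offsets of the specified group. It is combined with one of two options, --dry-run or --execute; the default is --dry-run.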

--dry-run mode

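In --dry-run mode the reset is not actually performed; the tool only previews the resulting offsets.
This mode encourages users to proceed carefully, because resetting offsets directly changes where each consumer resumes reading and can make consumers skip or re-read messages.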
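Because --dry-run is the safe default, one way to make the preview-then-apply habit stick is a small bash wrapper that always previews unless you explicitly opt in. This is a sketch, not part of Kafka: reset_offsets, KCG, BOOTSTRAP and CONFIRM are hypothetical names, and KCG is assumed to default to kafka-consumer-groups.sh on the PATH.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper: preview by default, apply only when CONFIRM=yes.
# KCG, BOOTSTRAP and CONFIRM are illustrative variables, not Kafka settings.
KCG="${KCG:-kafka-consumer-groups.sh}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Usage: reset_offsets GROUP TOPIC STRATEGY [ARG]
#   e.g. reset_offsets course-cell-dinc-test global-biz-log --shift-by -1
reset_offsets() {
  local group="$1" topic="$2"
  shift 2
  local mode="--dry-run"
  if [ "${CONFIRM:-no}" = "yes" ]; then
    mode="--execute"
  fi
  # The remaining arguments carry the reset strategy unchanged
  "$KCG" --bootstrap-server "$BOOTSTRAP" \
    --group "$group" --topic "$topic" \
    --reset-offsets "$@" "$mode"
}
```

Run reset_offsets course-cell-dinc-test global-biz-log --shift-by -1 to preview, then CONFIRM=yes reset_offsets course-cell-dinc-test global-biz-log --shift-by -1 to apply. Setting KCG=echo prints the command line instead of running it, which is a cheap way to check the arguments.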

# --shift-by -1 moves the group's offset back by one (current offset - 1)
kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --reset-offsets \
    --shift-by -1 \
    --topic global-biz-log \
    --group course-cell-dinc-test \
    --dry-run

Output:

TOPIC                          PARTITION  NEW-OFFSET
test                           0          797054

If you now query the group's offsets, you will find that its committed offset is still 797055; nothing has changed.
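The preview above is simple arithmetic: 797055 + (-1) = 797054. The sketch below reproduces it. As far as I understand, kafka-consumer-groups.sh also clamps a shifted offset to the partition's valid range (from the earliest offset to the log-end offset) instead of producing an invalid one, so the function does the same. shifted_offset is a hypothetical helper, and the range values in the usage line are made up for illustration.

```shell
#!/usr/bin/env bash
# Hypothetical helper mirroring --shift-by: new = current + n,
# clamped to [LOG_START, LOG_END] (assumed tool behavior).
# Usage: shifted_offset CURRENT N LOG_START LOG_END
shifted_offset() {
  local current="$1" n="$2" start="$3" end="$4"
  local target=$(( current + n ))
  if [ "$target" -lt "$start" ]; then
    target="$start"
  elif [ "$target" -gt "$end" ]; then
    target="$end"
  fi
  echo "$target"
}
```

For example, shifted_offset 797055 -1 0 800000 prints 797054, matching the NEW-OFFSET in the preview.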

--execute mode

The --execute option actually performs the reset.

kafka-consumer-groups \
    --bootstrap-server localhost:9092 \
    --reset-offsets \
    --shift-by -1 \
    --topic global-biz-log \
    --group course-cell-dinc-test \
    --execute
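After running with --execute, you can confirm the new position with --describe. The awk-based sketch below extracts CURRENT-OFFSET for one topic partition from that output. current_offset is a hypothetical helper; it assumes whitespace-separated output as printed by the tool and locates columns through the header row, since older releases omit the GROUP column.

```shell
#!/usr/bin/env bash
# Hypothetical helper: print CURRENT-OFFSET for TOPIC/PARTITION
# from `kafka-consumer-groups.sh --describe` output on stdin.
# Usage: current_offset TOPIC PARTITION
current_offset() {
  awk -v topic="$1" -v part="$2" '
    # Header row: map column names to positions
    /CURRENT-OFFSET/ { for (i = 1; i <= NF; i++) col[$i] = i; next }
    # Matching data row: print its CURRENT-OFFSET field
    ("TOPIC" in col) && $(col["TOPIC"]) == topic && $(col["PARTITION"]) == part {
      print $(col["CURRENT-OFFSET"])
    }'
}
```

For example: kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group course-cell-dinc-test | current_offset global-biz-log 0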

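Offset reset strategies

The command offers several strategies for choosing the new offset:

--to-current  reset to the group's current committed offset (effectively unchanged; note that this is not the log-end offset, LEO)
--to-latest  reset to the latest offset, i.e. the log-end offset (LEO)
--to-datetime <String: datetime>  reset to the offset at the given time
--to-earliest  reset to the earliest available offset
--to-offset <Long: offset>  reset to the given offset
--shift-by <Long: n>  shift the current offset by n; n may be positive or negative
--from-file <String: path to CSV file>  reset according to a plan described in an external CSV file
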
# Preview a reset to the earliest offset
./kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --reset-offsets \
    --to-earliest \
    --topic global-biz-log \
    --group debug-01 \
    --dry-run

# Apply the same reset
./kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --reset-offsets \
    --to-earliest \
    --topic global-biz-log \
    --group debug-01 \
    --execute

# Reset to the first offset at or after 2019-11-22 06:00:00.000
./kafka-consumer-groups.sh \
    --bootstrap-server localhost:9092 \
    --reset-offsets \
    --to-datetime 2019-11-22T06:00:00.000 \
    --topic global-biz-log \
    --group debug-01 \
    --execute
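A typo in the --to-datetime value only surfaces as an error from the tool, so a quick shape check can help in scripts. This bash sketch (is_reset_datetime is a hypothetical name) accepts the forms that appear in this post, YYYY-MM-DDTHH:mm:SS.sss with the milliseconds and a Z or ±hh:mm suffix treated as optional. It checks the shape only, so an impossible date such as month 13 still passes; the tool remains the final judge.

```shell
#!/usr/bin/env bash
# Hypothetical shape check for --to-datetime values (not Kafka's own parser).
# Returns 0 if $1 looks like YYYY-MM-DDTHH:mm:SS[.sss][Z|+hh:mm|-hh:mm].
is_reset_datetime() {
  local re='^[0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2}(\.[0-9]{3})?(Z|[+-][0-9]{2}:[0-9]{2})?$'
  [[ "$1" =~ $re ]]
}
```

A script might guard the reset with: is_reset_datetime "$TS" || { echo "bad datetime: $TS" >&2; exit 1; }, where TS is a hypothetical variable holding the timestamp.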

Fetching a batch of messages with kafka-verifiable-consumer

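kafka-verifiable-consumer fetches messages in batches, much like kafka-console-consumer.
Its output is richer, though: it includes offsets and other details, and you can limit how many messages it reads.
kafka-console-consumer, by default, keeps reading whatever is available (it also accepts --max-messages to stop after a given count).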
# --max-messages 5: fetch only 5 messages
# --verbose: print every message

./kafka-verifiable-consumer.sh \
    --broker-list localhost:9092 \
    --max-messages 5 \
    --group-id course-cell-dinc-test \
    --topic global-biz-log \
    --verbose


./kafka-verifiable-consumer.sh \
    --broker-list localhost:9092 \
    --max-messages 1 \
    --reset-policy earliest \
    --group-id course-cell-dinc-test \
    --topic global-biz-log \
    --verbose

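kafka-verifiable-consumer also supports the following options:

--session-timeout  the consumer's session timeout
--enable-autocommit  enable automatic offset commits (default: false)
--reset-policy  where to start when the group has no committed offset: 'earliest', 'latest', or 'none' (default: earliest)
--assignment-strategy  the consumer's partition assignment strategy (default: RoundRobinAssignor)
--consumer.config  a file with additional consumer configuration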
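The examples above join the real group course-cell-dinc-test. As far as I can tell, the verifiable consumer commits the offsets it consumes (manually when --enable-autocommit is off), so peeking this way may advance that group's committed offsets. A safer habit is to peek under a throwaway group id. The bash sketch below does that; peek_messages, VC, BROKERS and the peek- prefix are hypothetical names, and with a fresh group --reset-policy earliest starts from the beginning of each partition.

```shell
#!/usr/bin/env bash
# Hypothetical helper: read COUNT messages under a throwaway group id so the
# committed offsets of the real consumer group are left alone.
VC="${VC:-kafka-verifiable-consumer.sh}"
BROKERS="${BROKERS:-localhost:9092}"

# Usage: peek_messages TOPIC [COUNT]
peek_messages() {
  local topic="$1" count="${2:-5}"
  # Unique per call: epoch seconds plus the shell's PID
  local group="peek-$(date +%s)-$$"
  "$VC" --broker-list "$BROKERS" \
    --max-messages "$count" \
    --reset-policy earliest \
    --group-id "$group" \
    --topic "$topic" \
    --verbose
}
```

For example, peek_messages global-biz-log 5 reads five messages from the start of the topic. The throwaway groups stay listed until removed, so you may want to delete them afterwards with kafka-consumer-groups.sh --delete.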

 

Using kafka-consumer-groups.sh

With the client configuration in place (the connection settings in consumer.properties, passed with --command-config), you can list the consumer groups with:

kafka-consumer-groups.sh \
    --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
    --command-config consumer.properties \
    --list

To view details for a single group:

kafka-consumer-groups.sh \
    --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
    --command-config consumer.properties \
    --group test-group \
    --describe

To list current members of a group:

kafka-consumer-groups.sh \
    --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
    --command-config consumer.properties \
    --group test-group \
    --describe \
    --members

Resetting consumer group offset

To reset an offset for a group, you can issue:

kafka-consumer-groups.sh \
    --bootstrap-server demo-kafka.htn-aiven-demo.aivencloud.com:17072 \
    --command-config consumer.properties \
    --group test-group \
    --topic test-topic \
    --reset-offsets \
    --to-earliest \
    --execute

You can reset the offset either to the beginning of the data with --to-earliest or to the end of the topic with --to-latest. If you know a specific offset, you can use --to-offset directly.

Relative changes are possible with the --shift-by argument: a positive number skips forward, and a negative number moves backward.

You can reset offsets based on a timestamp with the --to-datetime <YYYY-MM-DDTHH:mm:SS.sss> selector.

By default, --topic covers all partitions. Use the format --topic <topicname>:<partition> to address a specific partition, e.g. --topic test-topic:0.

Note that the consumer group must be inactive when you adjust its offsets.

 

Example 1

Since Kafka 0.11.0.0 you can use the kafka-consumer-groups.sh script. Example from this answer:

kafka-consumer-groups.sh \
    --bootstrap-server kafka-host:9092 \
    --group my-group \
    --reset-offsets \
    --to-earliest \
    --all-topics \
    --execute

Other options are listed in KIP-122: Add Reset Consumer Group Offsets tooling:

.----------------------.-----------------------------------------------.----------------------------------------------------------------------------------------------------------------------------------------------.
|      Scenario        |                   Arguments                   |                                                                    Example                                                                   |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Datetime    |  --to-datetime YYYY-MM-DDTHH:mm:SS.sss±hh:mm  |  Reset to first offset since 01 Jan 2017, 00:00:00 hrs: --reset-offsets --group test.group --topic foo --to-datetime 2017-01-01T00:00:00Z    |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset by Duration    |  --by-duration  PnDTnHnMnS                    |  Reset to first offset since one week ago (from current timestamp): --reset-offsets --group test.group --topic foo --by-duration P7D         |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Earliest    |  --to-earliest                                |  Reset to earliest offset available: --reset-offsets --group test.group --topic foo --to-earliest                                            |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Latest      |  --to-latest                                  |  Reset to latest offset available: --reset-offsets --group test.group --topic foo --to-latest                                                |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset to Offset      |  --to-offset                                  |  Reset to offset 1 in all partitions: --reset-offsets --group test.group --topic foo --to-offset 1                                           |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Shift Offset by 'n'  |  --shift-by n                                 |  Reset to current offset plus 5 positions: --reset-offsets --group test.group --topic foo --shift-by 5                                       |
:----------------------+-----------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------:
| Reset from File      |  --from-file PATH_TO_FILE                     |  Reset using a file with reset plan: --reset-offsets --group test.group --from-file reset-plan.csv                                           |
'----------------------'-----------------------------------------------'----------------------------------------------------------------------------------------------------------------------------------------------'

You can also specify which partitions to reset, for example:

  • Reset offset of topic foo partition 0 to 1

    --reset-offsets --group test.group --topic foo:0 --to-offset 1

  • Reset offset of topic foo partition 0,1,2 to earliest

    --reset-offsets --group test.group --topic foo:0,1,2 --to-earliest
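When the partition list comes from a script (for instance, partitions you found lagging), a tiny helper can assemble the topic:partitions value shown above. This is a bash sketch; topic_spec is a hypothetical name. With no partitions it returns the bare topic, which covers all partitions.

```shell
#!/usr/bin/env bash
# Hypothetical helper: build TOPIC or TOPIC:P1,P2,... for --topic.
# Usage: topic_spec TOPIC [PARTITION...]
topic_spec() {
  local topic="$1"
  shift
  if [ "$#" -eq 0 ]; then
    echo "$topic"        # bare topic = all partitions
    return
  fi
  local IFS=,            # "$*" joins the partitions with commas
  echo "$topic:$*"
}
```

For example: kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group test.group --reset-offsets --topic "$(topic_spec foo 0 1 2)" --to-earliest --dry-run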

Reminder: don't forget the --execute flag (see the execution options in the KIP). Without this flag the script will only print out the result of the scenario by scope, for example:

TOPIC                 PARTITION NEW-OFFSET NEW-LAG LOG-END-OFFSET CONSUMER-ID HOST CLIENT-ID
foo                   0         90         10      100            -           -    -

Credit to this answer. The table was created with ASCII Tables.

 

Example 2

You can use the --export option of kafka-consumer-groups together with --dry-run to create the CSV file from the group's existing offsets without changing anything. For example:

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --export --group $GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \
  --dry-run

The value --to-current can be replaced with other strategies, such as --to-datetime, --by-duration, etc.

The output of that command is the CSV file that --from-file expects.
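Because the plan is plain CSV, you can also edit it before applying it. The awk sketch below shifts every offset in an exported plan by N, clamping at 0. It assumes the single-group layout topic,partition,offset with one row per partition; rows with a different number of fields (for example, a layout that also includes the group name) are reported on stderr and dropped. shift_plan is a hypothetical name.

```shell
#!/usr/bin/env bash
# Hypothetical helper: shift every offset in a "topic,partition,offset"
# reset plan (stdin) by N, never going below 0.
# Usage: shift_plan N < offsets.csv > shifted.csv
shift_plan() {
  awk -F, -v n="$1" '
    NF != 3 { print "skipping: " $0 > "/dev/stderr"; next }
    {
      v = $3 + n
      if (v < 0) v = 0
      # %.0f avoids scientific notation for large offsets
      printf "%s,%s,%.0f\n", $1, $2, v
    }'
}
```

For example, shift_plan -100 < offsets.txt > shifted.csv, then preview the result with --reset-offsets --from-file shifted.csv --dry-run before executing it.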

One very useful use case for this is to copy the offsets from one consumer group to another consumer group, for example:

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --export --group $FROM_GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \
  --dry-run > offsets.txt

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --execute --group $TO_GROUP_NAME \
  --reset-offsets --from-file offsets.txt
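The two commands above can be wrapped into one function that refuses to continue if the export produced nothing. This is a sketch: copy_group_offsets, KCG, BOOTSTRAP and MODE are hypothetical names, and unlike the original second command it previews with --dry-run unless you set MODE=--execute.

```shell
#!/usr/bin/env bash
# Hypothetical wrapper: copy one group's committed offsets to another group.
KCG="${KCG:-kafka-consumer-groups}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Usage: copy_group_offsets FROM_GROUP TO_GROUP TOPIC [PLAN_FILE]
copy_group_offsets() {
  local from="$1" to="$2" topic="$3" plan="${4:-offsets.txt}"
  # Step 1: export FROM's current offsets; --dry-run changes nothing
  "$KCG" --bootstrap-server "$BOOTSTRAP" \
    --export --group "$from" --topic "$topic" \
    --reset-offsets --to-current --dry-run > "$plan" || return 1
  # Stop if the export is empty rather than applying an empty plan
  if [ ! -s "$plan" ]; then
    echo "empty reset plan: $plan" >&2
    return 1
  fi
  # Step 2: preview (default) or apply the plan for the TO group
  "$KCG" --bootstrap-server "$BOOTSTRAP" \
    --group "$to" --reset-offsets --from-file "$plan" "${MODE:---dry-run}"
}
```

For example, copy_group_offsets "$FROM_GROUP_NAME" "$TO_GROUP_NAME" "$TOPIC" previews the copy, and MODE=--execute copy_group_offsets "$FROM_GROUP_NAME" "$TO_GROUP_NAME" "$TOPIC" applies it. The target group must be inactive, as with any reset.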

 

References

https://blog.csdn.net/qq_30868737/article/details/103597073

https://help.aiven.io/en/articles/2661525-viewing-and-resetting-consumer-group-offsets

https://*.com/questions/29791268/how-to-change-start-offset-for-topic

https://*.com/questions/55477753/how-to-reset-offsets-to-arbitrary-value-in-kafka-consumer-group

 
