How to Reset Kafka Consumer Group Offset

Kafka 0.11.0.0 (Confluent 3.3.0) added support for manipulating a consumer group's offsets through the kafka-consumer-groups command-line tool.

  1. List the topics to which the group is subscribed
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

Note the values under "CURRENT-OFFSET" and "LOG-END-OFFSET". "CURRENT-OFFSET" is the offset this consumer group has currently reached in each partition; "LOG-END-OFFSET" is the end of the log in that partition.

  2. Reset the consumer offset for a topic (preview)
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest

This prints the offsets the reset would produce, without applying them.

  3. Reset the consumer offset for a topic (execute)
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

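This executes the reset and moves the consumer group offset for the specified topic back to the earliest offset still available in each partition. That is 0 only if no older messages have been removed from the partition.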

  4. Repeat step 1 to check that the reset was successful
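
The check in the last step can be scripted rather than read off the table by eye. The following is a rough sketch, not part of Kafka: the function name `describe_lag` is hypothetical, the sample table is made up for illustration, and the script assumes the header row of the `--describe` output names the columns TOPIC, PARTITION, CURRENT-OFFSET and LOG-END-OFFSET. Column order differs between Kafka versions, so columns are located by name rather than by position.

```shell
#!/bin/sh
# Print "topic partition current-offset log-end-offset lag" for each
# partition found in `kafka-consumer-groups --describe` output on stdin.
# Assumption: the header row names the columns TOPIC, PARTITION,
# CURRENT-OFFSET and LOG-END-OFFSET (their order may vary by version).
describe_lag() {
  awk '
    # Header row: remember which field holds each named column.
    /CURRENT-OFFSET/ {
      for (i = 1; i <= NF; i++) col[$i] = i
      next
    }
    # Data rows: only after a header was seen; skip blank lines and
    # partitions without a committed offset (shown as "-").
    ("CURRENT-OFFSET" in col) && NF > 0 && $(col["CURRENT-OFFSET"]) != "-" {
      cur = $(col["CURRENT-OFFSET"])
      log_end = $(col["LOG-END-OFFSET"])
      print $(col["TOPIC"]), $(col["PARTITION"]), cur, log_end, log_end - cur
    }
  '
}

# Made-up sample output, for illustration only.
sample='GROUP     TOPIC   PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID  HOST  CLIENT-ID
my-group  orders  0          120             150             30   -            -     -
my-group  orders  1          200             200             0    -            -     -'

printf '%s\n' "$sample" | describe_lag
```

In real use, pipe the output of the `--describe` command from step 1 into `describe_lag` instead of the sample text.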

Note

  • The consumer group must have no running instance when performing the reset. Otherwise the reset will be rejected.
  • There are several other reset options; run kafka-consumer-groups for details:
    • --shift-by <positive_or_negative_integer>
    • --to-current
    • --to-latest
    • --to-offset <offset_integer>
    • --to-datetime <datetime_string>
    • --by-duration <duration_string>
  • The command also provides an option to reset offsets for all topics the consumer group subscribes to: --all-topics
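
The preview and execute variants differ only in the trailing --execute flag, so any of the reset options above can be passed through one small wrapper that previews unless execution is requested explicitly. This is a sketch under stated assumptions, not part of Kafka: the function name `reset_offsets`, the `KAFKA_CG` and `BOOTSTRAP` variables, and the `localhost:9092` default are all hypothetical. The demo lines substitute `echo` for the real tool, so they only print the arguments that would be passed.

```shell
#!/bin/sh
# Hypothetical wrapper around kafka-consumer-groups; not part of Kafka.
KAFKA_CG=${KAFKA_CG:-kafka-consumer-groups}   # command to invoke
BOOTSTRAP=${BOOTSTRAP:-localhost:9092}        # placeholder broker address

# Usage: reset_offsets <preview|execute> <group> <topic> <reset option...>
reset_offsets() {
  if [ "$#" -lt 4 ]; then
    echo "usage: reset_offsets <preview|execute> <group> <topic> <reset option...>" >&2
    return 2
  fi
  mode=$1 group=$2 topic=$3
  shift 3
  case $mode in
    preview) exec_flag= ;;            # without --execute the tool only prints the plan
    execute) exec_flag=--execute ;;
    *) echo "mode must be preview or execute" >&2; return 2 ;;
  esac
  # $KAFKA_CG and $exec_flag are intentionally unquoted: the first may hold
  # a command plus arguments, the second must vanish when it is empty.
  $KAFKA_CG --bootstrap-server "$BOOTSTRAP" --group "$group" \
    --topic "$topic" --reset-offsets "$@" $exec_flag
}

# Demo: echo stands in for the real tool, so this only prints the arguments.
KAFKA_CG=echo
reset_offsets preview my-group my-topic --to-earliest
reset_offsets execute my-group my-topic --shift-by -100
```

Keeping the mode as a required first argument means a reset is never executed by accident from shell history with a forgotten flag.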

daniilyar commented Feb 22, 2018

For those who want to do the same on Kafka 0.10: you can download the Kafka 1.0 binaries, which include the script with the new offset-reset capability. It will work for Kafka 0.10 as well:

wget http://apache-mirror.rbc.ru/pub/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xvf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0/bin

# Check which offsets the consumer group is currently on:
./kafka-consumer-groups.sh --bootstrap-server <kafka:port> --group <group> --describe

# Reset the offset
./kafka-consumer-groups.sh --bootstrap-server <kafka:port> --group <group> --topic <topic> --reset-offsets --to-earliest --execute

# Check the offsets again to make sure they were reset properly:
./kafka-consumer-groups.sh --bootstrap-server <kafka:port> --group <group> --describe

# CURRENT-OFFSET for all topic partitions should now be lower, i.e. LOG-END-OFFSET should be greater than CURRENT-OFFSET

michalkarolik commented Mar 21, 2018

Thanks for the instructions :) Do you know why this is so hard to achieve in most libraries? Have you done any research on it? I checked kafka-go and sarama (both Go) and spring-kafka: there is no easy way to reset offsets while using consumer groups. I'm trying to understand whether resetting offsets is dangerous or whether there are other technical issues.

More description:
https://stackoverflow.com/questions/49360325/kafka-consumer-group-loses-not-commited-messages

Joginder22 commented Apr 17, 2018

Thanks, daniilyar, but we're facing the error below:

Error: Executing consumer group command failed due to The broker only supports OffsetFetchRequest v1, but we need v2 or newer to request all topic partitions.

This looks like a version incompatibility when running the commands with the 1.0 binaries.

yasha-podeswa-hs commented Apr 25, 2018

Anyone know if there's a way to do this on Kafka 0.9.0?

dkurzaj commented Jul 31, 2018

Apparently my latest and earliest offsets have the same values, and both are equal to LOG-END-OFFSET. I cannot get back to the beginning of the messages.

mduhan commented Aug 18, 2018

Kafka 0.10 does not support --reset-offsets, but this can be achieved using the Java APIs.
Please have a look.

https://gist.github.com/mduhan/0e0a4b08694f50d8a646d2adf02542fc

manigeeth-narendula commented Apr 1, 2019

Can anyone tell me how to do this for Kafka 1.0.0?

OneCricketeer commented Apr 22, 2019

The API is all the same since 0.11. This includes 1.0.0, 2.0, and above until anyone mentions otherwise.

yuranos commented Jul 15, 2019

Applying 1.0 to 0.10 didn't work for me either. Same as @Joginder22:
Error: Executing consumer group command failed due to The broker only supports OffsetFetchRequest v1, but we need v2 or newer to request all topic partitions.

Strech commented Nov 13, 2019

I would like to add a small adjustment on how to reset a single topic partition:

./kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name>[:<partition number>] --reset-offsets --to-earliest --execute

qinzl1 commented Nov 21, 2019

Kafka 1.1 doesn't seem to support resetting with --to-datetime. --to-earliest takes effect, but --to-datetime does not.

zwu01 commented Dec 10, 2019

Does anyone know if it's possible to reset the group offset using the librdkafkacpp APIs? Thanks

muscovitebob commented Feb 6, 2020

Greetings all,

Just leaving a note here to save some time for everyone using a Kafka cluster protected by SSL. In case you are getting the following OOM error when running kafka-consumer-groups:

[2020-02-06 09:31:40,554] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:436)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:653)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:574)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)
	at java.lang.Thread.run(Thread.java:748)

Counterintuitively, you are not running out of memory; the error arises because no SSL credentials were provided. You need to create a .properties file and pass it to each call to the binary via the --command-config option. In our case, the template for this properties file is:

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=BOOTSTRAP
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="KEY" password="PASSWORD";
security.protocol=SASL_SSL
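
A minimal sketch of passing that file on every call, assuming the template above has been filled in and saved as `client.properties`. The file name, the helper name `kcg`, the `KAFKA_CG` and `BOOTSTRAP` variables, and the `localhost:9092` default are all hypothetical; only `--bootstrap-server` and `--command-config` are options of the tool itself. The demo uses `echo` and an empty temporary file in place of the real tool and real credentials.

```shell
#!/bin/sh
# Hypothetical helper: run kafka-consumer-groups with the client
# properties file attached to every call.
KAFKA_CG=${KAFKA_CG:-kafka-consumer-groups}           # command to invoke
BOOTSTRAP=${BOOTSTRAP:-localhost:9092}                # placeholder broker address
COMMAND_CONFIG=${COMMAND_CONFIG:-client.properties}   # assumed file name

kcg() {
  # Fail early with a clear message instead of the misleading OOM error.
  if [ ! -r "$COMMAND_CONFIG" ]; then
    echo "cannot read properties file: $COMMAND_CONFIG" >&2
    return 1
  fi
  # $KAFKA_CG is intentionally unquoted so it may hold a command plus arguments.
  $KAFKA_CG --bootstrap-server "$BOOTSTRAP" \
    --command-config "$COMMAND_CONFIG" "$@"
}

# Demo: echo stands in for the real tool, and an empty temporary file
# stands in for the real properties file.
KAFKA_CG=echo
COMMAND_CONFIG=$(mktemp)
kcg --group my-group --describe
rm -f "$COMMAND_CONFIG"
```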

Hope this saves someone time in the future.

Schachte commented Mar 20, 2020

Thanks @muscovitebob!

sharanm commented Mar 28, 2020

Thanks @muscovitebob, saved my day.

scottcarey commented May 6, 2020

I find it ridiculous that there is no --partition option or similar. I MUST fix an offset for only one partition without modifying the others.

Strech commented May 6, 2020

@scottcarey it's possible, check my message

https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b#gistcomment-3082013

--topic <topic_name>[:<partition number>]

ghenadiibatalski commented Oct 15, 2020

Is it possible to reset offsets on an active consumer group?

Strech commented Oct 15, 2020

> Is it possible to reset offsets on an active consumer group?

@ghenadiibatalski Nope, you will be asked to stop the consumers.

ankygupta9999 commented Jan 18, 2021

All, is anyone else facing this timeout: "Timed out waiting to send the call"?

ankygupta9999 commented Feb 2, 2021

> All, is anyone else facing this timeout: "Timed out waiting to send the call"?

Resolved by passing the --timeout 10000 option.

aairbag commented Feb 3, 2021

Does anyone know the minimum permissions required to reset offsets? Every doc about kafka ACL operations that I've looked through is vague about permissions for resetting offsets.

dcguim commented Mar 19, 2021

On Kafka 2.4.0 (Commit:77a89fcf8d7fa018), after restarting the Kafka Docker container, resetting offsets to earliest refuses to reset to 0.

docker exec -it kafka /kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server kafka:9092 \
    --group <mygroup> \
    --reset-offsets \
    --to-earliest \
    --all-topics \
    --execute
                                
GROUP                          TOPIC                          PARTITION  NEW-OFFSET
<mygroup>                     topic1                                  0        1649

meseta commented May 6, 2021

Thanks for this! I come here about once every couple weeks because I can never remember these commands

jeanadrien commented May 27, 2021

@dcguim @dkurzaj - I'm seeing the same behaviour. Offsets are stuck at their initial value. Have you managed to solve this?

dcguim commented May 27, 2021

I have not managed to solve this problem, nor found the specific configuration for this. What I can do is delete and recreate the problematic topic.

dkurzaj commented Jun 5, 2021

I actually do not remember, sorry...

singh1024 commented Oct 30, 2021

> Apparently my latest and earliest offsets have the same values, and both are equal to LOG-END-OFFSET. I cannot get back to the beginning of the messages.

@dkurzaj That is because your topic doesn't contain any messages; the offset can be reset at most as far back as the first message still present in the topic.
