@marwei
Created November 9, 2017 23:39
How to Reset Kafka Consumer Group Offset

Kafka 0.11.0.0 (Confluent 3.3.0) added support for manipulating a consumer group's offsets through the kafka-consumer-groups command-line tool.

  1. List the topics to which the group is subscribed
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

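Note the values under "CURRENT-OFFSET" and "LOG-END-OFFSET". "CURRENT-OFFSET" is the consumer group's current position in each partition. "LOG-END-OFFSET" is the offset of the next message to be written to that partition, so the difference between the two is the group's lag.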

  2. Reset the consumer offset for a topic (preview)
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest

Without --execute, this only prints the offsets the reset would produce; nothing is changed.

  3. Reset the consumer offset for a topic (execute)
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

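This executes the reset, moving the consumer group's offset for the specified topic back to the earliest offset still available in each partition. That is 0 only if no messages have been deleted from the start of the partition (for example, by retention).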

  4. Repeat step 1 to check whether the reset was successful
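
The four steps above can be sketched as one shell function. This is a minimal sketch, not part of the original gist: KCG and reset_to_earliest are hypothetical names introduced here, and the confirmation prompt is an added safety step. It assumes the binary is on your PATH under the name used in this gist.

```shell
#!/bin/sh
# Sketch of the describe / preview / execute / describe sequence.
# KCG is a hypothetical variable naming the CLI binary; override it if your
# distribution ships kafka-consumer-groups.sh instead.
KCG="${KCG:-kafka-consumer-groups}"

# Usage: reset_to_earliest <kafkahost:port> <group_id> <topic_name>
reset_to_earliest() {
  server=$1 group=$2 topic=$3

  # Step 1: show the group's current offsets.
  "$KCG" --bootstrap-server "$server" --group "$group" --describe || return 1

  # Step 2: preview (no --execute, so nothing is changed).
  "$KCG" --bootstrap-server "$server" --group "$group" --topic "$topic" \
    --reset-offsets --to-earliest || return 1

  # Ask before applying the reset; anything other than "y" aborts.
  printf 'Apply this reset? [y/N] '
  read -r answer
  [ "$answer" = "y" ] || { echo "Aborted."; return 0; }

  # Step 3: execute the reset.
  "$KCG" --bootstrap-server "$server" --group "$group" --topic "$topic" \
    --reset-offsets --to-earliest --execute || return 1

  # Step 4: describe again to verify the new offsets.
  "$KCG" --bootstrap-server "$server" --group "$group" --describe
}
```

After sourcing the file, call it as: reset_to_earliest <kafkahost:port> <group_id> <topic_name>. Stop the group's consumers first, as the note below explains.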

Note

  • The consumer group must have no running instances when the reset is performed; otherwise the reset is rejected.
  • There are many other reset options; run kafka-consumer-groups for details:
    • --shift-by <positive_or_negative_integer>
    • --to-current
    • --to-latest
    • --to-offset <offset_integer>
    • --to-datetime <datetime_string>
    • --by-duration <duration_string>
  • The command also provides an option to reset offsets for all topics the consumer group subscribes to: --all-topics
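
As an illustration of --to-datetime, here is a small sketch that builds a timestamp for "N hours ago" in the YYYY-MM-DDTHH:mm:SS.sss shape described by the tool's help text. The function name datetime_hours_ago is hypothetical. It assumes a date command that accepts -d "@<epoch>" (GNU coreutils or BusyBox; BSD/macOS date uses -r instead). It emits UTC; whether your Kafka version reads a zone-less value as UTC or as local time is an assumption worth checking with the preview step before adding --execute.

```shell
#!/bin/sh
# Sketch: print a --to-datetime value for "<hours> hours ago" in UTC.
# Usage: datetime_hours_ago <hours>
datetime_hours_ago() {
  # Current epoch seconds minus the requested number of hours.
  epoch=$(( $(date +%s) - $1 * 3600 ))
  # Format as YYYY-MM-DDTHH:mm:SS.sss (milliseconds fixed at 000).
  date -u -d "@$epoch" '+%Y-%m-%dT%H:%M:%S.000'
}
```

A preview using it might look like: kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-datetime "$(datetime_hours_ago 2)"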
@yuranos

yuranos commented Jul 15, 2019

Running the 1.0 tool against a 0.10 broker didn't work for me either. Same as @Joginder22:
Error: Executing consumer group command failed due to The broker only supports OffsetFetchRequest v1, but we need v2 or newer to request all topic partitions.

@Strech

Strech commented Nov 13, 2019

I would like to add a small note on how to reset a single topic partition:

./kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name>[:<partition number>] --reset-offsets --to-earliest --execute
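
A small sketch of building that --topic value in a script. The helper name topic_arg is hypothetical; the comma-separated form for several partitions (topic1:0,1,2) follows the tool's help text, so check it against your version with a preview first.

```shell
#!/bin/sh
# Sketch: build a --topic value, optionally scoped to partitions.
# Usage: topic_arg <topic_name> [partition ...]
topic_arg() {
  topic=$1
  shift
  if [ $# -eq 0 ]; then
    # No partitions given: the whole topic.
    printf '%s\n' "$topic"
  else
    # Join the remaining arguments with commas, e.g. "orders:0,1,2".
    old_ifs=$IFS
    IFS=,
    printf '%s:%s\n' "$topic" "$*"
    IFS=$old_ifs
  fi
}
```

For example, --topic "$(topic_arg <topic_name> 3)" limits the reset to partition 3 and leaves the other partitions untouched.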

@qinzl1

qinzl1 commented Nov 21, 2019

Kafka 1.1 doesn't seem to support resetting with --to-datetime. --to-earliest takes effect, but --to-datetime does not.

@zwu01

zwu01 commented Dec 10, 2019

Does anyone know whether it's possible to reset the group offset using the librdkafkacpp APIs? Thanks

@muscovitebob

Greetings all,

Just leaving a note here to save time for anyone using a Kafka cluster protected by SSL. You may get the following OOM error when running kafka-consumer-groups:

[2020-02-06 09:31:40,554] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:436)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:653)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:574)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)
	at java.lang.Thread.run(Thread.java:748)

Counterintuitively, you are not running out of memory; the error arises because no SSL credentials were provided. You need to create a .properties file and pass it to every invocation of the binary via the --command-config option. In our case, the template for this properties file is:

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=BOOTSTRAP
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="KEY" password="PASSWORD";
security.protocol=SASL_SSL

Hope this saves someone time in the future.
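
A sketch of generating that file from a script, under the same SASL_SSL/PLAIN settings as the template above. The function name write_client_properties and its argument order are hypothetical, and the chmod is an added precaution because the file holds a secret.

```shell
#!/bin/sh
# Sketch: write the client properties template to a file.
# Usage: write_client_properties <file> <bootstrap_servers> <key> <password>
write_client_properties() {
  file=$1 bootstrap=$2 key=$3 password=$4

  # Create (or truncate) the file and make it readable by the owner only
  # before any credentials are written to it.
  : > "$file" && chmod 600 "$file" || return 1

  cat > "$file" <<EOF
ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=$bootstrap
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$key" password="$password";
security.protocol=SASL_SSL
EOF
}
```

The resulting file is then passed on every call, for example: kafka-consumer-groups --bootstrap-server <kafkahost:port> --command-config client.properties --group <group_id> --describe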

@Schachte

Thanks @muscovitebob!

@sharanm

sharanm commented Mar 28, 2020

Thanks @muscovitebob, saved my day.

@scottcarey

I find it ridiculous that there is no --partition option or similar. I MUST fix an offset for only one partition without modifying the others.

@Strech

Strech commented May 6, 2020

@scottcarey it's possible; see my earlier comment:

https://gist.github.com/marwei/cd40657c481f94ebe273ecc16601674b#gistcomment-3082013

--topic <topic_name>[:<partition number>]

@ghenadiibatalski

Is it possible to reset offsets on an active consumer group?

@Strech

Strech commented Oct 15, 2020

Is it possible to reset offsets on an active consumer group?

@ghenadiibatalski Nope, you will be asked to stop the consumers first.

@ankygupta9999

All, has anyone else run into this timeout: "Timed out waiting to send the call"?

@ankygupta9999

All, has anyone else run into this timeout: "Timed out waiting to send the call"?

Resolved by adding the --timeout 10000 option.

@aairbag

aairbag commented Feb 3, 2021

Does anyone know the minimum permissions required to reset offsets? Every doc about kafka ACL operations that I've looked through is vague about permissions for resetting offsets.

@dcguim

dcguim commented Mar 19, 2021

With Kafka 2.4.0 (Commit:77a89fcf8d7fa018), after restarting the Kafka Docker container, resetting offsets with --to-earliest does not reset them to 0.

docker exec -it kafka /kafka/bin/kafka-consumer-groups.sh \
    --bootstrap-server kafka:9092 \
    --group <mygroup> \
    --reset-offsets \
    --to-earliest \
    --all-topics \
    --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET
<mygroup>                     topic1                                  0        1649

@meseta

meseta commented May 6, 2021

Thanks for this! I come here about once every couple weeks because I can never remember these commands

@jeanadrien

@dcguim @dkurzaj - I'm seeing the same behaviour. Offsets are stuck at their initial value. Have you managed to solve this?

@dcguim

dcguim commented May 27, 2021

I have not managed to solve this problem, nor found the specific configuration for this. What I can do is delete and recreate the problematic topic.

@dkurzaj

dkurzaj commented Jun 5, 2021

I actually do not remember, sorry...

@zonedoutfella

Apparently my latest and earliest offsets have the same values, and both are equal to LOG-END-OFFSET. I cannot manage to go back to the beginning of the messages.

@dkurzaj that is because your topic doesn't contain any messages; the offset can only be reset as far back as the first message still present in the topic.
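
One way to check this is to compare each partition's earliest and latest offsets. The sketch below assumes you have saved two listings in the topic:partition:offset format printed by kafka.tools.GetOffsetShell (--time -2 for earliest, --time -1 for latest). The function name retained_per_partition and the file names are hypothetical, and the connection flag of GetOffsetShell differs between Kafka versions.

```shell
#!/bin/sh
# Sketch: report how many messages each partition still retains.
# Both input files hold lines of the form topic:partition:offset, e.g. from
#   kafka-run-class kafka.tools.GetOffsetShell ... --time -2 > earliest.txt
#   kafka-run-class kafka.tools.GetOffsetShell ... --time -1 > latest.txt
# Usage: retained_per_partition <earliest_file> <latest_file>
retained_per_partition() {
  awk -F: '
    # First file: remember the earliest offset per topic:partition.
    NR == FNR { earliest[$1 ":" $2] = $3; next }
    # Second file: print earliest, latest, and the difference.
    ($1 ":" $2) in earliest {
      printf "%s partition %s: earliest=%s latest=%s retained=%d\n",
             $1, $2, earliest[$1 ":" $2], $3, $3 - earliest[$1 ":" $2]
    }
  ' "$1" "$2"
}
```

If retained is 0 for a partition, there is nothing to rewind to: --to-earliest and --to-latest resolve to the same offset, which matches the behaviour described above.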
