How to Reset Kafka Consumer Group Offset

Kafka 0.11.0.0 (Confluent 3.3.0) added support for manipulating a consumer group's offsets through the kafka-consumer-groups command-line tool.

  1. List the topics to which the group is subscribed
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --describe

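Note the values under "CURRENT-OFFSET" and "LOG-END-OFFSET". "CURRENT-OFFSET" is the offset the consumer group has currently committed for each partition; "LOG-END-OFFSET" is the offset at the end of that partition's log. The "LAG" column is the difference between the two.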
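
To get a single number out of that table, the per-partition lag can be summed. This is a minimal sketch and not part of the original instructions: total_lag is a hypothetical helper name, and it assumes the output contains a header row with a LAG column, whose position varies between Kafka versions.

```shell
# Hypothetical helper: sum the LAG column of `--describe` output read from stdin.
# The column position differs between Kafka versions, so find LAG by header name.
total_lag() {
  awk '
    # Until the header row is seen, look for the field named LAG.
    lag_col == 0 { for (i = 1; i <= NF; i++) if ($i == "LAG") lag_col = i; next }
    # Data rows: count only numeric lag values (partitions without a
    # committed offset show "-" instead of a number).
    $lag_col ~ /^[0-9]+$/ { sum += $lag_col }
    END { print sum + 0 }
  '
}

# Usage (placeholders as in the commands above):
#   kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" --group "$GROUP" --describe | total_lag
```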

  2. Reset the consumer offset for a topic (preview)
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest

This prints the offsets the reset would produce, but does not apply them.

  3. Reset the consumer offset for a topic (execute)
kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name> --reset-offsets --to-earliest --execute

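This executes the reset and moves the consumer group's offset for the specified topic back to the earliest offset still available in each partition (this is 0 only if nothing has been deleted from the start of the log).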

  4. Repeat step 1 to check that the reset succeeded
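
The preview, execute, and verify steps above can be chained into one guarded helper. This is only a sketch under assumptions: reset_to_earliest is a hypothetical function name, the confirmation prompt is an addition of this example, and kafka-consumer-groups is assumed to be on the PATH.

```shell
# Hypothetical helper: preview a --to-earliest reset, then apply it only on confirmation.
# Usage: reset_to_earliest <kafkahost:port> <group_id> <topic_name>
reset_to_earliest() (
  bootstrap=$1 group=$2 topic=$3

  # Preview: without --execute nothing is changed.
  kafka-consumer-groups --bootstrap-server "$bootstrap" --group "$group" \
    --topic "$topic" --reset-offsets --to-earliest || exit 1

  # Ask before changing anything.
  printf 'Apply this reset? [y/N] '
  read -r answer
  [ "$answer" = "y" ] || { echo "Aborted."; exit 1; }

  # Execute the reset, then describe the group again to verify the result.
  kafka-consumer-groups --bootstrap-server "$bootstrap" --group "$group" \
    --topic "$topic" --reset-offsets --to-earliest --execute &&
  kafka-consumer-groups --bootstrap-server "$bootstrap" --group "$group" --describe
)
```

The function body runs in a subshell, so its variables and the early exits do not leak into the calling shell.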

Note

  • The consumer group must have no running instance when performing the reset. Otherwise the reset will be rejected.
  • There are many other reset options; run kafka-consumer-groups for details:
    • --shift-by <positive_or_negative_integer>
    • --to-current
    • --to-latest
    • --to-offset <offset_integer>
    • --to-datetime <datetime_string>
    • --by-duration <duration_string>
  • The command also provides an option to reset offsets for all topics the consumer group subscribes to: --all-topics
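
The options in the list above all slot into the same preview command. The wrapper below is a sketch, with preview_reset as a hypothetical name; the example values in the comments are assumptions based on the formats the tool's help text describes (a timestamp like YYYY-MM-DDTHH:mm:SS.sss for --to-datetime and a duration like PnDTnHnMnS for --by-duration), so check the help output of your own version.

```shell
# Hypothetical wrapper: preview a reset using any of the options listed above.
# Usage: preview_reset <kafkahost:port> <group_id> <topic_name> <option> [value]
preview_reset() (
  bootstrap=$1 group=$2 topic=$3
  shift 3
  # Everything after the first three arguments is passed through unchanged.
  kafka-consumer-groups --bootstrap-server "$bootstrap" --group "$group" \
    --topic "$topic" --reset-offsets "$@"
)

# Example calls (all values are illustrative):
#   preview_reset "$BOOTSTRAP" "$GROUP" "$TOPIC" --shift-by -100
#   preview_reset "$BOOTSTRAP" "$GROUP" "$TOPIC" --to-offset 42
#   preview_reset "$BOOTSTRAP" "$GROUP" "$TOPIC" --to-datetime 2020-01-01T00:00:00.000
#   preview_reset "$BOOTSTRAP" "$GROUP" "$TOPIC" --by-duration PT1H
```

Because the wrapper never adds --execute on its own, it only prints the planned offsets unless --execute is passed explicitly as an extra argument.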

daniilyar commented Feb 22, 2018

For those who want to do the same on Kafka 0.10: you can download the Kafka 1.0 binaries, which include the script with the new offset reset capability. It will work against Kafka 0.10 as well:

wget http://apache-mirror.rbc.ru/pub/apache/kafka/1.0.0/kafka_2.11-1.0.0.tgz
tar -xvf kafka_2.11-1.0.0.tgz
cd kafka_2.11-1.0.0/bin

# Check which offsets flume-archive is currently on:
./kafka-consumer-groups.sh --bootstrap-server <kafka:port> --group <group> --describe

# Reset the offset
./kafka-consumer-groups.sh --bootstrap-server <kafka:port> --group <group> --topic <topic> --reset-offsets --to-earliest --execute

# Check which offsets flume-archive is on again, to make sure the offset has been reset properly:
./kafka-consumer-groups.sh --bootstrap-server <kafka:port> --group <group> --describe

# The current offset for every topic partition should now be lower, i.e. LOG-END-OFFSET should be greater than CURRENT-OFFSET

michalkarolik commented Mar 21, 2018

Thanks for the instructions :) Do you know why this is so hard to achieve in most libraries? Have you done any research on it? I checked kafka-go and sarama (both Golang) and spring-kafka, and none of them offers an easy way to reset offsets when using consumer groups. I am having trouble understanding whether resetting offsets is dangerous or whether there are other technical issues.

More description:
https://stackoverflow.com/questions/49360325/kafka-consumer-group-loses-not-commited-messages

Joginder22 commented Apr 17, 2018

Thanks daniilyar, but we're facing the error below:

Error: Executing consumer group command failed due to The broker only supports OffsetFetchRequest v1, but we need v2 or newer to request all topic partitions.

This looks like a version incompatibility when running the commands with the 1.0 binaries.

yasha-podeswa-hs commented Apr 25, 2018

Anyone know if there's a way to do this on Kafka 0.9.0?

dkurzaj commented Jul 31, 2018

Apparently my latest and earliest offsets have the same value, and both are equal to LOG-END-OFFSET. I cannot manage to go back to the beginning of the messages.

mduhan commented Aug 18, 2018

Kafka 0.10 does not support --reset-offsets, but the same result can be achieved using the Java APIs.
Please have a look:

https://gist.github.com/mduhan/0e0a4b08694f50d8a646d2adf02542fc

manigeeth-narendula commented Apr 1, 2019

Can anyone tell me how to do this for Kafka 1.0.0?

cricket007 commented Apr 22, 2019

The API is all the same since 0.11. This includes 1.0.0, 2.0, and above until anyone mentions otherwise.

yuranos commented Jul 15, 2019

Applying 1.0 to 0.10 didn't work for me either. Same as @Joginder22:
Error: Executing consumer group command failed due to The broker only supports OffsetFetchRequest v1, but we need v2 or newer to request all topic partitions.

Strech commented Nov 13, 2019

I would like to add a small adjustment showing how to reset a single topic partition:

./kafka-consumer-groups --bootstrap-server <kafkahost:port> --group <group_id> --topic <topic_name>[:<partition number>] --reset-offsets --to-earliest --execute
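
To scope the reset to several partitions at once, the tool's help text describes a comma-separated form such as topic1:0,1,2. The helper below is a small sketch for building that argument; topic_spec is a hypothetical name, not part of the Kafka tooling.

```shell
# Hypothetical helper: build the --topic argument for a partition-scoped reset.
# Usage: topic_spec <topic_name> [partition ...]
topic_spec() (
  topic=$1
  shift
  # With no partitions given, the bare topic name covers all partitions.
  [ "$#" -eq 0 ] && { printf '%s\n' "$topic"; exit 0; }
  # "$*" joins the remaining arguments with the first character of IFS.
  IFS=,
  printf '%s:%s\n' "$topic" "$*"
)

# Usage:
#   kafka-consumer-groups --bootstrap-server "$BOOTSTRAP" --group "$GROUP" \
#     --topic "$(topic_spec "$TOPIC" 0 2)" --reset-offsets --to-earliest
```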

qinzl1 commented Nov 21, 2019

Kafka 1.1 doesn't seem to support resetting with --to-datetime. A reset with --to-earliest takes effect, but one with --to-datetime does not.

zwu01 commented Dec 10, 2019

Does anyone know if it's possible to reset the group offset using the librdkafkacpp APIs? Thanks

muscovitebob commented Feb 6, 2020

Greetings all,

Just leaving a note here to save time for everyone using a Kafka cluster protected by SSL. You may get the following OOM error when running kafka-consumer-groups:

[2020-02-06 09:31:40,554] ERROR Uncaught exception in thread 'kafka-admin-client-thread | adminclient-1': (org.apache.kafka.common.utils.KafkaThread)
java.lang.OutOfMemoryError: Java heap space
	at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
	at java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
	at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30)
	at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112)
	at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:436)
	at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:397)
	at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:653)
	at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:574)
	at org.apache.kafka.common.network.Selector.poll(Selector.java:485)
	at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:539)
	at org.apache.kafka.clients.admin.KafkaAdminClient$AdminClientRunnable.run(KafkaAdminClient.java:1152)
	at java.lang.Thread.run(Thread.java:748)

Counterintuitively, you are not actually running out of memory; the error arises because no SSL credentials were provided. You need to create a .properties file and pass it to every call of the binary via the --command-config option. In our case, the template for this properties file is:

ssl.endpoint.identification.algorithm=https
sasl.mechanism=PLAIN
request.timeout.ms=20000
bootstrap.servers=BOOTSTRAP
retry.backoff.ms=500
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="KEY" password="PASSWORD";
security.protocol=SASL_SSL

Hope this saves someone time in the future.
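
Since the properties file has to be passed on every call, a small wrapper can add it automatically. This is a sketch under assumptions: kcg and the KAFKA_CLIENT_PROPS variable are hypothetical names introduced for this example; only the --command-config option itself comes from the comment above.

```shell
# Hypothetical wrapper: add --command-config to every kafka-consumer-groups call.
# KAFKA_CLIENT_PROPS is an assumed variable holding the path to the .properties file.
kcg() {
  # ${VAR:?message} aborts with the message if the variable is unset or empty.
  kafka-consumer-groups \
    --command-config "${KAFKA_CLIENT_PROPS:?set KAFKA_CLIENT_PROPS to your .properties file}" \
    "$@"
}

# Usage:
#   export KAFKA_CLIENT_PROPS="$HOME/client.properties"
#   kcg --bootstrap-server "$BOOTSTRAP" --group "$GROUP" --describe
```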

Schachte commented Mar 20, 2020

Thanks @muscovitebob!

sharanm commented Mar 28, 2020

Thanks @muscovitebob, saved my day.
