@tuxfight3r
Last active November 17, 2023 15:37
consume message from kafka topic from any given offset

Consume Kafka messages from a specific offset

There are two ways to consume Kafka messages from a given topic starting at a specific offset.

Consume the messages from any offset by creating a consumer group

get_group_offset(){
  # describe the given group and sort the output by partition
  /home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server "${KAFKA_BROKER}" --command-config "${CONFIG}" --timeout 15000 --group "$1" --describe | sort -k2,3
}


## create a test group associated with the input topic and set its offsets to latest
## (offsets can only be reset while the group has no active consumers)
/home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} \
--command-config ${CONFIG} \
--reset-offsets \
--to-latest \
--execute \
--group devops_test_group \
--topic test-input_topic

# reset the offset of a single partition (partition 0 here) in that group to the offset you want to consume from
/home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} \
--command-config ${CONFIG} \
--reset-offsets \
--to-offset 74625 \
--execute \
--group devops_test_group \
--topic test-input_topic:0


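# display the group offsets
get_group_offset devops_test_group

# consume up to 500 messages with that group; every other partition sits at the
# latest offset, so the backlog comes from partition 0 (messages newly produced
# to other partitions may also show up)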
/home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
--property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --group devops_test_group  --timeout-ms 15000 --max-messages 500  --topic test-input_topic

# reset all the offsets in the group to latest
/home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} \
--command-config ${CONFIG} \
--reset-offsets \
--to-latest \
--execute \
--group devops_test_group \
--topic test-input_topic

## reset the offset of a single partition (partition 1 here) in that group to the offset you want to consume from
/home/tools/confluent/bin/kafka-consumer-groups --bootstrap-server ${KAFKA_BROKER} \
--command-config ${CONFIG} \
--reset-offsets \
--to-offset 42775 \
--execute \
--group devops_test_group \
--topic test-input_topic:1


# display the group offsets
get_group_offset devops_test_group

# consume up to 500 messages with that group; the backlog now comes from partition 1
/home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
--property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --group devops_test_group  --timeout-ms 15000 --max-messages 500  --topic test-input_topic

# display the group offsets
get_group_offset devops_test_group
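
The reset-then-consume sequence above repeats the same two kafka-consumer-groups calls for each partition, so it can be folded into one helper. The following is only a sketch under assumptions: `seek_group_partition`, `run`, `KAFKA_BIN` and `DRY_RUN` are names made up for this example and are not part of Kafka; the flags are the ones already used in the commands above.

```shell
# Hypothetical helper names; only the kafka-consumer-groups flags come from the commands above.
KAFKA_BIN="${KAFKA_BIN:-/home/tools/confluent/bin}"

# run: print the command when DRY_RUN=1, otherwise execute it
run() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    "$@"
  fi
}

# seek_group_partition GROUP TOPIC PARTITION OFFSET
seek_group_partition() {
  sgp_group=$1; sgp_topic=$2; sgp_partition=$3; sgp_offset=$4

  # 1. move every partition of the topic to the latest offset
  run "${KAFKA_BIN}/kafka-consumer-groups" --bootstrap-server "${KAFKA_BROKER}" \
    --command-config "${CONFIG}" --reset-offsets --to-latest --execute \
    --group "${sgp_group}" --topic "${sgp_topic}" || return 1

  # 2. rewind only the requested partition to the requested offset
  run "${KAFKA_BIN}/kafka-consumer-groups" --bootstrap-server "${KAFKA_BROKER}" \
    --command-config "${CONFIG}" --reset-offsets --to-offset "${sgp_offset}" --execute \
    --group "${sgp_group}" --topic "${sgp_topic}:${sgp_partition}"
}
```

With `DRY_RUN=1` set, `seek_group_partition devops_test_group test-input_topic 0 74625` only prints the two commands, which is a cheap way to check them before touching the group. kafka-consumer-groups also accepts `--dry-run` in place of `--execute` to preview the resulting offsets without applying them.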

Consume the messages directly from the topic without creating a consumer group

/home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
--property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --timeout-ms 15000 --max-messages 500  --topic test-input_topic --partition 0  --offset 74625

/home/tools/confluent/bin/kafka-avro-console-consumer --bootstrap-server ${KAFKA_BROKER} --consumer.config ${CONFIG} \
--property schema.registry.url=${KAFKA_SCHEMA_REGISTRY_URL} --timeout-ms 15000 --max-messages 500  --topic test-input_topic --partition 1  --offset 8000
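
Reading several partitions this way is the same command repeated with a different partition and offset, so it lends itself to a loop over partition:offset pairs. A minimal sketch, assuming a hypothetical wrapper named `consume_at` and a hypothetical `CONSUMER` variable pointing at the consumer binary; the flags are the ones used in the two commands above.

```shell
# Hypothetical wrapper; the consumer flags are the ones used in the commands above.
CONSUMER="${CONSUMER:-/home/tools/confluent/bin/kafka-avro-console-consumer}"

# consume_at TOPIC PARTITION:OFFSET [PARTITION:OFFSET ...]
consume_at() {
  ca_topic=$1
  shift
  for ca_pair in "$@"; do
    ca_partition=${ca_pair%%:*}   # text before the colon
    ca_offset=${ca_pair#*:}       # text after the colon
    "${CONSUMER}" --bootstrap-server "${KAFKA_BROKER}" --consumer.config "${CONFIG}" \
      --property schema.registry.url="${KAFKA_SCHEMA_REGISTRY_URL}" \
      --timeout-ms 15000 --max-messages 500 \
      --topic "${ca_topic}" --partition "${ca_partition}" --offset "${ca_offset}"
  done
}
```

For example, `consume_at test-input_topic 0:74625 1:8000` runs the two commands above one after the other. Setting `CONSUMER=echo` prints the command lines instead of running them. As far as the console consumer goes, `--offset` should also accept `earliest` or `latest` in place of a number.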