@brendonanderson
Created October 9, 2017 01:29
Consume Kafka Messages in functional test
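import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer

// Polls `topic` until a record with the given key shows up or maxRetries polls have run.
// Returns the record's value, or null if no matching record was seen.
String consume(String topic, String key, int maxRetries, long pollMs) {
    int retry = 0
    String message = null
    // createKafkaConsumer is defined elsewhere; it must return a consumer already subscribed to `topic`
    KafkaConsumer<String, String> consumer = createKafkaConsumer(topic)
    try {
        // Groovy truth: a null or empty value counts as "not found yet"
        while (!message && retry < maxRetries) {
            retry++
            // poll(long) is deprecated in kafka-clients 2.0+; use poll(Duration.ofMillis(pollMs)) there
            ConsumerRecords<String, String> consumerRecords = consumer.poll(pollMs)
            consumerRecords.each { ConsumerRecord<String, String> record ->
                if (record.key() == key) {
                    message = record.value()
                }
            }
            consumer.commitSync()
        }
    } finally {
        // Release the consumer even if poll or commit throws
        consumer.close()
    }
    return message
}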
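
The snippet above calls createKafkaConsumer(topic) but does not show it. The following is only a sketch of what such a helper might look like, not the author's version. The consumerProps function, the broker address localhost:9092, and the functional-test group id prefix are all assumptions made for illustration.

```groovy
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical helper: collects the consumer settings in one place.
// Broker address and group id are supplied by the caller.
Properties consumerProps(String bootstrapServers, String groupId) {
    Properties props = new Properties()
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.name)
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.name)
    // A new group has no committed offsets, so start from the beginning of the topic
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, 'earliest')
    // consume() commits explicitly with commitSync(), so auto-commit is switched off
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, 'false')
    return props
}

// Assumed shape of the helper used by consume(): a String/String consumer
// subscribed to a single topic. The broker address is a placeholder.
KafkaConsumer<String, String> createKafkaConsumer(String topic) {
    String groupId = 'functional-test-' + UUID.randomUUID()
    KafkaConsumer<String, String> consumer =
            new KafkaConsumer<String, String>(consumerProps('localhost:9092', groupId))
    consumer.subscribe([topic])
    return consumer
}
```

With a helper like this in place, a test could call something like consume('orders', 'order-42', 10, 1000L), where the topic and key are again made-up values. A fresh group id per consumer keeps offsets committed by one test run from hiding messages in the next.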