@jmbataller
Created September 19, 2019 14:14
Drain producer records in the Kafka Streams TopologyTestDriver
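import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.streams.TopologyTestDriver;

// Reads records from the output topic until readOutput returns null (no record available).
// Note: readOutput was deprecated in Kafka 2.4 in favor of TestOutputTopic.
private <K, V> List<ProducerRecord<K, V>> drainProducerRecords(final TopologyTestDriver driver,
                                                               final String topic,
                                                               final Deserializer<K> keyDeserializer,
                                                               final Deserializer<V> valueDeserializer) {
    final List<ProducerRecord<K, V>> result = new ArrayList<>();
    for (ProducerRecord<K, V> next = driver.readOutput(topic, keyDeserializer, valueDeserializer);
         next != null;
         next = driver.readOutput(topic, keyDeserializer, valueDeserializer)) {
        result.add(next);
    }
    return result;
}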
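
The drain-until-null loop in the helper above can be tried without any Kafka dependency. The following is a minimal sketch, not part of the original gist: the class name `DrainSketch`, the method `drain`, and the `fakeTopic` queue are hypothetical stand-ins, with `Queue.poll()` playing the role of `driver.readOutput(...)` because both return `null` once nothing is left.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.function.Supplier;

// Hypothetical illustration class; it does not use or depend on Kafka.
final class DrainSketch {

    // Calls the supplier repeatedly and collects each result until it returns null,
    // mirroring how drainProducerRecords treats a null readOutput result as "topic empty".
    static <T> List<T> drain(final Supplier<T> source) {
        final List<T> result = new ArrayList<>();
        for (T next = source.get(); next != null; next = source.get()) {
            result.add(next);
        }
        return result;
    }

    public static void main(final String[] args) {
        // Assumed stand-in for an output topic: poll() returns null when the queue is empty.
        final Queue<String> fakeTopic = new ArrayDeque<>(Arrays.asList("a", "b", "c"));
        final List<String> drained = drain(fakeTopic::poll);
        System.out.println(drained);             // prints [a, b, c]
        System.out.println(fakeTopic.isEmpty()); // prints true
    }
}
```

Taking a `Supplier` keeps the loop independent of where the records come from, so the same shape should carry over to the test driver by passing a lambda that wraps the `readOutput` call.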