Skip to content

Instantly share code, notes, and snippets.

@mitch-seymour
Created January 3, 2017 14:55
Show Gist options
  • Save mitch-seymour/3427ecd0a577cf26b67021e964d6be6c to your computer and use it in GitHub Desktop.
Save mitch-seymour/3427ecd0a577cf26b67021e964d6be6c to your computer and use it in GitHub Desktop.
class StreamsApp {
// properties, methods, etc
public void forceInitializationOfStateStore(Map<String, Object> config, String sourceTopic, Deserializer keyDeserializer, Deserializer valueDeserializer) {
config.put(ENABLE_AUTO_COMMIT_CONFIG, true);
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(config, keyDeserializer, valueDeserializer);
ConsumerRebalanceListener listener = new ConsumerRebalanceListener() {
public void onPartitionsRevoked (Collection< TopicPartition > partitions) {}
public void onPartitionsAssigned (Collection < TopicPartition > partitions) {
List<TopicPartition> unInitialized = new ArrayList<>();
for (TopicPartition tp : partitions) {
OffsetAndMetadata offsetAndMetaData = consumer.committed(tp);
if (offsetAndMetaData == null) {
unInitialized.add(tp);
}
}
if (unInitialized.size() > 0) {
log.info("Forcing initialization of {} state stores for topic: {}", unInitialized.size(), sourceTopic);
consumer.seekToBeginning(unInitialized);
} else {
log.info("{} state stores have already been initialized for topic: {}", partitions.size(), sourceTopic);
}
}
};
consumer.subscribe(Collections.singletonList(sourceTopic), listener);
consumer.poll(1000L);
consumer.close();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment