@nehanarkhede
Last active August 25, 2020 09:14
Word count program using Kafka Streams
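// Word count with the Kafka Streams 0.10.0 DSL (KStreamBuilder, countByKey).
// streamsConfiguration is assumed to be a Properties object defined elsewhere.
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();

KStreamBuilder builder = new KStreamBuilder();
// Read lines of text from the input topic.
KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "TextLinesTopic");
// Split on runs of non-word characters (Unicode-aware).
Pattern pattern = Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);
KStream<String, Long> wordCounts = textLines
    .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
    // Re-key each record by the word itself so that countByKey counts per word.
    .map((key, word) -> new KeyValue<>(word, word))
    .countByKey("Counts")
    .toStream();
// Write the running counts to the output topic.
wordCounts.to(stringSerde, longSerde, "WordsWithCountsTopic");

KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
streams.start();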
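
To see what the split, lowercase, and count steps produce without a running Kafka cluster, here is a minimal plain-Java sketch. It is not Kafka Streams code: the class `WordCountSketch` and the helper `countWords` are hypothetical names introduced for illustration, and an in-memory map stands in for the "Counts" state store. It reuses the gist's regular expression, but unlike the pipeline above it skips empty tokens, which `Pattern.split` can return when a line starts with a delimiter.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

public class WordCountSketch {
    // Same pattern as the gist: split on runs of non-word characters, Unicode-aware.
    private static final Pattern PATTERN =
        Pattern.compile("\\W+", Pattern.UNICODE_CHARACTER_CLASS);

    // Hypothetical helper: counts words across all lines in an in-memory map.
    public static Map<String, Long> countWords(List<String> lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        for (String line : lines) {
            // Mirrors flatMapValues: one lowercased line becomes many words.
            for (String word : PATTERN.split(line.toLowerCase())) {
                // Assumption added here: drop empty tokens from a leading delimiter.
                if (word.isEmpty()) {
                    continue;
                }
                // Mirrors map + countByKey: the word is the key, the count is the value.
                counts.merge(word, 1L, Long::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Map<String, Long> counts =
            countWords(Arrays.asList("Hello Kafka Streams", "hello, streams!"));
        System.out.println(counts); // prints {hello=2, kafka=1, streams=2}
    }
}
```

The streaming version differs in one important way: it emits an updated count to the output topic as new lines arrive, rather than returning a single final map.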