@amit1rrr
Created February 28, 2018 17:13
Word count demo using the Apache Kafka Streams DSL.
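// Imports assume Kafka 2.0+; in Kafka 1.x, Consumed lives in org.apache.kafka.streams.
import java.util.Arrays;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Produced;
// Builder used to define the processing topology.
final StreamsBuilder builder = new StreamsBuilder();
// Serializers/deserializers (serde) for String and Long types
final Serde<String> stringSerde = Serdes.String();
final Serde<Long> longSerde = Serdes.Long();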
// Construct a `KStream` from the input topic "streams-plaintext-input", where message values
// represent lines of text (for the sake of this example, we ignore whatever may be stored
// in the message keys).
KStream<String, String> textLines = builder.stream("streams-plaintext-input",
    Consumed.with(stringSerde, stringSerde));
KTable<String, Long> wordCounts = textLines
    // Lowercase each text line and split it into words on runs of non-word characters.
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Re-key each record by the word itself so that identical words are grouped together.
    .groupBy((key, value) -> value)
    // Count the occurrences of each word (message key).
    .count();
// Store the running counts as a changelog stream to the output topic.
wordCounts.toStream().to("streams-wordcount-output", Produced.with(stringSerde, longSerde));
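The split, group, and count steps above can be tried without a Kafka broker. The following is a minimal sketch in plain Java collections, not part of the original gist: the class `WordCountSketch` and the helper `countWords` are hypothetical names introduced for illustration only. It applies the same `toLowerCase().split("\\W+")` tokenization, but it returns only the final counts, whereas the Kafka Streams topology emits a running changelog of count updates.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

public class WordCountSketch {

    // Hypothetical helper (not part of Kafka Streams): mirrors the
    // flatMapValues -> groupBy -> count steps on an in-memory list of lines.
    static Map<String, Long> countWords(List<String> lines) {
        return lines.stream()
            // Same tokenization as the topology: lowercase, split on non-word runs.
            .flatMap(line -> Arrays.stream(line.toLowerCase().split("\\W+")))
            // Group identical words and count them; TreeMap keeps keys sorted.
            .collect(Collectors.groupingBy(word -> word, TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String> lines = Arrays.asList(
            "all streams lead to kafka",
            "hello kafka streams");
        // prints {all=1, hello=1, kafka=2, lead=1, streams=2, to=1}
        System.out.println(countWords(lines));
    }
}
```

As in the topology, a line that begins with a non-word character would produce an empty first token, which is counted like any other word.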