Skip to content

Instantly share code, notes, and snippets.

@bufferings
Last active November 27, 2016 06:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bufferings/fa90877694d8e1033b4c2ae260e88781 to your computer and use it in GitHub Desktop.
Save bufferings/fa90877694d8e1033b4c2ae260e88781 to your computer and use it in GitHub Desktop.
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> source = builder.stream("streams-file-input");
KTable<String, Long> counts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
.map((key, value) -> new KeyValue<>(value, value))
.countByKey("Counts");
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
KafkaStreams streams = new KafkaStreams(builder, props());
streams.start();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment