Skip to content

Instantly share code, notes, and snippets.

@songyunlu
Created March 12, 2017 02:52
Show Gist options
  • Save songyunlu/f72badf77b67d18c75cfaa483979e4ff to your computer and use it in GitHub Desktop.
Save songyunlu/f72badf77b67d18c75cfaa483979e4ff to your computer and use it in GitHub Desktop.
KStream<String, String> source = builder.stream("streams-file-input");
KTable<String, Long> counts = source
.flatMapValues(new ValueMapper<String, Iterable<String>>() {
@Override
public Iterable<String> apply(String value) {
return Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" "));
}
}).map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
@Override
public KeyValue<String, String> apply(String key, String value) {
return new KeyValue<>(value, value);
}
})
.groupByKey()
.count("Counts");
counts.to(Serdes.String(), Serdes.Long(), "streams-wordcount-output");
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment