Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@kiview
Created December 13, 2017 16:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kiview/f1554d7089d3a253c83fd7c03c2e60b3 to your computer and use it in GitHub Desktop.
Save kiview/f1554d7089d3a253c83fd7c03c2e60b3 to your computer and use it in GitHub Desktop.
/**
 * Counts records per value inside session windows and publishes the counts.
 *
 * Records read from {@code topic} are re-grouped by their value, collected into
 * session windows with a one-second inactivity gap, and counted. The resulting
 * counts are also written to the {@code classificationResult} topic.
 *
 * @param builder topology builder the stream is registered on
 * @param engine  not referenced by this method
 * @return the session-windowed count table (its keys are still windowed)
 */
@Bean
KTable reportStream(StreamsBuilder builder, Engine engine) {
    def counts = builder.stream(topic)
            .groupBy({ key, word -> word })
            .windowedBy(SessionWindows.with(TimeUnit.SECONDS.toMillis(1)))
            .aggregate(
                    new Initializer<Long>() {
                        @Override
                        Long apply() {
                            // Long literal: no Integer -> Long coercion on return.
                            0L
                        }
                    },
                    new Aggregator<String, String, Long>() {
                        @Override
                        Long apply(String key, String value, Long aggregate) {
                            return 1 + aggregate
                        }
                    },
                    new Merger<String, Long>() {
                        // Called when two sessions for the same key are merged into one.
                        @Override
                        Long apply(String aggKey, Long aggOne, Long aggTwo) {
                            return aggOne + aggTwo
                        }
                    },
                    Materialized.with(Serdes.String(), Serdes.Long()))

    // The table is keyed by Windowed<String> with Long values. Without explicit
    // serdes, to() falls back to the application's configured defaults, which are
    // not shown here and need not match these types. Re-key to the plain grouping
    // key and state the serdes explicitly so the sink can serialize the records.
    // NOTE(review): the window bounds are dropped from the published key - confirm
    // that consumers of "classificationResult" do not need them.
    counts.toStream()
            .selectKey({ windowedKey, count -> windowedKey.key() })
            .to("classificationResult",
                    org.apache.kafka.streams.kstream.Produced.with(Serdes.String(), Serdes.Long()))
    counts
}
/**
 * Reads the {@code classificationResult} topic and prints every value to stdout.
 *
 * @param builder topology builder the stream is registered on
 * @param engine  not referenced by this method
 * @return the stream of values read from the topic, unchanged by the logging step
 */
@Bean
KStream classificationStream(StreamsBuilder builder, Engine engine) {
    builder.stream("classificationResult").mapValues({ value ->
        println "classResult"
        println value
        // Hand the value back: a closure ending in println evaluates to null,
        // which would replace every record value in the returned stream with null.
        value
    })
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment