Skip to content

Instantly share code, notes, and snippets.

@gAmUssA
Last active October 13, 2019 00:56
Show Gist options
  • Save gAmUssA/c253142e89726b5f47fa351d8a42e8b3 to your computer and use it in GitHub Desktop.
Save gAmUssA/c253142e89726b5f47fa351d8a42e8b3 to your computer and use it in GitHub Desktop.
CurrencyProcessing.kt
import org.apache.kafka.streams.kstream.KGroupedStream
import org.apache.kafka.streams.kstream.KStream
import org.springframework.stereotype.Component
import org.springframework.cloud.stream.annotation.StreamListener
@Component
class CurrencyProcessing {
@StreamListener
fun processCurrency(input: KStream<String, Double>) {
val groupByKey: KGroupedStream<String, Double> = input.groupByKey()
val countKTable = groupByKey.count()
val sumKTable = groupByKey.reduce { value1, value2 -> value1 + value2 }
val avgRate = sumKTable.join(countKTable) { sum, count -> sum / count }
avgRate.toStream().to("avg-rates")
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment