CurrencyProcessing.kt
import org.apache.kafka.streams.kstream.KGroupedStream | |
import org.apache.kafka.streams.kstream.KStream | |
import org.springframework.stereotype.Component | |
import org.springframework.cloud.stream.annotation.StreamListener | |
/**
 * Stream processor that derives a per-key average of the incoming `Double` values
 * and publishes it to the `avg-rates` topic.
 */
@Component
class CurrencyProcessing {

    /**
     * Builds the averaging topology on top of [input].
     *
     * The stream is grouped by its existing key, then aggregated twice: once into a
     * per-key record count and once into a per-key running sum. The two resulting
     * tables are joined on the key and the sum is divided by the count.
     *
     * @param input keyed stream of `Double` values to average.
     */
    // NOTE(review): no binding target is given on @StreamListener and the parameter
    // carries no @Input annotation — confirm the input binding is declared elsewhere.
    @StreamListener
    fun processCurrency(input: KStream<String, Double>) {
        val groupByKey: KGroupedStream<String, Double> = input.groupByKey()

        // Number of records seen so far for each key.
        val countKTable = groupByKey.count()

        // Running total of the values seen so far for each key.
        val sumKTable = groupByKey.reduce { value1, value2 -> value1 + value2 }

        // NOTE(review): a table-table join can emit when only one side has been updated,
        // so a transient average (new count with old sum, or vice versa) may reach the
        // output topic — confirm consumers tolerate this, or fold sum and count into a
        // single aggregate.
        val avgRate = sumKTable.join(countKTable) { sum, count -> sum / count }

        // No explicit serdes are passed here, so this presumably relies on the
        // application's configured defaults — verify they match String/Double.
        avgRate.toStream().to("avg-rates")
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment