Skip to content

Instantly share code, notes, and snippets.

@bjoernhaeuser
Created April 7, 2018 08:33
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bjoernhaeuser/be1ffb06e5e970a50a5ffb309ded4b6b to your computer and use it in GitHub Desktop.
Save bjoernhaeuser/be1ffb06e5e970a50a5ffb309ded4b6b to your computer and use it in GitHub Desktop.
private def configure(builder: StreamsBuilder): KStream[String, String] = {
val aggregateTable: KTable[Windowed[String], String] = builder
.stream[String, String]("input-topic")
.filter((key: String, _: String) => !key.isEmpty)
.groupByKey
.windowedBy(SessionWindows.`with`(TimeUnit.SECONDS.toMillis(windowSize)))
.aggregate(
() => MapperUtil.asString(ConversionAggregate(None, None)),
ConversionAggregator.apply,
ConversionMerger.apply
)
aggregateTable
.toStream[String]((key: Windowed[String], _: String) => key.key())
.filter((_, value) => value != null) // ignore tombstone records
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment