Skip to content

Instantly share code, notes, and snippets.

@sap1ens
Created July 17, 2023 15:54
Show Gist options
  • Save sap1ens/1be2b3a96fbca90233947cbb510bce2a to your computer and use it in GitHub Desktop.
Save sap1ens/1be2b3a96fbca90233947cbb510bce2a to your computer and use it in GitHub Desktop.
Flink DataStream API deduplication in Scala
// Keyed deduplication for a DataStream[Row]: only the first row observed for each
// distinct value of `dedupColumn` is emitted; later rows with the same key are dropped
// for as long as the per-key "seen" marker is alive (see `ttl`).
val dedupColumn = "..." // column name to use as a key for deduplication
val ttl = Some(Time.minutes(60)) // state TTL; when empty, no TTL is configured on the state
stream
  .keyBy(row => row.getField(dedupColumn))
  .flatMap(new RichFlatMapFunction[Row, Row] {
    // Per-key marker recording that a row with the current key was already emitted.
    // Assigned in open(), hence the var with a placeholder initializer.
    @transient
    private var seen: ValueState[Boolean] = _

    /** Registers the keyed "seen" state, enabling TTL on it when `ttl` is defined. */
    override def open(parameters: Configuration): Unit = {
      val stateDescriptor =
        new ValueStateDescriptor[Boolean](
          "seen",
          createTypeInformation[Boolean]
        )
      ttl.foreach { ttlValue =>
        val ttlConfig = StateTtlConfig
          .newBuilder(ttlValue)
          // Reads as well as writes reset the TTL timer, so a steady flow of
          // duplicates keeps the marker alive.
          .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite)
          // An expired marker is never returned, even if not yet cleaned up.
          .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
          .build
        stateDescriptor.enableTimeToLive(ttlConfig)
      }
      seen = getRuntimeContext.getState(stateDescriptor)
    }

    /**
     * Emits `row` only when no live marker exists for the current key, then sets the marker.
     *
     * @param row incoming record for the current key
     * @param out collector receiving the first occurrence of each key
     */
    override def flatMap(row: Row, out: Collector[Row]): Unit = {
      // Read the state exactly once per record. A missing (or expired) entry comes back
      // as null, which unboxes to `false` for a Scala Boolean, so comparing the value
      // against null is both unnecessary and always false.
      val alreadySeen: Boolean = seen.value()
      if (!alreadySeen) {
        seen.update(true)
        out.collect(row)
      }
    }
  })
  .name("Dedup")
  .uid("dedup")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment