Skip to content

Instantly share code, notes, and snippets.

@shikhar
Created March 11, 2016 20:18
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save shikhar/ae46a6079d1efb240a15 to your computer and use it in GitHub Desktop.
Save shikhar/ae46a6079d1efb240a15 to your computer and use it in GitHub Desktop.
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
import org.apache.flink.streaming.api.watermark.Watermark
import scala.concurrent.duration.FiniteDuration
/**
 * Periodic-watermark timestamp assigner whose watermark trails the largest element timestamp
 * seen so far by a fixed tolerance.
 *
 * For every element the timestamp is obtained from `baseExtractor` and returned unchanged. The
 * watermark is raised to `timestamp - toleranceThreshold` whenever that value is positive and
 * greater than the current watermark, so it never moves backwards. Until such an element has
 * been seen the watermark remains `Long.MinValue`.
 *
 * @param baseExtractor      function returning the timestamp of an element; the tolerance is
 *                           subtracted from it in milliseconds
 * @param toleranceThreshold fixed amount by which the watermark lags behind an element's timestamp
 * @tparam T type of the elements that timestamps are extracted from
 */
class ThresholdTimestampAssigner[T](baseExtractor: T => Long, toleranceThreshold: FiniteDuration) extends AssignerWithPeriodicWatermarks[T] {

  // The tolerance never changes, so convert it to milliseconds once rather than per element.
  private[this] val toleranceMillis: Long = toleranceThreshold.toMillis

  // Highest watermark candidate accepted so far; Long.MinValue until the first one is accepted.
  private[this] var currentWatermark: Long = Long.MinValue

  /**
   * Returns the timestamp of `currentElement` as given by `baseExtractor` and, as a side effect,
   * advances the watermark when the element's timestamp minus the tolerance exceeds it.
   *
   * @param currentElement       element to extract the timestamp from
   * @param prevElementTimestamp not used by this assigner
   * @return the timestamp produced by `baseExtractor`, unmodified
   */
  override def extractTimestamp(currentElement: T, prevElementTimestamp: Long): Long = {
    val currentTimestamp = baseExtractor(currentElement)
    val candidateWatermark = currentTimestamp - toleranceMillis
    // Long subtraction wraps silently: a timestamp close to Long.MinValue would otherwise yield a
    // huge positive candidate and push the watermark irreversibly towards Long.MaxValue.
    // This is the same sign test java.lang.Math.subtractExact uses to detect overflow.
    val wrapped = ((currentTimestamp ^ toleranceMillis) & (currentTimestamp ^ candidateWatermark)) < 0
    // Non-positive candidates are ignored, exactly as before; the watermark only moves forward.
    if (!wrapped && candidateWatermark > 0 && candidateWatermark > currentWatermark) {
      currentWatermark = candidateWatermark
    }
    currentTimestamp
  }

  /** Returns a new [[Watermark]] carrying the highest candidate accepted so far. */
  override def getCurrentWatermark: Watermark = new Watermark(currentWatermark)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment