Skip to content

Instantly share code, notes, and snippets.

@rocketraman
Last active June 23, 2021 20:25
Show Gist options
  • Save rocketraman/543f066813fc89590f23ff5dacf43f01 to your computer and use it in GitHub Desktop.
Save rocketraman/543f066813fc89590f23ff5dacf43f01 to your computer and use it in GitHub Desktop.
Beam calendar day windows with context elements
package com.xacoach.xascore.backend.analytics.windowfns
import org.apache.beam.sdk.coders.Coder
import org.apache.beam.sdk.transforms.*
import org.apache.beam.sdk.transforms.windowing.*
import org.apache.beam.sdk.values.KV
import org.apache.beam.sdk.values.PCollection
import org.apache.beam.sdk.values.TypeDescriptor
import org.apache.beam.sdk.values.TypeDescriptors
import org.joda.time.DateTimeZone
import java.time.Instant
import java.time.LocalDate
import java.time.ZoneId
import java.time.ZoneOffset
import java.time.ZonedDateTime
import kotlin.time.Duration
import kotlin.time.days
import org.joda.time.Duration as JodaTimeDuration
import org.joda.time.Instant as JodaInstant
/**
 * A calendar day window that provides context of a certain number of days prior. Since the only way to
 * trigger based on completeness of data in Beam is the AfterWatermark.pastEndOfWindow() trigger, we window
 * based on the context period, and then add the "last day"'s elements as late inputs into those context
 * windows.
 *
 * In that way, the window will trigger only once all the context is available.
 *
 * For an element whose timestamp falls on local date `D` (in [timeZone]), `contextDays + 1` windows are assigned,
 * each spanning `contextDays` calendar days: `[D - contextDays, D)`, `[D - contextDays + 1, D + 1)`, ...,
 * `[D, D + contextDays)`. In the first of these the element lies at or after the window end, which is what makes
 * it a "late" input to that window.
 *
 * Instances are only created through the companion [invoke], which also configures the trigger this scheme needs.
 */
class ContextualCalendarDayWindow<T> private constructor(
    private val contextDays: Int,
    private val timeZone: ZoneId = ZoneOffset.UTC
): NonMergingWindowFn<T, IntervalWindow>() {
    companion object {
        /**
         * This window requires specific setup of the associated trigger function, so use an invoker to create the
         * window function, and configure it.
         *
         * @param days total number of calendar days covered, including the current day; `days - 1` of them are
         *   context days, so `days` must be at least 2.
         * @param allowedLateness passed to [Window.withAllowedLateness] for the context windows.
         * @param timeZone zone in which calendar day boundaries are computed.
         * @throws IllegalArgumentException if [days] is less than 2.
         */
        operator fun <T> invoke(days: Int, allowedLateness: Duration, timeZone: ZoneId = ZoneOffset.UTC): Window<T> {
            // Validate here (rather than only in init) so the message refers to the caller's argument, and so that
            // `days - 1` cannot overflow into a huge positive context length.
            require(days >= 2) {
                "days must be at least 2 (the current day plus at least one context day), but was $days."
            }
            return Window.into<T>(ContextualCalendarDayWindow(days - 1, timeZone))
                .triggering(
                    // no early trigger here, we only want to trigger on "late" elements which indicate our real elements
                    AfterWatermark.pastEndOfWindow().withLateFirings(AfterPane.elementCountAtLeast(1))
                )
                .withAllowedLateness(allowedLateness.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
                .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                // LATEST is generally inefficient, but we need it here to make the watermark move in such a way that the
                // downstream windowing in DailyWindowsWithContext produces output in the correct panes -- it's not completely
                // clear why that is the case because downstream of this in `ContextWindowsWithTimestamps` we skew the
                // timestamps forward anyway, however despite that if we don't use LATEST here, the panes are incorrect
                .withTimestampCombiner(TimestampCombiner.LATEST)
                .accumulatingFiredPanes()
        }
    }

    init {
        // A zero-day context would produce zero-width windows, so at least one context day is required.
        require(contextDays > 0) { "The number of context days must be positive, but was $contextDays." }
    }

    override fun assignWindows(c: AssignContext): Collection<IntervalWindow> =
        assignWindows(c.timestamp().asJava())

    /** Two instances are compatible exactly when they are equal (same context length and time zone). */
    override fun isCompatible(other: WindowFn<*, *>): Boolean = equals(other)

    override fun windowCoder(): Coder<IntervalWindow> = IntervalWindow.getCoder()

    /**
     * Maps a main-input window to the side-input window assigned last for the main window's max timestamp,
     * i.e. the window that starts on that timestamp's own local date.
     */
    override fun getDefaultWindowMappingFn(): WindowMappingFn<IntervalWindow> {
        return object : WindowMappingFn<IntervalWindow>() {
            override fun getSideInputWindow(mainWindow: BoundedWindow): IntervalWindow {
                require(mainWindow !is GlobalWindow) { "Attempted to get side input window for GlobalWindow from non-global WindowFn" }
                return assignWindows(mainWindow.maxTimestamp().asJava()).last()
            }
        }
    }

    override fun equals(other: Any?): Boolean {
        val otherWindowFn = other as? ContextualCalendarDayWindow<*> ?: return false
        return otherWindowFn.contextDays == contextDays && otherWindowFn.timeZone == timeZone
    }

    override fun hashCode(): Int = 31 * contextDays + timeZone.hashCode()

    /**
     * Computes the windows for [timestamp], ordered by ascending start date; the last window starts on the
     * timestamp's own local date (relied upon by [getDefaultWindowMappingFn]).
     */
    private fun assignWindows(timestamp: Instant): Collection<IntervalWindow> {
        val localDate = timestamp.atZone(timeZone).toLocalDate()
        return (contextDays downTo 0).map { daysBefore ->
            val intervalBegin = localDate.minusDays(daysBefore.toLong())
            // windows are sized by 1 less day than usual
            val intervalEnd = intervalBegin.plusDays(contextDays.toLong())
            IntervalWindow(
                intervalBegin.atStartOfDay(timeZone).toInstant().asJoda(),
                intervalEnd.atStartOfDay(timeZone).toInstant().asJoda()
            )
        }
    }
}
/**
 * Transforms inputs into daily windowed data, with a given number of days of contextual data attached. The input
 * to this transform is a PCollection of elements of type V, which are windowed via [ContextualCalendarDayWindow],
 * then keyed and grouped, and then windowed again into daily windows that contain that day's elements, plus the
 * context elements.
 *
 * The key and value type descriptors are needed to avoid errors due to generic type erasure. The outputCoder may
 * also be needed to avoid coder lookup errors for the output type, but is not required.
 *
 * @param days forwarded to [ContextualCalendarDayWindow]: total days per output including the current day.
 * @param allowedLateness used as the allowed lateness of both the context windows and the daily windows.
 * @param inZone zone in which calendar day boundaries are computed.
 * @param keyFn extracts the grouping key from each element.
 * @param timestampFn extracts the event time of each element.
 * @param outputCoder optional coder explicitly set on the output collection.
 */
class DailyWindowsWithContext<K, V>(
    private val days: Int,
    private val allowedLateness: Duration,
    private val inZone: ZoneId = ZoneOffset.UTC,
    private val keyFn: SerializableFunction<V, K>,
    private val timestampFn: (V) -> Instant,
    private val keyTypeDescriptor: TypeDescriptor<K>,
    private val valueTypeDescriptor: TypeDescriptor<V>,
    private val outputCoder: Coder<KV<K, Iterable<@JvmWildcard V>>>? = null,
) : PTransform<PCollection<V>, PCollection<KV<K, Iterable<@JvmWildcard V>>>>() {
    override fun expand(input: PCollection<V>): PCollection<KV<K, Iterable<@JvmWildcard V>>> {
        val outputTypeDescriptor = TypeDescriptors.kvs(keyTypeDescriptor, TypeDescriptors.iterables(valueTypeDescriptor))
        val output = input
            .apply("ContextWindows", ContextualCalendarDayWindow(days, allowedLateness, inZone))
            .apply("ContextWindowsKeys", WithKeys.of(keyFn).withKeyType(keyTypeDescriptor))
            .apply("ContextWindowsGroupBy", GroupByKey.create())
            .apply("ContextWindowsFilterFn", ParDo.of(ContextWindowsFilterFn(timestampFn)))
            // move the timestamp forward so elements end up in the "right" daily windows, with context elements attached
            .apply("ContextWindowsWithTimestamps", ParDo.of(ContextWindowsWithTimestampFn(inZone, timestampFn)))
            .apply("DailyWindowsWithContext",
                Window.into<KV<K, Iterable<@JvmWildcard V>>>(CalendarWindows.days(1).withTimeZone(inZone.asJoda()))
                    .triggering(
                        AfterWatermark.pastEndOfWindow()
                            .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane())
                            .withLateFirings(AfterPane.elementCountAtLeast(1))
                    )
                    .withAllowedLateness(allowedLateness.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
                    .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY)
                    .discardingFiredPanes()
            )
            .apply("DailyWindowsWithContextGroupBy", GroupByKey.create())
            // due to window + GBK twice, now we have an Iterable of Iterables, unwrap it
            .apply("DailyWindowsWithContextFlatten",
                MapElements.into(outputTypeDescriptor).via(ProcessFunction {
                    KV.of(it.key, it.value.flatten())
                })
            )
        // PCollection.setCoder is a fluent setter (it returns the PCollection), so call it explicitly rather than
        // relying on Kotlin `coder = ...` property-assignment syntax.
        outputCoder?.let { output.setCoder(it) }
        return output
    }
}
/**
 * Meant to be used after a [ContextualCalendarDayWindow]. It drops grouped outputs in which every element is a
 * context element (timestamp before the window end), because those are not useful for downstream processing.
 * Panes whose timing is LATE are passed through untouched, since there is no way to tell here whether they are
 * useful downstream.
 */
private class ContextWindowsFilterFn<K, V>(val timestampOf: (V) -> Instant): DoFn<KV<K, Iterable<@JvmWildcard V>>, KV<K, Iterable<@JvmWildcard V>>>() {
    @ProcessElement
    fun process(
        window: IntervalWindow,
        paneInfo: PaneInfo,
        @Element element: KV<K, Iterable<@JvmWildcard V>>,
        receiver: OutputReceiver<KV<K, Iterable<@JvmWildcard V>>>
    ) {
        // Late additions are always forwarded.
        if (paneInfo.timing == PaneInfo.Timing.LATE) {
            receiver.output(element)
            return
        }
        // Otherwise keep the group only if at least one element is at or after the window end (a "real" element).
        val windowEnd = window.end().asJava()
        val containsNonContextElement = element.value.any { value -> timestampOf(value) >= windowEnd }
        if (containsNonContextElement) {
            receiver.output(element)
        }
    }
}
/**
 * Meant to be used after a [ContextualCalendarDayWindow] to move the timestamp of grouped elements forward into
 * their "correct" window. A group is re-emitted, stamped with its latest element timestamp, only when that
 * timestamp lies within the calendar day (in [zoneId]) following the window's last day; otherwise it is dropped.
 */
private class ContextWindowsWithTimestampFn<K, V>(
    val zoneId: ZoneId,
    val timestampOf: (V) -> Instant
): DoFn<KV<K, Iterable<@JvmWildcard V>>, KV<K, Iterable<@JvmWildcard V>>>() {
    @ProcessElement
    fun process(
        @Element element: KV<K, Iterable<@JvmWildcard V>>,
        window: BoundedWindow,
        receiver: OutputReceiver<KV<K, Iterable<@JvmWildcard V>>>
    ) {
        val finalDayOfWindow = window.asClosedRange().lastDay(zoneId)
        val latestTimestamp = element.value.maxOf { value -> timestampOf(value) }
        val followingDay = finalDayOfWindow.plusDays(1).asRange(zoneId).toInstantRange()
        if (latestTimestamp !in followingDay) {
            return
        }
        receiver.outputWithTimestamp(element, latestTimestamp.asJoda())
    }
}
/** Converts this [kotlin.time.Duration] to a Joda-Time duration expressed in whole milliseconds. */
fun Duration.asJoda(): JodaTimeDuration {
    val wholeMillis = toLongMilliseconds()
    return JodaTimeDuration.millis(wholeMillis)
}
/** Converts this [java.time.Instant] to a Joda-Time instant; sub-millisecond precision is dropped. */
fun Instant.asJoda(): JodaInstant =
    toEpochMilli().let { epochMillis -> JodaInstant.ofEpochMilli(epochMillis) }
/** Converts this Joda-Time instant to a [java.time.Instant] with the same epoch-millisecond value. */
fun JodaInstant.asJava(): Instant = millis.let(Instant::ofEpochMilli)
/**
 * Converts this [ZoneId] to a Joda-Time [DateTimeZone] by id. java.time reports `ZoneOffset.UTC` with the id
 * "Z", which is translated to "UTC" before the Joda lookup; all other ids are passed through unchanged.
 */
fun ZoneId.asJoda(): DateTimeZone {
    val jodaZoneId = when (id) {
        "Z" -> "UTC"
        else -> id
    }
    return DateTimeZone.forID(jodaZoneId)
}
/**
 * Converts this window to a range of [Instant]s running from its start to its end.
 *
 * Although the result is a [ClosedRange], `endInclusive` holds the window's end as reported by
 * [IntervalWindow.end], which Beam defines as exclusive; consumers such as [lastDay] account for that.
 *
 * @throws IllegalStateException if this is not an [IntervalWindow].
 */
fun BoundedWindow.asClosedRange(): ClosedRange<Instant> = when (this) {
    is IntervalWindow -> start().asJava()..end().asJava()
    // Interpolate the class name itself; `javaClass::getCanonicalName` would print a function reference.
    else -> error("Window type ${javaClass.canonicalName} not supported")
}
/**
 * Returns the local date, in [inZone], of the last instant covered by this range, treating
 * [ClosedRange.endInclusive] as an *exclusive* end (as produced by [asClosedRange] for an interval window).
 *
 * One nanosecond (the smallest [Instant] unit) is stepped back from the end rather than one second, so an
 * exclusive end that falls less than a second after midnight still reports the day it falls on.
 */
fun ClosedRange<Instant>.lastDay(inZone: ZoneId): LocalDate =
    endInclusive.minusNanos(1).atZone(inZone).toLocalDate()
/** Converts a range of zoned date-times into the corresponding range of absolute [Instant]s. */
fun ClosedRange<ZonedDateTime>.toInstantRange(): ClosedRange<Instant> {
    val firstInstant = start.toInstant()
    val lastInstant = endInclusive.toInstant()
    return firstInstant.rangeTo(lastInstant)
}
/**
 * Returns the first instant of the calendar day after this date, in [zoneId].
 *
 * The next date is computed first and then resolved with [LocalDate.atStartOfDay], instead of adding a day to this
 * date's start: when this date's midnight falls in a DST gap its start is shifted past the gap (e.g. to 01:00), and
 * adding one day to that would land after the real start of the next day.
 */
fun LocalDate.atStartOfNextDay(zoneId: ZoneId): ZonedDateTime = plusDays(1).atStartOfDay(zoneId)
/**
 * Returns the span of this calendar day in [zoneId], from its start up to the start of the following day.
 * Note the range is closed, so its `endInclusive` is the first instant of the next day.
 */
fun LocalDate.asRange(zoneId: ZoneId): ClosedRange<ZonedDateTime> {
    val dayStart = atStartOfDay(zoneId)
    val nextDayStart = atStartOfNextDay(zoneId)
    return dayStart.rangeTo(nextDayStart)
}
/** Builds a Beam [TypeDescriptor] for the reified raw class of [T]. */
inline fun <reified T: Any> typeDescriptor(): TypeDescriptor<T> {
    val rawClass: Class<T> = T::class.java
    return TypeDescriptor.of(rawClass)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment