Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Windowing data in Akka
package com.softwaremill.akka
import java.time._
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import scala.collection.mutable
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Random
/** Sliding-window event counting with Akka Streams.
  *
  * A ticking source emits one [[WindowingExample.MyEvent]] per second whose timestamp is
  * backdated by 0-7 whole seconds to simulate late arrivals. A
  * [[WindowingExample.CommandGenerator]] turns each event into open/add/close commands for the
  * 10-second windows (sliding by 1 second) that contain it. Commands are grouped by window into
  * substreams; each substream counts its events until the window is closed and then emits an
  * [[WindowingExample.AggregateEventData]], which is printed.
  */
object WindowingExample {

  /** Runs the pipeline, printing one line per closed window, waiting at most 60 minutes. */
  def main(args: Array[String]): Unit = {
    implicit val as: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val random = new Random()

    val f = Source
      .tick(0.seconds, 1.second, "")
      .map { _ =>
        val now = System.currentTimeMillis()
        // Simulate out-of-order arrival: backdate the event by 0-7 whole seconds.
        val delay = random.nextInt(8)
        MyEvent(now - delay * 1000L)
      }
      .statefulMapConcat { () =>
        // The generator holds the watermark and the set of currently open windows.
        val generator = new CommandGenerator()
        ev => generator.forEvent(ev)
      }
      // One substream per window; 64 is the maxSubstreams limit passed to groupBy.
      .groupBy(64, command => command.w)
      // Completing the substream on CloseWindow lets the fold below emit its aggregate.
      .takeWhile(command =>
        command match {
          case _: CloseWindow => false
          case _              => true
        }
      )
      .fold(AggregateEventData((0L, 0L), 0)) {
        case (agg, OpenWindow(window)) => agg.copy(w = window)
        // Never reached: CloseWindow is always filtered out by takeWhile.
        case (agg, CloseWindow(_))    => agg
        case (agg, AddToWindow(_, _)) => agg.copy(eventCount = agg.eventCount + 1)
      }
      .async
      .mergeSubstreams
      .runForeach { agg =>
        println(agg.toString)
      }

    try Await.result(f, 60.minutes)
    finally as.terminate()
  }

  /** An event carrying its creation time in epoch milliseconds. */
  final case class MyEvent(timestamp: Long)

  /** A window as a (start, end) pair of epoch-millisecond timestamps. */
  type Window = (Long, Long)

  object Window {
    val WindowLength: Long = 10.seconds.toMillis
    val WindowStep: Long = 1.second.toMillis
    val WindowsPerEvent: Int = (WindowLength / WindowStep).toInt

    /** Returns every window containing `ts`.
      *
      * Window starts are aligned to multiples of [[WindowStep]]; for non-negative `ts` each
      * returned window satisfies `start <= ts < end`.
      */
    def windowsFor(ts: Long): Set[Window] = {
      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
      (0 until WindowsPerEvent).map { i =>
        val start = firstWindowStart + i * WindowStep
        (start, start + WindowLength)
      }.toSet
    }
  }

  /** A command routed to the substream of window `w`. */
  sealed trait WindowCommand {
    def w: Window
  }
  final case class OpenWindow(w: Window) extends WindowCommand
  final case class CloseWindow(w: Window) extends WindowCommand
  final case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand

  /** Stateful translator from events to window commands (not thread-safe; one per stream). */
  class CommandGenerator {
    // The watermark trails the newest timestamp seen by this much; older events are dropped.
    private val MaxDelay = 5.seconds.toMillis
    private var watermark = 0L
    private val openWindows = mutable.Set[Window]()

    /** Translates one event into the window commands it triggers.
      *
      * Advances the watermark. If the event is older than the watermark it is dropped (a
      * message is printed). Otherwise the result is, in order: an [[OpenWindow]] for each of
      * the event's windows not yet open, a [[CloseWindow]] for each open window ending before
      * the watermark, and one [[AddToWindow]] per window containing the event.
      *
      * @param ev the incoming event
      * @return `Nil` for a dropped event, otherwise the commands described above
      */
    def forEvent(ev: MyEvent): List[WindowCommand] = {
      watermark = math.max(watermark, ev.timestamp - MaxDelay)
      if (ev.timestamp < watermark) {
        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
        Nil
      } else {
        val eventWindows = Window.windowsFor(ev.timestamp)

        // Snapshot expired windows before removing them: mutating a mutable.Set while
        // iterating over it is undefined and may skip elements. No window containing `ev`
        // can be expired (it ends after ev.timestamp >= watermark), so no extra check is needed.
        val expired = openWindows.filter(_._2 < watermark).toList
        openWindows --= expired
        val closeCommands: List[WindowCommand] = expired.map(w => CloseWindow(w))

        val toOpen = eventWindows.filterNot(openWindows.contains)
        openWindows ++= toOpen
        val openCommands: List[WindowCommand] = toOpen.toList.map(w => OpenWindow(w))

        val addCommands: List[WindowCommand] = eventWindows.toList.map(w => AddToWindow(ev, w))
        openCommands ++ closeCommands ++ addCommands
      }
    }
  }

  /** The final result for one window: its bounds and how many events it received. */
  final case class AggregateEventData(w: Window, eventCount: Int) {
    override def toString: String =
      s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
  }

  /** Formats an epoch-millisecond timestamp as a local time in the system default zone. */
  def tsToString(ts: Long): String = OffsetDateTime
    .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
    .toLocalTime
    .toString
}
@alanngai

This comment has been minimized.

Copy link

alanngai commented Jan 17, 2018

Hi Adam,

Thanks for your blog post on Akka Streams windowing. Very clear and concise. I do have a question about one line, however. I've asked Konrad and Johan, and it wasn't clear to them either.

I don't understand why the `!eventWindows.contains(ow)` check in line 90 (the close-window condition) is necessary. Why shouldn't the window be closed whenever `ow._2 < watermark` is true? Why the additional condition? If the window's end is before the watermark, there's no chance that the current event would be added to that window, because line 83 (the watermark check) ensures that the current event is at or to the right of the watermark. What am I missing?

Thanks,
Alan

@adamw

This comment has been minimized.

Copy link
Owner Author

adamw commented Jan 19, 2018

@alanngai yes, you are right, that additional condition is not needed. Thanks for spotting that!

@xhoong

This comment has been minimized.

Copy link

xhoong commented Mar 18, 2019

This is great — I read your blog post, and it was a good read. Thanks for sharing.

A note for readers: prior to the Akka Streams 2.5.18 release, `groupBy` keeps track of closed substreams. Using each window's start and end time as the grouping key therefore creates ephemeral substreams whose keys pile up in memory over a long-running stream.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.