Created
August 5, 2016 13:30
-
-
Save adamw/3803e2361daae5bdc0ba097a60f2d554 to your computer and use it in GitHub Desktop.
Windowing data in Akka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.softwaremill.akka | |
import java.time._ | |
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.Source | |
import scala.collection.mutable | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
import scala.util.Random | |
/**
 * Demonstrates sliding-window event counting with Akka Streams.
 *
 * A ticking source emits one [[WindowingExample.MyEvent]] per second. Each event's timestamp is
 * back-dated by 0-7 whole seconds to simulate late, out-of-order data. A
 * [[WindowingExample.CommandGenerator]] turns each event into open/close/add commands. The
 * commands are split into one substream per window, and each substream counts its events until
 * its `CloseWindow` arrives. The resulting [[WindowingExample.AggregateEventData]] is then
 * printed.
 */
object WindowingExample {

  def main(args: Array[String]): Unit = {
    implicit val as: ActorSystem = ActorSystem()
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val random = new Random()

    val f = Source
      .tick(0.seconds, 1.second, "")
      .map { _ =>
        val now = System.currentTimeMillis()
        // Back-date by 0-7 whole seconds so some events arrive "late".
        val delay = random.nextInt(8)
        MyEvent(now - delay * 1000L)
      }
      .statefulMapConcat { () =>
        // The factory runs once per materialization, so each run gets its own
        // generator (and therefore its own watermark / open-window state).
        val generator = new CommandGenerator()
        ev => generator.forEvent(ev)
      }
      // One substream per window (at most 64 concurrently).
      // NOTE(review): before Akka Streams 2.5.18, groupBy remembers the keys of closed
      // substreams, so keying on ever-new windows grows memory in long-running streams.
      .groupBy(64, command => command.w)
      // Complete the window's substream as soon as its CloseWindow command arrives.
      .takeWhile {
        case CloseWindow(_) => false
        case _              => true
      }
      .fold(AggregateEventData((0L, 0L), 0)) {
        case (agg, OpenWindow(window)) => agg.copy(w = window)
        // always filtered out by takeWhile; kept so the match stays exhaustive
        case (agg, CloseWindow(_))     => agg
        case (agg, AddToWindow(_, _))  => agg.copy(eventCount = agg.eventCount + 1)
      }
      .async
      .mergeSubstreams
      .runForeach { agg =>
        println(agg.toString)
      }

    try Await.result(f, 60.minutes)
    finally as.terminate()
  }

  /** An event that carries only its (possibly back-dated) epoch-millisecond timestamp. */
  final case class MyEvent(timestamp: Long)

  /** A window as a `(start, end)` pair of epoch milliseconds. */
  type Window = (Long, Long)

  object Window {
    val WindowLength = 10.seconds.toMillis
    val WindowStep = 1.second.toMillis
    val WindowsPerEvent = (WindowLength / WindowStep).toInt

    /**
     * All `WindowsPerEvent` step-aligned windows of length `WindowLength` containing `ts`.
     *
     * Every returned window satisfies `start <= ts < end`. The latest start is
     * `ts - ts % WindowStep`, and the earliest end is that value plus `WindowStep`.
     */
    def windowsFor(ts: Long): Set[Window] = {
      // Start of the earliest step-aligned window that still contains ts.
      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
      (0 until WindowsPerEvent).map { i =>
        val start = firstWindowStart + i * WindowStep
        (start, start + WindowLength)
      }.toSet
    }
  }

  /** A per-window instruction routed to that window's substream (keyed by `w`). */
  sealed trait WindowCommand {
    def w: Window
  }
  final case class OpenWindow(w: Window) extends WindowCommand
  final case class CloseWindow(w: Window) extends WindowCommand
  final case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand

  /**
   * Converts events into window commands, tracking a watermark and the set of open windows.
   *
   * Instances hold mutable state and are not synchronized. The stream creates one per
   * materialization inside `statefulMapConcat`.
   */
  class CommandGenerator {
    private val MaxDelay = 5.seconds.toMillis
    private var watermark = 0L
    private val openWindows = mutable.Set[Window]()

    /**
     * Produces the commands triggered by `ev`.
     *
     * Events older than the watermark (latest seen timestamp minus `MaxDelay`) are logged and
     * dropped. Otherwise the result lists `OpenWindow` commands for newly opened windows, then
     * `CloseWindow` commands for windows ending before the watermark, then one `AddToWindow`
     * per window containing the event.
     */
    def forEvent(ev: MyEvent): List[WindowCommand] = {
      watermark = math.max(watermark, ev.timestamp - MaxDelay)
      if (ev.timestamp < watermark) {
        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
        Nil
      } else {
        val eventWindows = Window.windowsFor(ev.timestamp)

        // A window that ended before the watermark can never receive another event. No
        // window of this event qualifies, since each ends after ev.timestamp >= watermark.
        // Snapshot first, then remove: mutating openWindows while iterating it is undefined.
        val toClose = openWindows.filter(_._2 < watermark).toList
        openWindows --= toClose
        val closeCommands: List[WindowCommand] = toClose.map(CloseWindow(_))

        val toOpen = eventWindows.filter(w => !openWindows.contains(w)).toList
        openWindows ++= toOpen
        val openCommands: List[WindowCommand] = toOpen.map(OpenWindow(_))

        val addCommands: List[WindowCommand] = eventWindows.toList.map(w => AddToWindow(ev, w))

        openCommands ++ closeCommands ++ addCommands
      }
    }
  }

  /** The final event count of one window, emitted when its substream completes. */
  final case class AggregateEventData(w: Window, eventCount: Int) {
    override def toString =
      s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
  }

  /** Formats an epoch-millisecond timestamp as a local time of day in the system time zone. */
  def tsToString(ts: Long) = OffsetDateTime
    .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
    .toLocalTime
    .toString
}
@alanngai yes, you are right, that additional condition is not needed. Thanks for spotting that!
This is great — I read your blog post and it was a good read. Thanks for sharing.
A note for readers: prior to the Akka Streams 2.5.18 release, groupBy keeps track of closed substreams. Using the window start and end time as the groupBy key therefore creates ephemeral substreams whose bookkeeping piles up in memory over long-running streams.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi Adam,
Thanks for your blog post on Akka Streams windowing — very clear and concisely written. I do have a question about this line, however. I've asked Konrad and Johan, and it wasn't clear to them either.
I don't understand why the !eventWindows.contains(ow) in line 90 is necessary. Why shouldn't the window be closed if (ow._2 < watermark) is true? Why the additional condition? If the end window is < the watermark, there's no chance that the current event would be added to that window, because line 83 ensures that the current event is to the right of the watermark. What is it I'm missing?
Thanks,
Alan