Created
August 5, 2016 13:30
-
-
Save adamw/3803e2361daae5bdc0ba097a60f2d554 to your computer and use it in GitHub Desktop.
Windowing data in Akka
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.softwaremill.akka | |
import java.time._ | |
import akka.actor.ActorSystem | |
import akka.stream.ActorMaterializer | |
import akka.stream.scaladsl.Source | |
import scala.collection.mutable | |
import scala.concurrent.Await | |
import scala.concurrent.duration._ | |
import scala.util.Random | |
object WindowingExample {

  /** Demo entry point: emits one event per second whose timestamp lags the wall clock by
    * 0-7 seconds. It turns the events into sliding-window commands and prints, for every
    * window, how many events fell into it.
    */
  def main(args: Array[String]): Unit = {
    implicit val as = ActorSystem()
    implicit val mat = ActorMaterializer()

    val random = new Random()

    val f = Source
      .tick(0.seconds, 1.second, "")
      .map { _ =>
        val now = System.currentTimeMillis()
        // Simulated lateness of 0-7 s. Delays larger than CommandGenerator's MaxDelay (5 s)
        // can make an event fall behind the watermark, and it is then dropped.
        val delay = random.nextInt(8)
        MyEvent(now - delay * 1000L)
      }
      // The factory creates the CommandGenerator that holds the watermark / open-window state.
      .statefulMapConcat { () =>
        val generator = new CommandGenerator()
        ev => generator.forEvent(ev)
      }
      // One substream per window, with at most 64 substreams open at the same time.
      // NOTE(review): Akka Streams versions before 2.5.18 reportedly keep track of closed
      // substreams in groupBy, so per-window keys may accumulate memory in long-running
      // streams. Verify this against the Akka version in use.
      .groupBy(64, command => command.w)
      // A window's substream ends at its CloseWindow command, which is not passed downstream.
      .takeWhile {
        case CloseWindow(_) => false
        case _              => true
      }
      .fold(AggregateEventData((0L, 0L), 0)) {
        case (agg, OpenWindow(window)) => agg.copy(w = window)
        // always filtered out by takeWhile
        case (agg, CloseWindow(_))     => agg
        case (agg, AddToWindow(_, _))  => agg.copy(eventCount = agg.eventCount + 1)
      }
      .async
      .mergeSubstreams
      .runForeach { agg =>
        println(agg.toString)
      }

    // NOTE(review): the tick source is presumably infinite. If so, this blocks until the
    // 60-minute timeout (Await.result then throws a TimeoutException) and then shuts the
    // actor system down.
    try Await.result(f, 60.minutes)
    finally as.terminate()
  }

  /** An input event carrying its event time in epoch milliseconds. */
  case class MyEvent(timestamp: Long)

  /** An event-time window as (start, end) epoch-millisecond timestamps. */
  type Window = (Long, Long)

  object Window {
    val WindowLength: Long = 10.seconds.toMillis
    val WindowStep: Long = 1.second.toMillis
    // Number of overlapping windows that contain any given timestamp.
    // This assumes WindowLength is a multiple of WindowStep.
    val WindowsPerEvent: Int = (WindowLength / WindowStep).toInt

    /** Returns the `WindowsPerEvent` sliding windows that contain `ts`.
      *
      * Window starts are aligned to multiples of `WindowStep`, and every window spans
      * `WindowLength`. Each returned window ends strictly after `ts`. The alignment via
      * `ts % WindowStep` assumes a non-negative timestamp.
      */
    def windowsFor(ts: Long): Set[Window] = {
      val firstWindowStart = ts - ts % WindowStep - WindowLength + WindowStep
      (0 until WindowsPerEvent).map { i =>
        val start = firstWindowStart + i * WindowStep
        (start, start + WindowLength)
      }.toSet
    }
  }

  /** A command routed to the substream of window `w`. */
  sealed trait WindowCommand {
    def w: Window
  }
  case class OpenWindow(w: Window) extends WindowCommand
  case class CloseWindow(w: Window) extends WindowCommand
  case class AddToWindow(ev: MyEvent, w: Window) extends WindowCommand

  /** Translates events into window commands while tracking a watermark.
    *
    * The watermark is the largest event timestamp seen so far minus `MaxDelay`. Events older
    * than the watermark are dropped. A window is closed once its end lies before the
    * watermark. Instances are stateful and not thread-safe: use one per stream.
    */
  class CommandGenerator {
    private val MaxDelay = 5.seconds.toMillis
    private var watermark = 0L
    private val openWindows = mutable.Set[Window]()

    /** Returns the commands triggered by `ev`, in this order: opens for newly seen windows,
      * closes for windows that fell behind the watermark, then one add per window of `ev`.
      * Returns `Nil` for an event that is already behind the watermark.
      */
    def forEvent(ev: MyEvent): List[WindowCommand] = {
      watermark = math.max(watermark, ev.timestamp - MaxDelay)
      if (ev.timestamp < watermark) {
        println(s"Dropping event with timestamp: ${tsToString(ev.timestamp)}")
        Nil
      } else {
        val eventWindows = Window.windowsFor(ev.timestamp)
        // Snapshot the windows to close before mutating openWindows. Removing elements
        // from a mutable set while iterating over it can skip elements.
        // Every window of an accepted event ends after ev.timestamp >= watermark, so a
        // window ending before the watermark is never one of eventWindows.
        val toClose = openWindows.filter(_._2 < watermark).toList
        openWindows --= toClose
        val toOpen = eventWindows.filterNot(openWindows.contains).toList
        openWindows ++= toOpen
        toOpen.map(OpenWindow(_)) ++
          toClose.map(CloseWindow(_)) ++
          eventWindows.toList.map(AddToWindow(ev, _))
      }
    }
  }

  /** Number of events counted for window `w`. */
  case class AggregateEventData(w: Window, eventCount: Int) {
    override def toString =
      s"Between ${tsToString(w._1)} and ${tsToString(w._2)}, there were $eventCount events."
  }

  /** Formats an epoch-millisecond timestamp as a local time in the system default zone. */
  def tsToString(ts: Long): String = OffsetDateTime
    .ofInstant(Instant.ofEpochMilli(ts), ZoneId.systemDefault())
    .toLocalTime
    .toString
}
This is great — I read your blog post, and it was a good read. Thanks for sharing.
A note worth adding here: before the Akka Streams 2.5.18 release, `groupBy` keeps track of closed substreams. Using the window's start and end time as the grouping key therefore creates many short-lived substreams, and their bookkeeping accumulates in memory over long-running streams.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
@alanngai Yes, you are right: that additional condition is not needed. Thanks for spotting that!