Skip to content

Instantly share code, notes, and snippets.

@kweimann
Created February 21, 2016 17:47
Show Gist options
  • Save kweimann/41dbca40e5ab07cbfbb8 to your computer and use it in GitHub Desktop.
Save kweimann/41dbca40e5ab07cbfbb8 to your computer and use it in GitHub Desktop.
Akka Streams exercise: custom streams & graph DSL
/**
 * Flow stage that passes every element through unchanged, but waits `delay`
 * after each downstream request before requesting the next element from upstream.
 *
 * Each downstream pull starts a one-shot timer; when it fires, upstream is pulled,
 * and the element that arrives is pushed straight to downstream.
 *
 * @param delay time to wait between a downstream pull and the matching upstream pull
 * @tparam A type of the elements flowing through the stage
 */
class Delayed[A](delay: FiniteDuration) extends GraphStage[FlowShape[A, A]] {
  val in: Inlet[A] = Inlet[A]("Delayed.in")
  val out: Outlet[A] = Outlet[A]("Delayed.out")

  // Built once from the ports above instead of allocating a new FlowShape per access.
  override val shape: FlowShape[A, A] = FlowShape.of(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new TimerGraphStageLogic(shape) {
      // Key identifying the single one-shot timer this stage uses.
      private val DelayTimerKey = "Delayed.timer"

      setHandler(in, new InHandler {
        // Forward the element immediately; the delay is applied on the demand side.
        override def onPush(): Unit = push(out, grab(in))
      })

      setHandler(out, new OutHandler {
        // No extra "gate" flag is needed: downstream cannot pull again until an
        // element has been pushed, and a push only happens after the timer below
        // has fired and pulled upstream, so at most one timer is ever pending.
        override def onPull(): Unit = scheduleOnce(DelayTimerKey, delay)
      })

      // The delay has elapsed: now pass the pending demand on to upstream.
      override protected def onTimer(timerKey: Any): Unit = pull(in)
    }
}
/** Factory for [[Delayed]] stages: a function from a delay to a new stage for elements of type `A`. */
def delayed[A]: FiniteDuration => Delayed[A] = delay => new Delayed[A](delay)
// Closed graph with a feedback loop:
//   - the numbers 1 to 50 are emitted through the `delayed` stage (100 ms between requests),
//   - every element leaving the merge is printed,
//   - positive elements are additionally summed up while waiting for the next tick and,
//     on each tick, the negated sum is fed back into the merge through its preferred input.
// NOTE(review): the tick source is unbounded, so this graph presumably keeps running
// after the 50 numbers have been consumed — confirm that this is intended.
val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder =>
  import GraphDSL.Implicits._

  // Marker element emitted by the tick source; it carries no data.
  case class Tick()

  // Dot notation instead of postfix operators, so no `scala.language.postfixOps` is needed.
  val source = Source(1 to 50).via(delayed[Int](100.millisecond))
  val ticks = Source.tick(1.second, 1.second, Tick())
  val sink = Sink.foreach(println)

  val merge = builder.add(MergePreferred[Int](1))
  val broadcast = builder.add(Broadcast[Int](2))
  // Pairs the current sum with a tick and emits the sum negated; the tick itself is ignored.
  val zip = builder.add(ZipWith[Int, Tick, Int]((i, _) => -i))

  // Feedback edge: negated sums take priority over fresh source elements.
  zip.out ~> merge.preferred
  source ~> merge.in(0)
  merge.out ~> broadcast.in

  // Only positive values re-enter the loop, so the negated sums fed back are not summed again.
  broadcast.out(0)
    .filter(_ > 0)
    .conflate(_ + _) ~> zip.in0
  ticks ~> zip.in1

  broadcast.out(1) ~> sink
  ClosedShape
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment