Skip to content

Instantly share code, notes, and snippets.

@yoeluk
Created October 25, 2017 14:49
Show Gist options
  • Save yoeluk/a467e389afb6e1c71df655fcb9c7dbec to your computer and use it in GitHub Desktop.
Save yoeluk/a467e389afb6e1c71df655fcb9c7dbec to your computer and use it in GitHub Desktop.
Polling graph stage for Akka Streams
package akka.stream.alpakka
import akka.NotUsed
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.Source
import akka.stream.stage.{GraphStage, OutHandler, TimerGraphStageLogic}
import scala.concurrent.duration.FiniteDuration
import scala.util.Try
/**
 * A source stage that polls `f` on a timer and emits whatever `Source` it returns.
 *
 * Each time downstream signals demand, a one-shot timer of `pollInterval` is armed.
 * When the timer fires, `f` is invoked:
 *
 *  - if it returns a `Source`, that source is pushed downstream and the stage then
 *    waits for the next pull before polling again;
 *  - if it throws a non-fatal exception, the failure is discarded and another poll is
 *    scheduled after `pollInterval` (the stage does not fail).
 *
 * Note that the first poll therefore happens one `pollInterval` after the first pull,
 * not immediately.
 *
 * @param f            factory invoked on every poll to obtain the next element to emit
 * @param pollInterval delay between a pull (or a failed poll) and the next call to `f`
 * @tparam T           element type of the emitted inner sources
 */
class TimerGraphStage[T](f: () => Source[T, NotUsed], pollInterval: FiniteDuration)
    extends GraphStage[SourceShape[Source[T, NotUsed]]] {

  /** The single outlet of this stage; built once so no downcast from `shape.outlets` is needed. */
  val out: Outlet[Source[T, NotUsed]] = Outlet[Source[T, NotUsed]]("timer.out")

  override val shape: SourceShape[Source[T, NotUsed]] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): TimerGraphStageLogic =
    new TimerGraphStageLogic(shape) {

      setHandler(
        out,
        new OutHandler {
          // Nothing is ever buffered between polls, so demand always results in
          // arming the poll timer.
          override def onPull(): Unit = schedulePoll()
        }
      )

      /** Runs one poll: pushes the polled source, or re-arms the timer if `f` failed. */
      override protected def onTimer(timerKey: Any): Unit =
        // A timer armed before downstream cancelled may still fire; ignore it then.
        if (!isClosed(out)) {
          // NOTE(review): failures of `f` are silently dropped and retried forever;
          // consider logging them if that is not the intent.
          Try(f()).toOption match {
            case Some(polled) => push(out, polled)
            case None         => schedulePoll()
          }
        }

      /** Arms (or re-arms, since the key is fixed) the one-shot poll timer. */
      private def schedulePoll(): Unit =
        scheduleOnce("poll", pollInterval)
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment