Skip to content

Instantly share code, notes, and snippets.

@etaty
Created December 3, 2015 10:57
Show Gist options
  • Save etaty/2eb5848f9cc3b10c9a1f to your computer and use it in GitHub Desktop.
Save etaty/2eb5848f9cc3b10c9a1f to your computer and use it in GitHub Desktop.
akka stream recursive (Iteratee like) unfold unfoldM @see https://github.com/akka/akka/issues/19021
import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{OutHandler, GraphStageLogic, GraphStage}
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Success, Failure, Try}
case class RecursiveSource[State, Out](
init: () => Future[(Option[State], Out)],
f: State => Future[(Option[State], Out)])(implicit exec: ExecutionContext) extends GraphStage[SourceShape[Out]] {
val out: Outlet[Out] = Outlet("RecursiveSource Out")
override val shape: SourceShape[Out] = SourceShape(out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
var current: () => Future[(Option[State], Out)] = init
val futureCB = getAsyncCallback { result: Try[(Option[State], Out)] =>
result match {
case Failure(ex) => failStage(ex)
case Success((state, elem)) =>
push(out, elem)
if (state.nonEmpty) {
current = { () =>
f(state.get)
}
} else {
complete(out)
}
}
}.invoke _
setHandler(out, new OutHandler {
override def onPull(): Unit = {
current().onComplete(futureCB)
}
})
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment