Skip to content

Instantly share code, notes, and snippets.

@hochgi
Created December 9, 2015 15:46
Show Gist options
Save hochgi/927d1ceab88c55fbb0f9 to your computer and use it in GitHub Desktop.
unfold/unfoldAsync implementation using GraphStage in akka-stream
import akka.stream.stage.{OutHandler, GraphStageLogic, GraphStage}
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl._
import akka.stream._
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
/**
 * Source stage that produces elements by repeatedly applying `f` to a piece of state.
 *
 * On every downstream pull `f` is applied to the current state:
 *  - `Some((nextState, element))` pushes `element` and keeps `nextState` for the next pull;
 *  - `None` completes the outlet;
 *  - an exception caught by `Try` fails the outlet with that exception.
 *
 * @param s initial state handed to the first invocation of `f`
 * @param f step function yielding the next state and the element to emit, or `None` to finish
 * @tparam S type of the state threaded between invocations of `f`
 * @tparam E type of the emitted elements
 */
class Unfold[S, E](s: S, f: S => Option[(S, E)]) extends GraphStage[SourceShape[E]] {

  val out: Outlet[E] = Outlet("Unfold")

  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // State that the next call to `f` will receive; starts out as `s`.
      private var current = s

      // Performs a single unfolding step in response to one pull.
      private def step(): Unit = {
        val attempt = Try(f(current))
        attempt match {
          case Success(Some((next, element))) =>
            // Emit first, then advance the state, exactly one element per pull.
            push(out, element)
            current = next
          case Success(None) =>
            complete(out)
          case Failure(cause) =>
            fail(out, cause)
        }
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = step()
      })
    }
}
/**
 * Source stage that produces elements by repeatedly applying an asynchronous
 * step function `f` to a piece of state.
 *
 * On every downstream pull `f` is applied to the current state. If the call itself
 * throws (as caught by `Try`), the outlet is failed immediately. Otherwise the
 * returned future's completion is forwarded to a callback obtained from
 * `getAsyncCallback`, which then:
 *  - pushes the element and stores the new state for `Success(Some((state, element)))`;
 *  - completes the outlet for `Success(None)`;
 *  - fails the outlet for `Failure`.
 *
 * @param s  initial state handed to the first invocation of `f`
 * @param f  step function returning a future of the next state and element, or `None` to finish
 * @param ec execution context on which the future's completion is forwarded to the callback
 * @tparam S type of the state threaded between invocations of `f`
 * @tparam E type of the emitted elements
 */
class UnfoldAsync[S, E](s: S, f: S => Future[Option[(S, E)]])(implicit ec: ExecutionContext)
  extends GraphStage[SourceShape[E]] {

  val out: Outlet[E] = Outlet("UnfoldAsync")

  override val shape: SourceShape[E] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {

      // State that the next call to `f` will receive; starts out as `s`.
      private var current = s

      // Applies the outcome of one completed step to the outlet and the stored state.
      private def applyResult(result: Try[Option[(S, E)]]): Unit =
        result match {
          case Success(Some((next, element))) =>
            push(out, element)
            current = next
          case Success(None) =>
            complete(out)
          case Failure(cause) =>
            fail(out, cause)
        }

      // Futures complete outside of the stage's own callbacks, so their results
      // are handed back through this async callback instead of touching `out` directly.
      private[this] val resultCallback = getAsyncCallback[Try[Option[(S, E)]]](applyResult)

      setHandler(out, new OutHandler {
        override def onPull(): Unit =
          Try(f(current)) match {
            case Success(pending) => pending.onComplete(resultCallback.invoke)(ec)
            // `f` threw before producing a future at all.
            case Failure(cause) => fail(out, cause)
          }
      })
    }
}
/**
 * Creates a source backed by an [[Unfold]] stage.
 *
 * @param s initial state
 * @param f step function; `Some((nextState, element))` emits `element`, `None` ends the stream
 * @return a source of the elements produced by `f`
 */
def unfold[S,E](s: S)(f: S => Option[(S,E)]): Source[E,Unit] = {
  val stage = new Unfold[S, E](s, f)
  Source.fromGraph(stage)
}
/**
 * Creates a source backed by an [[UnfoldAsync]] stage.
 *
 * @param s  initial state
 * @param g  asynchronous step function; a future of `Some((nextState, element))` emits
 *           `element`, a future of `None` ends the stream
 * @param ec execution context passed on to the [[UnfoldAsync]] stage
 * @return a source of the elements produced by `g`
 */
def unfoldAsync[S,E](s: S)(g: S => Future[Option[(S,E)]])(implicit ec: ExecutionContext): Source[E,Unit] = {
  val stage = new UnfoldAsync[S, E](s, g)(ec)
  Source.fromGraph(stage)
}
/**
 * Builds a source out of a feedback loop in the graph DSL instead of a custom stage.
 *
 * The seed `s` is fed into `f`; the first component of each result is routed back
 * as the next input of `f`, and the second component is exposed as the output of
 * the resulting source. `f` returns a plain pair rather than an `Option`, so it
 * has no way to signal the end of the stream.
 *
 * NOTE(review): the graph contains a cycle (feedback into `Concat`); confirm it
 * makes progress under back-pressure with the akka-stream version in use.
 *
 * @param s initial state
 * @param f step function returning the next state and the element to emit
 * @return a source of the second components produced by `f`
 */
def unfoldInf[S,E](s: S)(f: S => (S,E)): Source[E,Unit] =
  Source.fromGraph(FlowGraph.create() { implicit builder =>
    import FlowGraph.Implicits._

    // Splits every result of `f` into the next state (out0) and the element (out1).
    val splitter = builder.add(UnzipWith(f))
    // Joins the seed with the fed-back states.
    val feedback = builder.add(Concat[S]())
    val seed = Source.single(s)

    // Wiring order is kept deliberately: the seed is attached to the concat before
    // the feedback edge (presumably this makes the seed the first input - confirm).
    seed ~> feedback ~> splitter.in
    feedback <~ splitter.out0

    SourceShape(splitter.out1)
  })
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment