Skip to content

Instantly share code, notes, and snippets.

@hochgi
Last active December 1, 2015 12:38
Show Gist options
  • Save hochgi/cbe5ffc6cf2915e31091 to your computer and use it in GitHub Desktop.
Save hochgi/cbe5ffc6cf2915e31091 to your computer and use it in GitHub Desktop.
akka-stream-unfold
import akka.actor._
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl.Source
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success, Try}
def unfold[S,E](s: S)
(f: S => Option[(S,E)]): Source[E,ActorRef] =
Source.actorPublisher[E](Props(classOf[UnfoldPublisher[S,E]],s,f))
def unfoldM[S,E](s: S)
(f: S => Future[Option[(S,E)]])
(implicit ec: ExecutionContext): Source[E,ActorRef] =
Source.actorPublisher[E](Props(classOf[UnfoldPublisherM[S,E]],s,f,ec))
class UnfoldPublisher[S,E](initialState: S, f: S => Option[(S,E)]) extends ActorPublisher[E] {
override def receive: Actor.Receive = stateReceive(initialState)
@tailrec
private[this] def execute(s: S): S =
if(totalDemand == 0) s
else Try(f(s)) match {
case Failure(err) => onErrorThenStop(err); s
case Success(None) => onCompleteThenStop(); s
case Success(Some((newState,elem))) =>
onNext(elem)
execute(newState)
}
def stateReceive(s: S): Receive = {
case Cancel => context.stop(self)
case _: Request => context.become(stateReceive(execute(s)))
}
}
class UnfoldPublisherM[S,E](initialState: S, f: S => Future[Option[(S,E)]], ec: ExecutionContext) extends ActorPublisher[E] {
case class Next(state: S)
private[this] def execute(state: S) = Try(f(state)) match {
case Failure(error) => onErrorThenStop(error)
case Success(future) => future.onComplete {
case Failure(error) => onErrorThenStop(error)
case Success(None) => onCompleteThenStop()
case Success(Some((newState, elem))) =>
onNext(elem)
self ! Next(newState)
}(ec)
}
override def receive: Actor.Receive = stateReceive(initialState,idle = true)
def stateReceive(s: S, idle: Boolean): Receive = {
case Cancel => context.stop(self)
case Next(newState) if totalDemand > 0L => execute(newState)
case Next(newState) => context.become(stateReceive(newState,idle = true))
case Request(n) if idle =>
execute(s)
context.become(stateReceive(s,idle = false))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment