Skip to content

Instantly share code, notes, and snippets.

@KarlSjostrand
Last active August 29, 2015 14:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save KarlSjostrand/d2e4e8b583c45191b8d0 to your computer and use it in GitHub Desktop.
Save KarlSjostrand/d2e4e8b583c45191b8d0 to your computer and use it in GitHub Desktop.
Normally in Akka, an actor responds to a message by sending the result of a scala.concurrent.Future using the akka.pattern.pipe pattern, e.g. future.pipeTo(sender). In some cases, the process carried out by the Future changes state (be it local variables or database records), and, to avoid concurrent modification, the actor should not process new m…
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.language.implicitConversions
import scala.util.Failure
import scala.util.Success
import akka.actor.Actor
import akka.actor.ActorRef
import akka.actor.Stash
import akka.actor.Status
/**
 * Mixin for actors that want to pipe the result of a [[scala.concurrent.Future]] to a recipient
 * while postponing every other incoming message until that future has completed.
 *
 * Calling `future.pipeSequentiallyTo(recipient)` switches the actor into [[processingState]],
 * where all messages are stashed. When the future completes, the actor sends itself
 * [[ProcessingFinished]], which unstashes the postponed messages and restores the behavior that
 * was active before the call.
 *
 * The self-type requires the concrete actor to mix in `akka.actor.Stash`.
 */
trait SequentialPipeToSupport { this: Actor with Stash =>

  /** Message the actor sends to itself once a sequentially piped future has completed. */
  case object ProcessingFinished

  /**
   * Pushes [[processingState]] on top of the current behavior.
   *
   * `discardOld = false` keeps the previously active behavior on the behavior stack so that the
   * `context.unbecome()` in [[processingState]] returns to exactly that behavior. With the
   * default (`discardOld = true`) the previous behavior would be replaced and `unbecome()` would
   * fall back to the actor's initial `receive`, silently dropping any behavior the actor had
   * switched to earlier. Keeping the stack balanced also makes several overlapping pipes work:
   * each one pushes one level and each `ProcessingFinished` pops one level.
   *
   * Touches `context`, so it has to be called from within the actor's own message processing.
   */
  def gotoProcessingState(): Unit =
    context.become(processingState, discardOld = false)

  /** Notifies this actor that the piped future has completed by sending it [[ProcessingFinished]]. */
  def markProcessingFinished(): Unit =
    self ! ProcessingFinished

  /**
   * Behavior used while a sequentially piped future is outstanding.
   *
   * Every message except [[ProcessingFinished]] is stashed. On [[ProcessingFinished]] the stashed
   * messages are put back into the mailbox and the behavior pushed by [[gotoProcessingState]] is
   * popped again.
   */
  def processingState: Receive = {
    case ProcessingFinished =>
      unstashAll()
      context.unbecome()
    case _ =>
      stash()
  }

  /**
   * Wrapper that adds `pipeSequentiallyTo` to a [[scala.concurrent.Future]].
   *
   * @param future           the future whose outcome is forwarded
   * @param executionContext execution context on which the completion callback runs
   * @tparam T               type of the future's successful result
   */
  final class SequentialPipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) {

    /**
     * Switches the actor into [[processingState]] and, once `future` completes, sends its outcome
     * to `recipient`: the plain value on success, or the exception wrapped in `Status.Failure`
     * on failure.
     *
     * @param recipient actor that receives the outcome
     * @return the original, unmodified future
     */
    def pipeSequentiallyTo(recipient: ActorRef): Future[T] = {
      gotoProcessingState()
      // The callback runs on `executionContext`, not inside the actor, so it only sends
      // messages (`self ! ...`, `recipient ! ...`) and never touches `context`.
      future onComplete { outcome =>
        // Release the actor first, then deliver the outcome (same order for both branches).
        markProcessingFinished()
        outcome match {
          case Success(result) => recipient ! result
          case Failure(cause)  => recipient ! Status.Failure(cause)
        }
      }
      future
    }
  }

  /**
   * Implicit conversion that makes `pipeSequentiallyTo` available on any
   * [[scala.concurrent.Future]] inside the actor.
   */
  implicit def pipeSequentially[T](future: Future[T])(implicit executionContext: ExecutionContext): SequentialPipeableFuture[T] =
    new SequentialPipeableFuture(future)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment