Last active
August 29, 2015 14:20
-
-
Save KarlSjostrand/d2e4e8b583c45191b8d0 to your computer and use it in GitHub Desktop.
Normally in Akka, an actor responds to a message by sending the result of a scala.concurrent.Future using the akka.pattern.pipe pattern, e.g. future.pipeTo(sender). In some cases, the process carried out by the Future changes state (be it local variables or database records), and to avoid concurrent modification the actor should not process new m…
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import scala.concurrent.ExecutionContext | |
import scala.concurrent.Future | |
import scala.language.implicitConversions | |
import scala.util.Failure | |
import scala.util.Success | |
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import akka.actor.Stash | |
import akka.actor.Status | |
/**
 * Mix-in for actors that pipe the result of a `Future` to a recipient while
 * postponing every other incoming message until that `Future` has completed.
 *
 * Calling `pipeSequentiallyTo` on a `Future` switches the actor to
 * [[processingState]], in which every message except [[ProcessingFinished]]
 * is stashed. When the `Future` completes, the actor sends itself
 * [[ProcessingFinished]], which unstashes the postponed messages and restores
 * the behavior that was active before piping started.
 */
trait SequentialPipeToSupport { this: Actor with Stash =>

  /** Self-addressed marker telling the actor that the piped `Future` has completed. */
  case object ProcessingFinished

  /**
   * Switches the actor to [[processingState]].
   *
   * The behavior is pushed with `discardOld = false` so that the matching
   * `context.unbecome()` in [[processingState]] pops back to whatever behavior
   * was active before, rather than resetting the actor to its initial
   * `receive`.
   */
  def gotoProcessingState(): Unit =
    context.become(processingState, discardOld = false)

  /** Sends [[ProcessingFinished]] to this actor. */
  def markProcessingFinished(): Unit =
    self ! ProcessingFinished

  /**
   * Behavior used while a sequentially piped `Future` is pending: stashes
   * every message until [[ProcessingFinished]] arrives, then unstashes them
   * all and pops this behavior.
   */
  def processingState: Receive = {
    case ProcessingFinished =>
      unstashAll()
      context.unbecome()
    case _ =>
      stash()
  }

  /**
   * Wrapper adding `pipeSequentiallyTo` to a `Future`.
   *
   * @param future           the future whose outcome is forwarded
   * @param executionContext context on which the completion callback is run
   * @tparam T the type of the future's successful value
   */
  final class SequentialPipeableFuture[T](val future: Future[T])(implicit executionContext: ExecutionContext) {

    /**
     * Puts the enclosing actor into [[processingState]] and, once `future`
     * completes, signals [[ProcessingFinished]] to the actor and then sends
     * the outcome to `recipient`: the value itself on success, or the cause
     * wrapped in `Status.Failure` on failure.
     *
     * @param recipient the actor that receives the outcome
     * @return the original `future`, unchanged
     */
    def pipeSequentiallyTo(recipient: ActorRef): Future[T] = {
      gotoProcessingState()
      future.onComplete { outcome =>
        // Signal completion first, exactly as before, then deliver the outcome.
        markProcessingFinished()
        outcome match {
          case Success(value) => recipient ! value
          case Failure(cause) => recipient ! Status.Failure(cause)
        }
      }
      future
    }
  }

  /** Implicit conversion that makes `pipeSequentiallyTo` available on any `Future`. */
  implicit def pipeSequentially[T](future: Future[T])(
      implicit executionContext: ExecutionContext): SequentialPipeableFuture[T] =
    new SequentialPipeableFuture(future)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment