Skip to content

Instantly share code, notes, and snippets.

@ornicar
Created February 5, 2014 12:23
Show Gist options
  • Save ornicar/8822549 to your computer and use it in GitHub Desktop.
Save ornicar/8822549 to your computer and use it in GitHub Desktop.
package models.store.prismicDB
import scala.util.{ Try, Success, Failure }
import scala.concurrent.{ Future, ExecutionContext}
import scala.concurrent.ExecutionContext.Implicits.global
import akka.actor._
/**
 * Mixin that performs asynchronous side effects sequentially.
 *
 * The actor alternates between two behaviours:
 *  - `idle`: a message accepted by `process` is handed to it, and the actor
 *    switches to `busy` until the returned future completes;
 *  - `busy`: every ordinary message is stashed, so two effects are never
 *    performed at the same time, without blocking a thread.
 *
 * When the future completes, the stash is released and the actor goes back to
 * `idle`. A failed future is rethrown inside the actor. This can be used to
 * simulate database transactions.
 */
trait SequentialActor extends Stash {
  a: Actor =>

  import SequentialActor._

  /** Asynchronous counterpart of `Receive`: maps a message to the future of its effect. */
  type ReceiveAsync = PartialFunction[Any, Future[_]]

  /** Message currently being processed; null-initialised and reset to null on completion. */
  var currentMessage: Any = _

  /** The asynchronous handler supplied by the concrete actor. */
  def process: ReceiveAsync

  /**
   * Execution context that turns every submitted task into a `Step` message
   * sent to this actor, and every reported failure into a failed `Complete`.
   * Both message types are only handled by the `busy` behaviour.
   */
  implicit val ex: ExecutionContext = new ExecutionContext {
    def reportFailure(t: Throwable): Unit = self ! Complete(Failure(t), currentMessage)
    def execute(runnable: Runnable): Unit = self ! Step(() => runnable.run(), currentMessage)
  }

  /** Behaviour while no effect is in flight: starts processing any message `process` accepts. */
  def idle: Receive = {
    case incoming if process.isDefinedAt(incoming) =>
      // Capture the reference up front; the completion callback runs outside this handler.
      val replyTo = self
      context.become(busy)
      currentMessage = incoming
      process(incoming).onComplete { outcome => replyTo ! Complete(outcome, incoming) }
  }

  /** Behaviour while an effect is in flight: runs steps, awaits completion, stashes the rest. */
  def busy: Receive = {
    case Complete(outcome, _) =>
      currentMessage = null
      unstashAll()
      context.become(idle)
      // State is already reset above, so the failure is rethrown from a clean idle actor.
      outcome match {
        case Failure(cause) => throw cause
        case Success(_)     => ()
      }
    case Step(thunk, _) => thunk()
    case _              => stash()
  }

  def receive: Receive = idle
}
/** Internal protocol messages that a `SequentialActor` sends to itself. */
object SequentialActor {

  /** Signals that the future for `original` finished with result `t`. */
  private final case class Complete(t: Try[_], original: Any)

  /** Wraps a task `ex` to be run by the actor while it is processing `original`. */
  private final case class Step(ex: () => Unit, original: Any)
}
/**
 * Mixin for actors whose initialisation is asynchronous.
 *
 * `preStart` launches `reactivePreStart()`. Until that future completes, every
 * incoming message is stashed. On success the stash is released and the actor
 * switches to `receive1`; on failure the actor switches to a behaviour that
 * answers every message with `Status.Failure`, releases the stash and rethrows
 * the error.
 */
trait ReactiveStart extends Stash {
  a: Actor =>

  /** Sent to self once `reactivePreStart()` has completed successfully. */
  case object ReactiveReady

  /** Sent to self when `reactivePreStart()` has failed with `e`. */
  case class CrashDuringStart(e: Throwable)

  /** Asynchronous initialisation, started from `preStart`. */
  def reactivePreStart(): Future[Unit]

  /** The actor's real behaviour, installed once initialisation has succeeded. */
  def receive1: Receive

  override def preStart(): Unit = {
    // Capture the reference up front; the callback runs outside the actor's message handling.
    val s = self
    import scala.concurrent.ExecutionContext.Implicits.global
    reactivePreStart() onComplete {
      case Success(_) => s ! ReactiveReady
      case Failure(e) => s ! CrashDuringStart(e)
    }
  }

  /** Startup behaviour: waits for the initialisation outcome and stashes everything else. */
  override def receive: Receive = {
    case ReactiveReady =>
      unstashAll()
      // Must be receive1: `a.receive` is this very startup behaviour (a is the
      // self-type alias), which would stash the released messages again forever.
      context.become(receive1)
    case CrashDuringStart(e) =>
      context become {
        case _ => sender ! Status.Failure(e)
      }
      unstashAll()
      // NOTE(review): what happens after this throw depends on the supervisor
      // strategy; presumably the failure-replying behaviour above only stays
      // in effect if the actor is resumed rather than restarted — confirm.
      throw e
    case _ => stash()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment