Skip to content

Instantly share code, notes, and snippets.

@aerohit
Last active July 2, 2017 09:32
Show Gist options
  • Save aerohit/bfae36ab907343171d732b27805007de to your computer and use it in GitHub Desktop.
Save aerohit/bfae36ab907343171d732b27805007de to your computer and use it in GitHub Desktop.
The Communicator
// Taken from this blog post:
// http://ane.github.io/2016/10/14/communicator-functional-actors.html
import akka.actor.Actor
import akka.actor.ActorRef
import scala.concurrent.Future
trait Communicator[State, Input, Output] extends Actor {
/** This is the initial actor state */
def initial: State
/** The state transition function */
def handle(state: State, product: Output, origin: ActorRef): Unit
def receive = active(initial)
/** I/O handling which the deriving class must implement */
def active(newState: State): Receive
}
trait StateMachine[Input, Output] {
/** The output processing function */
def process(input: Input): Future[(Option[Output], StateMachine[Input, Output])]
}
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.Stash
import communicators.MovieActor.MovieInput
import communicators.MovieActor.MovieOutput
import communicators.MovieActor.MovieState
import akka.pattern.pipe
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
class MovieActor(getTitle: Int => Future[String])
extends Communicator[MovieState, MovieInput, MovieOutput]
with Stash
with ActorLogging {
import context.dispatcher
override def initial: MovieState = MovieState(Map.empty, getTitle)
override def handle(state: MovieState, product: MovieOutput, origin: ActorRef): Unit =
origin ! product
override def active(state: MovieState): Receive = {
case input: MovieInput =>
log.info(s"Received input $input")
state.process(input) pipeTo self
context.become(waitForOutput(sender()) orElse stashInput())
}
private def waitForOutput(origin: ActorRef): Receive = {
case (output: Option[MovieOutput], newState: MovieState) =>
log.info(s"Processed output $output and newState ${newState.movies}")
output.foreach(o => handle(newState, o, origin))
context.become(active(newState))
unstashAll()
}
private def stashInput(): Receive = {
case _: MovieInput => stash()
}
}
object MovieActor {
sealed trait MovieInput
case class RegisterMovie(id: Int) extends MovieInput
case object GetMovies extends MovieInput
sealed trait MovieOutput
case class Movies(map: Map[Int, String]) extends MovieOutput
case class MovieState(movies: Map[Int, String], getMovieTitle: Int => Future[String])(implicit ec: ExecutionContext)
extends StateMachine[MovieInput, MovieOutput] {
override def process(input: MovieInput): Future[(Option[MovieOutput], StateMachine[MovieInput, MovieOutput])] =
input match {
case RegisterMovie(id) =>
getMovieTitle(id).map(title => (None, copy(movies = movies + (id -> title))))
case GetMovies =>
Future.successful((Option(Movies(movies)), this))
}
}
def props(getTitle: Int => Future[String]) = Props(new MovieActor(getTitle))
}
import akka.actor.ActorSystem
import akka.testkit._
import communicators.MovieActor.GetMovies
import communicators.MovieActor.Movies
import communicators.MovieActor.RegisterMovie
import org.scalatest.BeforeAndAfterAll
import org.scalatest.FunSpecLike
import org.scalatest.Matchers
import scala.concurrent.Future
import scala.concurrent.Promise
class MovieActorSpec
extends TestKit(ActorSystem("test"))
with FunSpecLike
with Matchers
with ImplicitSender
with BeforeAndAfterAll {
override def afterAll {
TestKit.shutdownActorSystem(system)
}
it("the actor should respond with all the registered movies") {
val fstMovie = Promise[String]()
val sndMovie = Promise[String]()
def getTitle(id: Int): Future[String] = id match {
case 1 => fstMovie.future
case 2 => sndMovie.future
case _ => Future.failed(new RuntimeException("I know only two movies"))
}
val actor = system.actorOf(MovieActor.props(getTitle))
actor ! RegisterMovie(1)
actor ! RegisterMovie(2)
fstMovie.success("The Godfather I")
sndMovie.success("The Godfather II")
actor ! GetMovies
expectMsg(Movies(Map(1 -> "The Godfather I", 2 -> "The Godfather II")))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment