Last active
July 2, 2017 09:32
-
-
Save aerohit/bfae36ab907343171d732b27805007de to your computer and use it in GitHub Desktop.
The Communicator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Taken from this blog post:
// http://ane.github.io/2016/10/14/communicator-functional-actors.html
import akka.actor.Actor | |
import akka.actor.ActorRef | |
import scala.concurrent.Future | |
/**
  * Base trait for an actor whose message handling is derived from an explicit state value.
  *
  * The actor starts with the behaviour produced by `active(initial)`. Implementations
  * supply the starting state, the behaviour for a given state, and what to do with a
  * produced output.
  *
  * @tparam State  type of the value the behaviour is built from
  * @tparam Input  type of incoming messages (not referenced by any member of this trait;
  *                it is available to implementations)
  * @tparam Output type of the value passed to `handle`
  */
trait Communicator[State, Input, Output] extends Actor {

  /** The state the actor starts with. */
  def initial: State

  /**
    * Builds the message handler to use while the actor holds `newState`.
    * Must be implemented by the deriving class.
    */
  def active(newState: State): Receive

  /**
    * Called with an output that was produced while processing a message.
    *
    * @param state   the state associated with the output
    * @param product the produced output
    * @param origin  the actor the output relates to (in the visible implementation,
    *                the sender of the triggering input)
    */
  def handle(state: State, product: Output, origin: ActorRef): Unit

  /** Initial behaviour: the handler for the `initial` state. */
  def receive: Receive = active(initial)
}
/**
  * A state machine whose transitions are computed asynchronously.
  *
  * @tparam Input  type of the values fed into the machine
  * @tparam Output type of the values the machine may emit
  */
trait StateMachine[Input, Output] {

  /**
    * Processes a single input.
    *
    * @param input the value to process
    * @return a future holding an optional output together with the state machine
    *         resulting from the transition
    */
  def process(
      input: Input
  ): Future[(Option[Output], StateMachine[Input, Output])]
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorLogging | |
import akka.actor.ActorRef | |
import akka.actor.Props | |
import akka.actor.Stash | |
import communicators.MovieActor.MovieInput | |
import communicators.MovieActor.MovieOutput | |
import communicators.MovieActor.MovieState | |
import akka.pattern.pipe | |
import scala.concurrent.ExecutionContext | |
import scala.concurrent.Future | |
/**
  * Actor that maintains a map of movie ids to titles and handles one [[MovieInput]]
  * at a time.
  *
  * While the asynchronous result of `MovieState.process` is pending, further inputs
  * are stashed; they are unstashed once the result (or a failure) arrives.
  *
  * @param getTitle asynchronous lookup of a movie title by id
  */
class MovieActor(getTitle: Int => Future[String])
    extends Communicator[MovieState, MovieInput, MovieOutput]
    with Stash
    with ActorLogging {

  import akka.actor.Status
  // Execution context used by `pipeTo` and by MovieState's implicit parameter.
  import context.dispatcher

  /** Starts with no registered movies. */
  override def initial: MovieState = MovieState(Map.empty, getTitle)

  /** Sends the produced output to the actor that sent the triggering input. */
  override def handle(state: MovieState, product: MovieOutput, origin: ActorRef): Unit =
    origin ! product

  /**
    * Behaviour while idle: start processing the next input and switch to waiting
    * for its result.
    */
  override def active(state: MovieState): Receive = {
    case input: MovieInput =>
      log.info(s"Received input $input")
      // Capture the sender now; it is needed when the piped result arrives later.
      val origin = sender()
      state.process(input) pipeTo self
      context.become(waitForOutput(state, origin) orElse stashInput())
  }

  /**
    * Behaviour while a result is pending.
    *
    * @param previous the state held before processing started; restored if processing fails
    * @param origin   the sender of the input being processed
    */
  private def waitForOutput(previous: MovieState, origin: ActorRef): Receive = {
    // The element type of Option is erased at runtime, so match on Option[_] and
    // narrow the content with a typed pattern instead of an unchecked Option[MovieOutput].
    case (output: Option[_], newState: MovieState) =>
      log.info(s"Processed output $output and newState ${newState.movies}")
      output.collect { case o: MovieOutput => o }.foreach(o => handle(newState, o, origin))
      resumeWith(newState)

    // `pipeTo` delivers a failed Future as Status.Failure. Without this case the actor
    // would remain in the waiting behaviour and stash every subsequent input forever.
    // The origin is not notified; the failure is only logged.
    case Status.Failure(cause) =>
      log.error(cause, "Processing failed; keeping previous state")
      resumeWith(previous)
  }

  /** Returns to the idle behaviour for `state` and replays stashed inputs. */
  private def resumeWith(state: MovieState): Unit = {
    context.become(active(state))
    unstashAll()
  }

  /** Defers any input that arrives while a result is pending. */
  private def stashInput(): Receive = {
    case _: MovieInput => stash()
  }
}
/** Message protocol, state type and factory for [[MovieActor]]. */
object MovieActor {

  /** Creates the `Props` for a [[MovieActor]] that resolves titles with `getTitle`. */
  def props(getTitle: Int => Future[String]): Props = Props(new MovieActor(getTitle))

  /** Messages accepted by the actor. */
  sealed trait MovieInput

  /** Requests that the title for `id` be looked up and stored. */
  case class RegisterMovie(id: Int) extends MovieInput

  /** Requests the currently stored movies. */
  case object GetMovies extends MovieInput

  /** Messages emitted by the actor. */
  sealed trait MovieOutput

  /** The stored movies, keyed by id. */
  case class Movies(map: Map[Int, String]) extends MovieOutput

  /**
    * Immutable actor state: the stored titles plus the title lookup function.
    *
    * @param movies        titles stored so far, keyed by movie id
    * @param getMovieTitle asynchronous title lookup
    * @param ec            execution context used to transform the lookup result
    */
  case class MovieState(movies: Map[Int, String], getMovieTitle: Int => Future[String])(
      implicit ec: ExecutionContext
  ) extends StateMachine[MovieInput, MovieOutput] {

    /**
      * - `GetMovies` yields the stored movies as output and leaves the state unchanged.
      * - `RegisterMovie` yields no output and a state extended with the looked-up title;
      *   the returned future fails if the lookup fails.
      */
    override def process(
        input: MovieInput
    ): Future[(Option[MovieOutput], StateMachine[MovieInput, MovieOutput])] =
      input match {
        case GetMovies =>
          Future.successful((Some(Movies(movies)), this))
        case RegisterMovie(id) =>
          for (title <- getMovieTitle(id))
            yield (None, copy(movies = movies.updated(id, title)))
      }
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor.ActorSystem | |
import akka.testkit._ | |
import communicators.MovieActor.GetMovies | |
import communicators.MovieActor.Movies | |
import communicators.MovieActor.RegisterMovie | |
import org.scalatest.BeforeAndAfterAll | |
import org.scalatest.FunSpecLike | |
import org.scalatest.Matchers | |
import scala.concurrent.Future | |
import scala.concurrent.Promise | |
/**
  * Verifies that [[MovieActor]] replies to `GetMovies` with every movie registered
  * before it, even when the title lookups complete after the requests were sent.
  */
class MovieActorSpec
    extends TestKit(ActorSystem("test"))
    with FunSpecLike
    with Matchers
    with ImplicitSender
    with BeforeAndAfterAll {

  // Explicit `(): Unit =` instead of procedure syntax, which is deprecated in Scala 2.13.
  override def afterAll(): Unit =
    TestKit.shutdownActorSystem(system)

  it("the actor should respond with all the registered movies") {
    // Promises let the test decide when each title lookup completes.
    val fstMovie = Promise[String]()
    val sndMovie = Promise[String]()

    def getTitle(id: Int): Future[String] = id match {
      case 1 => fstMovie.future
      case 2 => sndMovie.future
      case _ => Future.failed(new RuntimeException("I know only two movies"))
    }

    val actor = system.actorOf(MovieActor.props(getTitle))

    // Both registrations are sent before either lookup has completed.
    actor ! RegisterMovie(1)
    actor ! RegisterMovie(2)

    fstMovie.success("The Godfather I")
    sndMovie.success("The Godfather II")

    actor ! GetMovies

    expectMsg(Movies(Map(1 -> "The Godfather I", 2 -> "The Godfather II")))
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment