Skip to content

Instantly share code, notes, and snippets.

@jkpl
Created August 6, 2015 08:22
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jkpl/b3c76439ce822cb9757c to your computer and use it in GitHub Desktop.
Save jkpl/b3c76439ce822cb9757c to your computer and use it in GitHub Desktop.
Sequence of Akka actors
package shardaus
import akka.actor._
import scalaz._
import Scalaz._
/** Companion of the `ActorSequence` class: declares the stage-factory type and a factory method. */
object ActorSequence {

  /**
    * Function from an `ActorRef` to the `Props` of one stage.
    * `ActorSequence` invokes it with the actor that comes next in the chain.
    */
  type SequenceProp = ActorRef => Props

  /**
    * Builds an `ActorSequence`; this simply delegates to the class constructor.
    *
    * @param actorSystem   system used to spawn the stage actors
    * @param finalActor    actor handed to the last stage factory
    * @param sequenceProps stage factories; the first element becomes the entry point
    * @return the newly constructed sequence
    */
  def apply(actorSystem: ActorSystem,
            finalActor: ActorRef,
            sequenceProps: Seq[SequenceProp]): ActorSequence =
    new ActorSequence(actorSystem, finalActor, sequenceProps)
}
/**
  * Spawns one actor per element of `sequenceProps` and wires them into a chain.
  *
  * Actors are created starting from the LAST factory, which receives `finalActor`;
  * every earlier factory receives the actor created just before it (i.e. its successor
  * in the chain).
  *
  * Exposed members:
  *  - `entryPoint`: the actor built from the first factory, or `finalActor` itself when
  *    `sequenceProps` is empty.
  *  - `actors`: all spawned actors, in the same order as `sequenceProps`.
  *
  * @param actorSystem   system whose `actorOf` is used to spawn every stage
  * @param finalActor    reference passed to the last stage factory
  * @param sequenceProps stage factories, first element first
  */
class ActorSequence(actorSystem: ActorSystem,
                    finalActor: ActorRef,
                    sequenceProps: Seq[ActorSequence.SequenceProp]) {

  val (entryPoint, actors) = {
    // Start with "nothing spawned yet, successor is the final actor".
    val seed = (finalActor, Seq.empty[ActorRef])

    // Walk the factories back-to-front so each stage already knows its successor.
    sequenceProps.reverse.foldLeft(seed) { (acc, makeProps) =>
      val (successor, spawned) = acc
      val stage = actorSystem.actorOf(makeProps(successor))
      (stage, stage +: spawned)
    }
  }
}
/** Type alias and `Props` factories for the `SimpleActor` class. */
object SimpleActor {
  import shardaus.ActorSequence.SequenceProp

  /**
    * Handler applied by `SimpleActor` to every incoming message.
    * It yields a scalaz disjunction: `SimpleActor` replies to the sender with a left
    * value and forwards a right value to the next actor.
    */
  type PF = PartialFunction[Any, Any \/ Any]

  /**
    * `Props` for a `SimpleActor` bound to the given successor and handler.
    *
    * @param nextActor actor that right-hand results are forwarded to
    * @param process   handler run against each message
    */
  def props(nextActor: ActorRef)(process: PF): Props =
    Props(new SimpleActor(nextActor, process))

  /**
    * Wraps `process` as a `SequenceProp`, i.e. a function that still needs the
    * successor actor before it can produce `Props`.
    */
  def sequenceProp(process: PF): SequenceProp =
    successor => props(successor)(process)
}
/**
  * Actor that runs `process` on each message and routes the outcome.
  *
  *  - Left result: sent back to `sender()`.
  *  - Right result: forwarded to `nextActor` (the original sender is preserved by `forward`).
  *  - Message not covered by `process`: treated as a left result carrying `'unknownMessage`.
  *
  * @param nextActor destination for right-hand results
  * @param process   partial handler producing a scalaz disjunction
  */
class SimpleActor(nextActor: ActorRef, process: SimpleActor.PF) extends Actor with ActorLogging {

  override def receive: Receive = {
    case m =>
      log.info("Got message in {}", self.path.name)
      // Use the handler when it is defined for `m`, otherwise the catch-all rejection.
      process.applyOrElse(m, rejectUnknown) match {
        case -\/(rejection) => sender() ! rejection
        case \/-(result)    => nextActor.forward(result)
      }
  }

  /** Catch-all used when `process` does not handle a message. */
  private def rejectUnknown: SimpleActor.PF = {
    case _ => \/ left 'unknownMessage
  }
}
/**
  * Demo actor that pushes every incoming message through a four-stage pipeline
  * (stringify -> quote -> format -> wrap) and logs the final result.
  *
  * The pipeline's final actor is `self`, so the wrapped result `('done, text)` comes
  * back to this actor and is logged. Messages are submitted with `!` from this actor and
  * stages use `forward`, so any left-hand reply from a stage is also delivered here.
  */
class ExampleActor extends Actor with ActorLogging {

  /** Stage 1: accepts any message and turns it into its `toString` form. */
  val convert2string = SimpleActor.sequenceProp {
    case m => m.toString.right
  }

  /** Stage 2: surrounds a string with double quotes. */
  val quote = SimpleActor.sequenceProp {
    case m: String => s""""$m"""".right
  }

  /** Stage 3: prefixes a string with "Got message: ". */
  val format = SimpleActor.sequenceProp {
    case m: String => s"Got message: $m".right
  }

  /** Stage 4: tags the string with `'done` so `receive` can recognise a finished result. */
  val wrap = SimpleActor.sequenceProp {
    case m: String => ('done, m).right
  }

  /** The spawned pipeline; its stages are created via `context.system`, not as children. */
  val actorSequence = ActorSequence(context.system, self, Seq(convert2string, quote, format, wrap))

  /**
    * Stops the pipeline stages together with this actor.
    *
    * The stages are spawned through `context.system`, so they are NOT children of this
    * actor and would otherwise keep running after it stops. Because the default
    * `preRestart` calls `postStop`, this also prevents each restart from leaving behind
    * an orphaned pipeline when a fresh `ActorSequence` is built for the new instance.
    */
  override def postStop(): Unit = {
    actorSequence.actors.foreach(stage => context.stop(stage))
    super.postStop()
  }

  override def receive: Receive = {
    // Result that has travelled through the whole pipeline.
    case ('done, m: String) => log.info(m)
    // Anything else is fed into the first stage.
    case m => actorSequence.entryPoint ! m
  }
}
/** Entry point of the demo: starts an actor system and feeds sample messages to an `ExampleActor`. */
object ExampleApp {

  /** Sample inputs; deliberately a mix of strings and integers, hence `List[Any]`. */
  val messages: List[Any] = List("foo", "bar", 1, 4, "ohai")

  /**
    * Creates the "Example" actor system, spawns one `ExampleActor` and sends it every
    * element of `messages` in order. The actor system is left running afterwards.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("Example")
    val exampleActor = system.actorOf(Props[ExampleActor])
    for (message <- messages) {
      exampleActor ! message
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment