Skip to content

Instantly share code, notes, and snippets.

@Leammas
Created January 20, 2017 13:20
Show Gist options
  • Save Leammas/becbf240589565ff0ba038492fe7cbd1 to your computer and use it in GitHub Desktop.
Save Leammas/becbf240589565ff0ba038492fe7cbd1 to your computer and use it in GitHub Desktop.
import akka.util.Timeout
import scala.concurrent.duration._
import scala.concurrent.{Await, Future}
/**
 * Program entry point: runs the classic-actor demo followed by the typed-actor demo.
 *
 * Uses an explicit `main` method instead of the `App` trait so that the body is an
 * ordinary method call and does not depend on `DelayedInit` initialization semantics.
 */
object Foo {
  def main(args: Array[String]): Unit = {
    Classic.run()
    Typed.run()
  }
}
/**
 * Fan-out / fan-in summation of an `Int` array using the experimental `akka.typed` ScalaDSL.
 *
 * A manager spawns `numWorkers` worker children, sends each one a contiguous chunk of the
 * input array, adds up the partial sums it receives and replies with the total.
 */
object Typed {
  import Typed.Manager.{Done, Payload, Start}
  import Typed.Worker.Work
  import akka.typed.AskPattern._
  import akka.typed.ScalaDSL._
  import akka.typed._

  object Worker {
    /** Request to sum `array`; the partial result is sent to `replyTo` as a `Done`. */
    case class Work(array: Array[Int], replyTo: ActorRef[Done])
  }

  object Manager {
    /** Messages understood by the manager. */
    sealed trait Payload

    /** A (partial or final) sum. */
    case class Done(sum: Int) extends Payload

    /** Request to sum `array`; the grand total is sent to `replyTo`. */
    case class Start(array: Array[Int], replyTo: ActorRef[Done]) extends Payload
  }

  /**
   * Splits `array` into exactly `parts` contiguous chunks that together cover every element
   * once. Trailing chunks may be shorter (or empty) when the length is not a multiple of
   * `parts`; an empty array yields `parts` empty chunks.
   */
  private def split(array: Array[Int], parts: Int): IndexedSeq[Array[Int]] = {
    // Ceiling division, so that parts * chunkSize >= array.length.
    val chunkSize = (array.length + parts - 1) / parts
    // `slice` clamps out-of-range indices, so over-shooting the end is safe.
    (0 until parts).map(i => array.slice(i * chunkSize, (i + 1) * chunkSize))
  }

  /**
   * Worker behavior: sums the received chunk, replies, then stops.
   *
   * `Total` is used rather than `Static` because a `Static` behavior takes a `T => Unit`
   * function and therefore cannot act on a returned `Stopped`.
   */
  val worker = Total[Work] { msg =>
    msg.replyTo ! Done(msg.array.sum)
    Stopped
  }

  /**
   * Manager behavior: on `Start` distributes one chunk per worker; on each `Done`
   * accumulates the partial sum and, once every worker has answered, replies to the
   * original requester and stops.
   */
  val manager = ContextAware[Payload] { ctx =>
    val numWorkers = 4
    val workers = (1 to numWorkers).map(x => ctx.spawn(worker, s"worker$x"))
    // Mutable state is confined to this actor's closure.
    var sum = 0
    var awaiting = numWorkers
    var replyTo: Option[ActorRef[Done]] = None

    Total[Payload] {
      case Start(array, rTo) =>
        replyTo = Some(rTo)
        workers.zip(split(array, numWorkers)).foreach {
          case (w, chunk) => w ! Work(chunk, ctx.self)
        }
        Same
      case Done(s) =>
        sum += s
        awaiting -= 1
        if (awaiting > 0) Same
        else {
          replyTo.foreach(_ ! Done(sum))
          Stopped
        }
    }
  }

  /** Sums the numbers 1 to 100 through the typed actor system, prints the result and shuts down. */
  def run() = {
    implicit val to = Timeout(10.seconds)
    val nums = (1 to 100).toArray
    val system = ActorSystem("Manager", manager)
    implicit val sch = system.scheduler
    val reply: Future[Done] = system ? (Start(nums, _)) // type must be explicit
    val res = Await.result(reply, 10.seconds)
    println(s"Typed done ${res.sum}")
    Await.result(system.terminate(), 1.second)
  }
}
/**
 * Fan-out / fan-in summation of an `Int` array using classic (untyped) Akka actors.
 *
 * A manager creates `numWorkers` worker children, sends each one a contiguous chunk of the
 * input array, adds up the partial sums it receives and replies to the original sender.
 */
object Classic {
  import Classic.Manager.Start
  import Classic.Worker.{Done, Work}
  import akka.actor.{Actor, ActorRef, ActorSystem, PoisonPill, Props}
  import akka.pattern._

  object Worker {
    /** Request to sum `array`. */
    case class Work(array: Array[Int])

    /** A (partial or final) sum. */
    case class Done(sum: Int)
  }

  /** Sums one chunk, replies to the sender with `Done`, then sends itself a `PoisonPill`. */
  class Worker extends Actor {
    def receive: Receive = {
      case Work(array) =>
        sender() ! Done(array.sum)
        self ! PoisonPill
    }
  }

  object Manager {
    /** Request to sum `array`; the total is sent back to the sender as a `Done`. */
    case class Start(array: Array[Int])
  }

  /**
   * Distributes the work on `Start`, then switches to `working` to collect the partial
   * sums. The running total and outstanding-reply count are carried as arguments of
   * `working`, so the actor holds no mutable fields.
   */
  class Manager extends Actor {
    val numWorkers = 4
    val workRange = 1 to numWorkers
    val workers = workRange.map(x => context.actorOf(Props[Worker], s"worker$x"))

    /**
     * Splits `array` into exactly `parts` contiguous chunks that together cover every
     * element once. Trailing chunks may be shorter (or empty) when the length is not a
     * multiple of `parts`; an empty array yields `parts` empty chunks.
     */
    private def split(array: Array[Int], parts: Int): IndexedSeq[Array[Int]] = {
      // Ceiling division, so that parts * chunkSize >= array.length.
      val chunkSize = (array.length + parts - 1) / parts
      // `slice` clamps out-of-range indices, so over-shooting the end is safe.
      (0 until parts).map(i => array.slice(i * chunkSize, (i + 1) * chunkSize))
    }

    def receive: Receive = {
      case Start(array) =>
        workers.zip(split(array, numWorkers)).foreach {
          case (w, chunk) => w ! Work(chunk)
        }
        // Capture the requester now: sender() differs once worker replies arrive.
        context.become(working(0, numWorkers, sender()))
    }

    /**
     * Collecting state.
     *
     * @param sum      total of the partial sums received so far
     * @param awaiting number of worker replies still outstanding
     * @param replyTo  actor that sent the original `Start`
     */
    def working(sum: Int, awaiting: Int, replyTo: ActorRef): Receive = {
      case Done(s) =>
        val ssum = s + sum
        if (awaiting - 1 > 0) {
          context.become(working(ssum, awaiting - 1, replyTo))
        } else {
          replyTo ! Done(ssum)
          self ! PoisonPill
        }
    }
  }

  /** Sums the numbers 1 to 100 through the classic actor system, prints the result and shuts down. */
  def run() = {
    implicit val to = Timeout(10.seconds)
    val nums = (1 to 100).toArray
    val system = ActorSystem("foo")
    val mgr = system.actorOf(Props[Manager])
    val res = Await.result((mgr ? Start(nums)).mapTo[Done], 10.seconds)
    println(s"Classic done ${res.sum}")
    Await.ready(system.terminate(), 10.seconds)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment