Skip to content

Instantly share code, notes, and snippets.

@nicerobot
Last active December 12, 2015 09:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save nicerobot/4751388 to your computer and use it in GitHub Desktop.
Save nicerobot/4751388 to your computer and use it in GitHub Desktop.
An Akka 2.1 simplified from the PiCalculation example.
curl -ks https://gist.github.com/nicerobot/4751388/raw/run.sh | sh
name := "Work"
version := "0.1"
scalaVersion := "2.10.0"
resolvers ++= Seq(
"Typesafe Artifactory Repository" at "http://repo.typesafe.com/typesafe/releases"
)
libraryDependencies ++= Seq(
"com.typesafe.akka" % "akka-actor_2.10" % "2.1.0",
"com.typesafe.akka" % "akka-remote_2.10" % "2.1.0",
"org.scalatest" % "scalatest_2.10" % "1.9.1" % "test"
)
scalacOptions ++= Seq("-unchecked", "-deprecation","-feature")
#!/bin/bash
[ -f Work.scala -a -f build.sbt ] || {
[ -d 4751388 ] || git clone git clone https://gist.github.com/4751388.git
cd 4751388 || exit ${LINENO}
}
git pull
sbt run
package org.nicerobot
import akka.actor._
import akka.routing.RoundRobinRouter
import scala.concurrent.duration._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
object Work extends App {
val collection = 1 to 10000
val start = System.currentTimeMillis
val system = ActorSystem(f"Work${start}%08x${hashCode}%04x")
val master = system.actorOf(Props(new Master(collection.toStream)), name = "master")
implicit val timeout = Timeout(12.seconds)
val result = Await.result(master ? Run, timeout.duration)
println(s"\n${result}")
system.shutdown()
sealed trait WorkMessage
case object Run extends WorkMessage
case class Work(start: Int) extends WorkMessage
case class Result(value: Double) extends WorkMessage
sealed trait ControlMessage
case class Complete(elapsed:Long) extends ControlMessage
class Worker extends Actor {
println(s"Worker ${hashCode()}")
def doWork(element: Int): Double = {
print(s" ${element}")
return element.toDouble
}
def receive = {
case Work(element) ⇒
sender ! Result(doWork(element))
}
}
class Master(work: Seq[Int]) extends Actor {
val nrOfCPUs = Runtime.getRuntime().availableProcessors();
val nrOfWorkers = nrOfCPUs * 4
val workSize = work.size
var nrOfResults: Int = _
val workerRouter = context.actorOf(
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
var done: ActorRef = _
def receive = {
case Run ⇒
done = sender
work.foreach(element ⇒ workerRouter ! Work(element))
case Result(value) ⇒
nrOfResults += 1
if (nrOfResults >= workSize) {
done ! Complete(System.currentTimeMillis - start)
context.stop(self)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment