Skip to content

Instantly share code, notes, and snippets.

@atla
Created February 17, 2010 17:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save atla/306844 to your computer and use it in GitHub Desktop.
Save atla/306844 to your computer and use it in GitHub Desktop.
package iwsexample
import scala.actors._
import scala.actors.Actor._
import scala.actors.Futures._
import scala.collection.mutable._
case class RegisterWorker (worker:Worker)
case class UnregisterWorker (worker:Worker)
case class SortList (data:List[Int])
case class SortListResponse (data:List[Int])
case class Stop
object MergeSortHelper {
def merge(aList:List[Int], bList:List[Int]) : List[Int]= bList match {
case Nil => aList
case _ =>
aList match {
case Nil => bList
case x::xs => if (x < bList.head) x::merge(xs, bList) else bList.head::merge(aList, bList.tail)
}
}
def mergeAll (lstA : List[Int], lstBs : List[List[Int]]) : List[Int] = lstBs match {
case hd :: tl => mergeAll (merge (lstA, hd), tl)
case Nil => lstA
}
}
object MasterHelper {
def distributeWork(workers:List[Worker], data:List[Int], partSlice:Int) : List[Future[Any]] = workers match {
case hd :: tl => {
val (piece, rest) = data.splitAt (partSlice)
val future = hd !! SortList (piece)
future :: distributeWork (tl, rest, partSlice)
}
case _ => Nil
}
}
class Worker extends Actor{
def act = loop {
react {
case SortList (data) => reply (SortListResponse (data.sort {(a,b) => a < b}))
case Stop => Master ! UnregisterWorker (this)
case _ => 0
}
}
override def start = {
Master ! RegisterWorker (this)
super.start ()
}
}
object Master extends Actor {
private var workers = List [Worker]()
def act = loop {
react {
case RegisterWorker (newWorker) => workers = newWorker :: workers
case UnregisterWorker (worker) => workers = workers - worker
case SortList (data:List[Int]) => {
val futures = MasterHelper.distributeWork (workers, data, data.size/workers.size)
while (futures.count(_.isSet == false) > 0){}
val responses = futures.map ( x => {
val res = x ()
(res.asInstanceOf[SortListResponse]).data
})
reply (SortListResponse (MergeSortHelper.mergeAll (responses.head, responses.tail)))
}
case Stop => System.exit (0)
}
}
}
object Main {
def main(args: Array[String]) :Unit = {
// create 10 workers
val workers = for (i <- 1 to 10) yield new Worker ()
workers.foreach { _.start }
val rnd = new Random
val lst = new ListBuffer[Int] ()
for (i <- 0 to 5000)
lst.append (rnd.nextInt(5000))
Master.start
var avg : Long = 0
for (i <- 1 to 10000){
var startTime = System.currentTimeMillis
Master !? SortList (lst.toList) match {
case SortListResponse(list) => {
avg = avg + (System.currentTimeMillis-startTime).toLong
}
}
}
println ("The average runTime with 10.000 iterations to sort 5000 Int numbers and " + workers.size + " worker(s) was: " + avg/10000.0f + "milliseconds")
workers.foreach { _ ! Stop }
Master ! Stop
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment