// Source: GitHub Gist geggo98/8913772 by @geggo98 (last active August 29, 2015).
import akka.actor._
import akka.event.Logging
import akka.util.Timeout
import scala.concurrent.duration._
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.ExecutionContext.Implicits.global
import scala.reflect.ClassTag
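/**
 * Scratch program that compares reducing a sequence of BigInt values with
 * Scala parallel collections against reducing it with a tree of Akka actors.
 *
 * Created by sschwets on 07.01.14.
 */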
object Scribble extends App {
/**
 * Request to combine `values` into a single value with `fn`.
 *
 * @param values        the elements to reduce
 * @param index         segment number of this request; it is echoed back in the
 *                      [[CalculationResult]] so a parent can order partial results
 * @param parallelLimit inputs shorter than this are reduced directly by the receiving
 *                      actor; longer inputs are split into at most this many segments
 * @param fn            binary function used to combine two values
 */
final case class Calculate[T](values : Seq[T], index : Int, parallelLimit : Int, fn : (T,T) => T)

/**
 * Replies an actor can send for a [[Calculate]] request.
 *
 * Sealed so that matches over replies are checked for exhaustiveness, and
 * covariant so that `Busy` (a `CalculateResponse[Nothing]`) is a valid
 * `CalculateResponse[T]` for every `T`.
 */
sealed trait CalculateResponse[+T]

/** The reduced value for the request segment numbered `index`. */
final case class CalculationResult[+T](result : T, index : Int) extends CalculateResponse[T]

/** Sent when a [[Calculate]] arrives while the actor is still waiting for partial results. */
case object Busy extends CalculateResponse[Nothing]
/**
 * Reduces the values of a [[Calculate]] request to a single value.
 *
 * Small inputs are reduced directly with `reduceLeft`. Larger inputs are cut into
 * consecutive segments, each segment is handed to a freshly created child actor of
 * the same class, and the partial results are reduced again in segment order once
 * all children have answered. Despite the class name this is a plain reduction,
 * not a prefix scan. The split result equals the sequential `reduceLeft` only when
 * the request's `fn` is associative.
 *
 * While a split calculation is running, further [[Calculate]] requests are answered
 * with [[Busy]]. A request with no values is answered with `Status.Failure`.
 *
 * @param ev class tag of `T`, needed to allocate the array of partial results
 */
class ParallelPrefixActor[T](implicit ev : ClassTag[T]) extends Actor {
  val log = Logging(context.system, this)
  // Props for child workers; the class tag is passed on as an explicit constructor argument.
  val subCalculation = Props(classOf[ParallelPrefixActor[T]], ev)

  def receive = waitForCalculation

  /** Idle behavior: accepts one [[Calculate]] request and either answers it or fans it out. */
  def waitForCalculation : Actor.Receive = {
    // The type argument is erased at runtime, hence @unchecked.
    case c : Calculate[T @unchecked] if c.values.isEmpty =>
      // There is no neutral element to return, so tell the requester explicitly
      // instead of throwing on head/reduceLeft and leaving it to time out.
      log.warning("Rejecting calculation for segment nr. {}: no values", c.index)
      sender ! Status.Failure(new IllegalArgumentException("Cannot reduce an empty sequence of values"))

    case c : Calculate[T @unchecked] =>
      val valueCount = c.values.length
      // Template logging: arguments are only rendered when debug logging is enabled.
      log.debug("Start calculation for {} values, segment nr. {}, from {} to {}", valueCount, c.index, c.values.head, c.values.last)
      // A limit below 2 cannot make the input smaller by splitting (1 would hand the
      // whole input to a single child forever, 0 would divide by zero), so reduce directly.
      if (valueCount < c.parallelLimit || c.parallelLimit < 2) {
        log.debug("Calculating result direct")
        val result = c.values.reduceLeft(c.fn)
        sender ! CalculationResult(result, c.index)
      } else {
        // Ceiling of valueCount / parallelLimit, so at most parallelLimit segments are created.
        val groupSize: Int = Math.max(1, (valueCount / c.parallelLimit) + Math.min(valueCount % c.parallelLimit, 1))
        log.debug("Splitting calculation for {} values up to {} children, {} elements each, limit {}", valueCount, c.parallelLimit, groupSize, c.parallelLimit)
        // Materialized once; grouped returns an iterator that can only be traversed a single time.
        val segments = c.values.grouped(groupSize).toVector
        log.debug("Starting children")
        segments.zipWithIndex.foreach { case (values, index) =>
          context.actorOf(subCalculation) ! c.copy(values = values, index = index)
        }
        val partialResults: Array[T] = Array.ofDim[T](segments.length)
        log.debug("Waiting for {} results ({})", partialResults.length, partialResults.indices)
        context.become(waitForResults(segments.length, partialResults, c, sender), discardOld = true)
      }
  }

  /**
   * Collecting behavior: stores partial results until none are outstanding, then
   * reduces them and answers the original requester.
   *
   * @param outstandingResults number of children that have not answered yet
   * @param partialResults     results received so far, indexed by segment number
   * @param originalRequest    the request that was split up
   * @param originalSender     the actor that sent `originalRequest`
   */
  def waitForResults(outstandingResults : Int, partialResults : Array[T], originalRequest : Calculate[T], originalSender : ActorRef) : Actor.Receive = {
    case _ : Calculate[_] => sender ! Busy
    case r : CalculationResult[T @unchecked] =>
      log.debug("Putting result {} on position {} in {}", r.result, r.index, partialResults.length)
      // updated returns a copy; the array captured by the current behavior is left untouched.
      val updatedResults = partialResults.updated(r.index, r.result)
      log.debug("Killing sub-worker")
      sender ! PoisonPill
      if (outstandingResults == 1) {
        log.debug("Calculating result from partial results")
        val result = updatedResults.reduceLeft(originalRequest.fn)
        originalSender ! CalculationResult(result, originalRequest.index)
        context.become(waitForCalculation, discardOld = true)
      } else {
        log.debug("Still waiting for {} results", outstandingResults - 1)
        // For fanOut > 2 one could here already combine consecutive partial results
        context.become(waitForResults(outstandingResults - 1, updatedResults, originalRequest, originalSender), discardOld = true)
      }
  }
}
/** Actor that logs every message it receives at info level. */
class LoggingActor extends Actor {
  val log = Logging(context.system, this)

  def receive : Actor.Receive = {
    case message =>
      val rendered = message.toString
      log.info(s"""Received "${rendered}".""")
  }
}
// Actor system that hosts every actor of this program
val system = ActorSystem("root")
// Props of the reduction actor, fixed to BigInt, with its class tag as constructor argument
val calculationProps = Props(classOf[ParallelPrefixActor[BigInt]], implicitly[ClassTag[BigInt]])
// Props of the actor that only logs what it receives
val loggerProps = Props(classOf[LoggingActor])
// The single top-level calculation actor
val calculateActor = system.actorOf(calculationProps, "Calcolon-BigInt")
// Inbox used to send requests and receive replies from outside an actor
val inbox = Inbox.create(system)
// Upper bound for waiting on a reply in the inbox
val timeOut = 10.seconds
/**
 * Evaluates `f` once, prints how long the evaluation took, and returns its result.
 *
 * The elapsed time is taken from `System.nanoTime` and printed in milliseconds.
 *
 * @param id label printed in front of the measured time
 * @param f  the computation to measure (by-name, so it runs inside the measurement)
 * @return the value produced by `f`
 */
def time[A] (id : String)(f: => A) = {
  val startedAt = System.nanoTime()
  val outcome = f
  val elapsedMs = (System.nanoTime() - startedAt) * 1e-6d
  println(s"""Time for "${id}": ${elapsedMs}ms""")
  outcome
}
// Benchmark: the same reductions with parallel collections and with the actor.
// The trailing timings are the figures recorded by the original author.
val parallelLimit = 500 // passed as parallelLimit of every Calculate request
val limit = 30000 // the benchmarks reduce the numbers 1 to limit
// Declared as def, so every use builds a fresh sequence of BigInt values
def testRange = for (n <- 1 to limit) yield BigInt(n)

time("par product") {
  testRange.par.product
} // 1446.251822ms
time("actor product") { // 1407.4532849999998ms
  val request = Calculate[BigInt](testRange, 0, parallelLimit, (a, b) => a * b)
  inbox.send(calculateActor, request)
  inbox.receive(timeOut)
}

time("par sum") {
  testRange.par.sum
} // 19.728831ms
time("actor sum") { // 142.973606ms
  val request = Calculate[BigInt](testRange, 0, parallelLimit, (a, b) => a + b)
  inbox.send(calculateActor, request)
  inbox.receive(timeOut)
}
// Alternative test: query the actor with the ask pattern and process the reply as a Future
import akka.pattern.{ ask, pipe }
// Timeout used by the ask below. The implicit carries an explicit type, and
// `30.seconds` avoids the postfix operator syntax of `30 seconds`.
implicit val timeout: Timeout = Timeout(30.seconds)
val futureResult=calculateActor ? Calculate[BigInt](testRange, 0, parallelLimit, _ * _)
// Extract the value of a CalculationResult and count its decimal digits.
// collect and filter fail the future with NoSuchElementException when the reply
// is not a CalculationResult or the number has 10 digits or fewer.
val futureNumberOfDigits=futureResult.mapTo[CalculateResponse[BigInt]].
  collect{case CalculationResult(value, _) => value}.map(_.toString.length).filter(_>10)
// Runs only when the future completes successfully.
futureNumberOfDigits.foreach { digits =>
  println(s"Anzahl der Stellen: $digits")
}
val loggerActor=system.actorOf(loggerProps)
// Forward the number of digits to the logging actor once it is available
val futurePipe=futureNumberOfDigits pipeTo loggerActor
// First blocking call
import scala.concurrent.Await
try {
  Await.result(futurePipe, atMost=timeout.duration)
  // NOTE(review): presumably gives the logger actor time to handle the piped message
  // before the system goes down - confirm whether a fixed sleep is really needed.
  Thread.sleep(5000)
} finally {
  // Shutdown, also when the await above failed or timed out; otherwise the
  // actor system's threads would keep the JVM alive after the exception.
  system.shutdown()
}
System.exit(0)
}
// Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment