Created
August 14, 2015 16:02
-
-
Save sortega/c6b561908f7540815e68 to your computer and use it in GitHub Desktop.
Parallel Eratosthenes sieve with back pressure
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import akka.actor._ | |
object BackPressureSieve extends App { | |
val DefaultChunkSize = 1024 | |
val MinChunkSize = DefaultChunkSize * 3 / 4 | |
val MaxNumber = 500000 | |
case object GetCandidates | |
case class Candidates(values: Vector[Int]) | |
case object NoMoreCandidates | |
class Generator extends Actor { | |
private var nextCandidate = 2 | |
override def preStart(): Unit = { | |
context.actorOf(Props[Filter]) | |
} | |
override def receive: Receive = { | |
case GetCandidates => | |
val size = (MaxNumber - nextCandidate + 1) min DefaultChunkSize | |
sender() ! Candidates(Vector.iterate(nextCandidate, size)(_ + 1)) | |
nextCandidate += size | |
if (nextCandidate > MaxNumber) { | |
context.become(exhausted) | |
} | |
} | |
private def exhausted: Receive = { | |
case GetCandidates => sender() ! NoMoreCandidates | |
} | |
} | |
class Filter extends Actor { | |
private var prime: Int = _ | |
private var bufferedCandidates = Vector.empty[Int] | |
override def preStart(): Unit = { | |
pollForCandidates() | |
} | |
override def receive: Receive = { | |
case NoMoreCandidates => stop() | |
case Candidates(candidates) => | |
prime = candidates.head | |
println(s"$prime found") | |
appendCandidates(candidates.tail) | |
extendChainOfFilters() | |
ensureEnoughCandidates() | |
} | |
private def stop(): Unit = { | |
println("No more candidates") | |
println(s"Spent just ${System.currentTimeMillis() - start} millis") | |
System.exit(0) | |
} | |
private def extendChainOfFilters(): Unit = { | |
context.actorOf(Props[Filter]) | |
} | |
private def idle: Receive = { | |
case GetCandidates => | |
sendCandidates(sender()) | |
pollForCandidates() | |
context.become(filtering) | |
} | |
private def filtering: Receive = { | |
case GetCandidates => context.become(filteringFor(sender())) | |
case Candidates(candidates) => | |
appendCandidates(candidates) | |
ensureEnoughCandidates() | |
case NoMoreCandidates => context.become(exhausted) | |
} | |
private def filteringFor(requester: ActorRef): Receive = { | |
case Candidates(candidates) => | |
appendCandidates(candidates) | |
if (hasEnoughCandidates) { | |
sendCandidates(requester) | |
context.become(filtering) | |
} | |
pollForCandidates() | |
case NoMoreCandidates => | |
if (bufferedCandidates.isEmpty) requester ! NoMoreCandidates | |
else sendCandidates(requester) | |
context.become(exhausted) | |
} | |
private def ensureEnoughCandidates(): Unit = { | |
if (hasEnoughCandidates) { | |
context.become(idle) | |
} else { | |
pollForCandidates() | |
context.become(filtering) | |
} | |
} | |
private def exhausted: Receive = { | |
case GetCandidates if bufferedCandidates.nonEmpty => sendCandidates(sender()) | |
case GetCandidates => sender() ! NoMoreCandidates | |
} | |
private def appendCandidates(candidates: Vector[Int]): Unit = { | |
bufferedCandidates ++= candidates.filterNot(_ % prime == 0) | |
} | |
private def sendCandidates(to: ActorRef): Unit = { | |
to ! Candidates(bufferedCandidates) | |
bufferedCandidates = Vector.empty | |
} | |
private def hasEnoughCandidates: Boolean = { | |
bufferedCandidates.length >= MinChunkSize | |
} | |
private def pollForCandidates(): Unit = { | |
context.parent ! GetCandidates | |
} | |
} | |
val start = System.currentTimeMillis() | |
val system = ActorSystem("sieve") | |
val sieve = system.actorOf(Props[Generator]) | |
system.awaitTermination() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment