Skip to content

Instantly share code, notes, and snippets.

@sortega
Created August 14, 2015 16:02
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sortega/c6b561908f7540815e68 to your computer and use it in GitHub Desktop.
Save sortega/c6b561908f7540815e68 to your computer and use it in GitHub Desktop.
Parallel Eratosthenes sieve with back pressure
import akka.actor._
/** Sieve of Eratosthenes built as a chain of actors with pull-based back pressure.
  *
  * A [[BackPressureSieve.Generator]] produces the numbers `2..MaxNumber` in batches. Below it
  * hangs a chain of [[BackPressureSieve.Filter]] actors, each a child of the previous stage.
  * Every stage pulls work from its parent with `GetCandidates` and is answered with either
  * `Candidates` or `NoMoreCandidates`, so no stage receives data it did not ask for.
  */
object BackPressureSieve extends App {

  /** Largest number of candidates the generator puts into a single batch. */
  val DefaultChunkSize: Int = 1024

  /** A filter keeps polling its parent until its buffer holds at least this many
    * survivors (or the parent runs out of candidates).
    */
  val MinChunkSize: Int = DefaultChunkSize * 3 / 4

  /** Inclusive upper bound of the numbers fed into the sieve. */
  val MaxNumber: Int = 500000

  /** Request sent by a stage to its parent, asking for the next batch of candidates. */
  case object GetCandidates

  /** A batch of numbers that survived every upstream stage. */
  final case class Candidates(values: Vector[Int])

  /** Reply telling the requester that its parent will never deliver candidates again. */
  case object NoMoreCandidates

  /** Head of the chain: hands out the numbers `2..MaxNumber` on demand. */
  class Generator extends Actor {

    // First number that has not been handed out yet.
    private var nextCandidate = 2

    override def preStart(): Unit = {
      // The first filter is a child of the generator and will poll it from its own preStart.
      context.actorOf(Props[Filter])
    }

    override def receive: Receive = {
      case GetCandidates =>
        if (nextCandidate > MaxNumber) {
          // Empty range (MaxNumber < 2): never emit an empty batch, downstream
          // stages expect the first element of a batch to be a prime.
          sender() ! NoMoreCandidates
          context.become(exhausted)
        } else {
          val remaining = MaxNumber - nextCandidate + 1
          val size = remaining min DefaultChunkSize
          sender() ! Candidates(Vector.iterate(nextCandidate, size)(_ + 1))
          // Switch state on the last chunk instead of advancing past MaxNumber,
          // which would overflow when MaxNumber is close to Int.MaxValue.
          if (size == remaining) context.become(exhausted)
          else nextCandidate += size
        }
    }

    /** Behaviour once every number up to `MaxNumber` has been handed out. */
    private def exhausted: Receive = {
      case GetCandidates => sender() ! NoMoreCandidates
    }
  }

  /** One stage of the sieve.
    *
    * The first number it ever receives is its prime; it then drops every multiple of that
    * prime from the batches pulled from its parent and serves the survivors to its single
    * child, which it creates as soon as its prime is known.
    */
  class Filter extends Actor {

    // Assigned when the first non-empty batch arrives; unused before that.
    private var prime: Int = _

    // Survivors waiting to be handed to the child.
    private var bufferedCandidates = Vector.empty[Int]

    override def preStart(): Unit = {
      pollForCandidates()
    }

    /** Initial behaviour: waiting for the first batch, whose head becomes this stage's prime. */
    override def receive: Receive = {
      // No candidate ever reached this stage, so it is the end of the chain.
      case NoMoreCandidates => stop()
      case Candidates(first +: rest) =>
        prime = first
        println(s"$prime found")
        appendCandidates(rest)
        extendChainOfFilters()
        ensureEnoughCandidates()
      // An empty batch carries no prime; ask again instead of failing on `head`.
      case Candidates(_) => pollForCandidates()
    }

    /** Reports the elapsed time since `start` and terminates the JVM. */
    private def stop(): Unit = {
      println("No more candidates")
      println(s"Spent just ${System.currentTimeMillis() - start} millis")
      System.exit(0)
    }

    /** Creates the next stage as a child of this one. */
    private def extendChainOfFilters(): Unit = {
      context.actorOf(Props[Filter])
    }

    /** Buffer is full enough and nothing has been requested from the parent:
      * wait for the child to ask, then hand over the buffer and start refilling.
      */
    private def idle: Receive = {
      case GetCandidates =>
        sendCandidates(sender())
        pollForCandidates()
        context.become(filtering)
    }

    /** A request to the parent is outstanding and the child is not waiting. */
    private def filtering: Receive = {
      case GetCandidates => context.become(filteringFor(sender()))
      case Candidates(candidates) =>
        appendCandidates(candidates)
        ensureEnoughCandidates()
      case NoMoreCandidates => context.become(exhausted)
    }

    /** A request to the parent is outstanding and `requester` is waiting for a batch. */
    private def filteringFor(requester: ActorRef): Receive = {
      case Candidates(candidates) =>
        appendCandidates(candidates)
        if (hasEnoughCandidates) {
          sendCandidates(requester)
          context.become(filtering)
        }
        // Either the buffer was just emptied or it is still too small: pull more.
        pollForCandidates()
      case NoMoreCandidates =>
        // Flush whatever is left, or pass the end-of-stream signal on directly.
        if (bufferedCandidates.isEmpty) requester ! NoMoreCandidates
        else sendCandidates(requester)
        context.become(exhausted)
    }

    /** Goes idle when the buffer is large enough, otherwise requests more from the parent. */
    private def ensureEnoughCandidates(): Unit = {
      if (hasEnoughCandidates) {
        context.become(idle)
      } else {
        pollForCandidates()
        context.become(filtering)
      }
    }

    /** The parent is done: serve the remaining buffer once, then answer `NoMoreCandidates`. */
    private def exhausted: Receive = {
      case GetCandidates if bufferedCandidates.nonEmpty => sendCandidates(sender())
      case GetCandidates => sender() ! NoMoreCandidates
    }

    /** Buffers the given numbers, dropping the multiples of this stage's prime. */
    private def appendCandidates(candidates: Vector[Int]): Unit = {
      bufferedCandidates ++= candidates.filterNot(_ % prime == 0)
    }

    /** Sends the whole buffer to `to` and clears it. */
    private def sendCandidates(to: ActorRef): Unit = {
      to ! Candidates(bufferedCandidates)
      bufferedCandidates = Vector.empty
    }

    private def hasEnoughCandidates: Boolean = {
      bufferedCandidates.length >= MinChunkSize
    }

    /** Asks the parent stage for another batch. */
    private def pollForCandidates(): Unit = {
      context.parent ! GetCandidates
    }
  }

  // `start` is assigned before the actor system exists, so it is set by the time
  // any Filter reads it in `stop()`.
  val start: Long = System.currentTimeMillis()
  val system: ActorSystem = ActorSystem("sieve")
  val sieve: ActorRef = system.actorOf(Props[Generator])
  system.awaitTermination()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment