Skip to content

Instantly share code, notes, and snippets.

@dacr
Created January 21, 2015 23:03
Show Gist options
  • Save dacr/3af132b5ed23b47c00a2 to your computer and use it in GitHub Desktop.
Save dacr/3af132b5ed23b47c00a2 to your computer and use it in GitHub Desktop.
Akka-stream actors primes computation
class StreamBasedPrimesGenerator[NUM](
handler: CheckedValue[NUM] => Unit = StreamBasedPrimesGenerator.defaultHandlerFactory[NUM](),
name: String = "DefaultStreamBasedPrimesGeneratorSystem",
startFrom: NUM = 2,
primeNth: NUM = 1,
notPrimeNth: NUM = 1)(implicit numops: Integral[NUM]) extends PrimesDefinitions[NUM] with Shutdownable {
import numops._
implicit val system = ActorSystem(name)
import system.dispatcher
implicit val materializer = FlowMaterializer()
case class TestedValue(value:NUM) {
val state = isPrime(value)
val digitCount = value.toString.size
}
val materialized = FlowGraph { implicit builder =>
import FlowGraphImplicits._
val valueIterator = new NumericIterator(startFrom)
val values: Source[NUM] = Source(() => valueIterator)
val isPrimeNthIterator = new NumericIterator(primeNth)
val isPrimeNth = Source(() => isPrimeNthIterator)
val isNotPrimeNthIterator = new NumericIterator(notPrimeNth)
val isNotPrimeNth = Source(() => isNotPrimeNthIterator)
val testedValues = Flow[NUM].mapAsync(x=> Future{TestedValue(x) })
val checkedValues = Flow[(NUM, TestedValue)].map {
case (nth, tv) => CheckedValue[NUM](tv.value, tv.state, tv.digitCount, nth)
}
val onlyPrimes = Flow[TestedValue].filter(_.state)
val onlyNotPrimes = Flow[TestedValue].filter(! _.state)
val out = ForeachSink[CheckedValue[NUM]](handler)
val cast = Broadcast[TestedValue]
val merge = Merge[CheckedValue[NUM]]
val zipPrimeNth = Zip[NUM, TestedValue]
val zipNotPrimeNth= Zip[NUM, TestedValue]
isPrimeNth ~> zipPrimeNth.left
isNotPrimeNth ~> zipNotPrimeNth.left
values ~> testedValues ~> cast ~> onlyPrimes ~> zipPrimeNth.right
cast ~> onlyNotPrimes ~> zipNotPrimeNth.right
zipPrimeNth.out ~> checkedValues ~> merge
zipNotPrimeNth.out ~> checkedValues ~> merge
merge ~> out
}.run()
override def shutdown() { system.shutdown()}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment