Skip to content

Instantly share code, notes, and snippets.

@clayrat
Created February 26, 2016 17:18
Show Gist options
  • Save clayrat/ffa33efd483ceef12e13 to your computer and use it in GitHub Desktop.
Save clayrat/ffa33efd483ceef12e13 to your computer and use it in GitHub Desktop.
import scala.concurrent.Await
import scala.concurrent.duration._
import monifu.reactive.Observable
import monifu.concurrent.Scheduler
class Monix {
val s1 = Scheduler.singleThread("test")
val s2 = Scheduler.computation(Runtime.getRuntime.availableProcessors())
val s3 = Scheduler.io()
def Rx() {
// Warm up to get more consistent results when timing.
(0 to 5).foreach(_ =>
Seq(s1, s2, s3).foreach(s =>
block(skynet(), s)
)
)
}
def block(o: Observable[Long], scheduler: Scheduler): Long =
Await.result(o.asFuture(scheduler), 1.minutes).getOrElse(-1L)
def run() {
run1("singleThread", s1)
run1("computation", s2)
run1("io", s3)
}
def run1(name: String, scheduler: Scheduler) =
Skynet.time(name,
block(skynet().head, scheduler)
)
def skynet(): Observable[Long] = {
def go(num: Long, size: Int, div: Int): Observable[Long] =
if (size == 1)
Observable(num)
else
Observable
.range(0, div)
.flatMap { i =>
val subNum = num + i * (size / div)
go(subNum, size / div, div)
}.reduce(_ + _)
go(0, 1000000, 10)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment