public
Created

Some informal scalaz-stream performance testing code.

  • Download Gist
profile.scala
Scala
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102
package scalaz.stream
 
import scalaz.concurrent.Task
import scalaz.syntax.traverse._
import scalaz.std.list._
import Process.Process1
 
import collection.immutable.{IndexedSeq, Vector}
 
/** Informal micro-benchmarks comparing several ways of summing the same `N`
  * doubles: a raw array loop, boxed collections, pre-chunked slices, and
  * scalaz-stream pipelines.
  *
  * Running the object prints the element count followed by one line per
  * scenario of the form `label: seconds-per-evaluation`.
  */
object profile extends App {

  /** Measures the average wall-clock time, in seconds, of one evaluation of `block`.
    *
    * `block` is evaluated in batches. The batch size starts at 3 and doubles
    * until a single batch takes at least 5 seconds; only that final batch
    * contributes to the reported figure.
    *
    * @param block the expression to benchmark; re-evaluated on every iteration
    * @return elapsed seconds of the final batch divided by its iteration count
    */
  def time(block: => Any): Double = {
    // for (i <- 1 to 1600) block // trigger JIT on block
    var goodSample = false
    var start = 0L
    var stop = 0L
    // Named `iterations` rather than `N` so it does not shadow the data size below.
    var iterations = 3
    while (!goodSample) {
      start = System.currentTimeMillis
      for (_ <- 0 until iterations) block
      stop = System.currentTimeMillis
      // require at least 5 seconds for a decent sample
      if (stop - start < 5000) iterations = iterations * 2
      else goodSample = true
    }
    val millisPerOp = (stop.toDouble - start.toDouble) / iterations
    millisPerOp / 1000
  }

  /** Benchmarks `block` with [[time]] and prints the result as `label: seconds`. */
  def printTime(label: String)(block: => Any): Unit =
    println(label + ": " + time(block))

  /** Number of elements summed by every scenario. */
  val N = 10000

  /** The values 0.0, 1.0, ..., N - 1 as a primitive array. */
  val nums: Array[Double] =
    Array.tabulate(N)(_.toDouble)

  /** The same values as `nums`, copied into a `Vector`. */
  val numsVec: Vector[Double] =
    Vector(nums: _*)

  /** Consecutive `(start, stop)` index pairs covering `0 to N` in steps of 2000. */
  val ranges: Vector[(Int,Int)] =
    Vector() ++ (0 to N by 2000).sliding(2).map { case Seq(a,b) => (a,b) }

  /** Each range paired with the backing array, as plain tuples. */
  val chunkedNumsVec: Vector[(Array[Double], Int, Int)] =
    ranges.map { case (start, stop) => (nums, start, stop) }

  /** Each range exposed as a [[Slice]] view over `nums`. */
  val slicedNumsVec: Vector[IndexedSeq[Double]] =
    ranges.map { case (start, stop) => Slice(nums, start, stop) }

  /** An `IndexedSeq` view of `overall` restricted to the indices `[start, stop)`.
    *
    * @param overall the backing array; it is referenced, not copied
    * @param start   index in `overall` of the first element of the view
    * @param stop    index in `overall` one past the last element of the view
    */
  case class Slice[@specialized A](
    overall: Array[A],
    start: Int,
    stop: Int) extends IndexedSeq[A] {
    /** Element `i` of the view, i.e. `overall(start + i)`. */
    def apply(i: Int): A = overall(start + i)
    /** Number of elements in the view. */
    def length: Int = stop - start
  }

  /** Sums a chunk, using the primitive array loop when the chunk is a [[Slice]]
    * and the generic traversal otherwise.
    */
  def sumChunk(vs: IndexedSeq[Double]): Double = vs match {
    case Slice(arr: Array[Double], i, j) => sum(arr, i, j)
    case _ => sumOf(vs)
  }

  /** Reduces every chunk with `f`, then reduces the per-chunk results with `f` again. */
  def chunkedReduce[A](v: Vector[IndexedSeq[A]])(f: IndexedSeq[A] => A): A =
    f(v map f)

  /** Sums `ds(start)`, `ds(start + 1)`, ..., `ds(stop - 1)`.
    *
    * @param ds    the array to read from
    * @param start index of the first element to include
    * @param stop  index one past the last element to include
    * @return the total, or 0.0 when `start >= stop`
    */
  def sum(ds: Array[Double], start: Int, stop: Int): Double = {
    var total = 0.0
    // Begin at `start`, not 0, so a slice only sums its own range.
    var i: Int = start
    while (i < stop) { total += ds(i); i += 1 }
    total
  }

  /** Sums all elements of `ds` via `foreach`. */
  def sumOf(ds: TraversableOnce[Double]): Double = {
    var total = 0.0
    ds.foreach { d => total += d }
    total
  }

  println("N: " + nums.length)

  // Baseline: while-loop over the primitive array.
  printTime("unboxed-in-memory") {
    sum(nums, 0, nums.length)
  }
  // Standard-library sum over the Vector.
  printTime("boxed-in-memory") {
    numsVec.sum
  }
  // foreach-based sum over the Vector.
  printTime("chunked-boxed-in-memory") {
    sumOf(numsVec)
  }
  // Per-slice array sums, then a sum over the per-slice totals.
  printTime("deep-chunked-boxed-in-memory") {
    chunkedReduce(slicedNumsVec)(sumChunk)
  }
  // Runs a stream of N constant elements and discards them.
  printTime("naive-streaming-overhead") {
    Process.fill(N)(1).run.run
  }

  // NOTE(review): Process.ranges presumably emits (start, stop) index pairs;
  // with a step of 20000 > N this is likely a single chunk. Confirm against
  // the scalaz-stream API.
  printTime("chunked-streaming") {
    Process.ranges(0, N, 20000).
      map { case (i,j) => Slice(nums, i, j) }.
      map(sumChunk).
      pipe(process1.sum[Double]).
      run.run
  }
}

Please sign in to comment on this gist.

Something went wrong with that request. Please try again.