-
-
Save felixcheung/92ae74bc349ea83a9e29 to your computer and use it in GitHub Desktop.
/** | |
* compute percentile from an unsorted Spark RDD | |
* @param data: input data set of Long integers | |
* @param tile: percentile to compute (eg. 85 percentile) | |
* @return value of input data at the specified percentile | |
*/ | |
def computePercentile(data: RDD[Long], tile: Double): Double = { | |
// NIST method; data to be sorted in ascending order | |
val r = data.sortBy(x => x) | |
val c = r.count() | |
if (c == 1) r.first() | |
else { | |
val n = (tile / 100d) * (c + 1d) | |
val k = math.floor(n).toLong | |
val d = n - k | |
if (k <= 0) r.first() | |
else { | |
val index = r.zipWithIndex().map(_.swap) | |
val last = c | |
if (k >= c) { | |
index.lookup(last - 1).head | |
} else { | |
index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head) | |
} | |
} | |
} | |
} | |
@turian hi, I believe there is still a error in the above.
Here is my updated code :
def computePercentile(vals: Seq[Double], tile: Double, unsorted: Boolean = true): Double = {
assert(tile >= 0 && tile <= 100)
if (vals.isEmpty) Double.NaN
else {
assert(vals.nonEmpty)
// Make sure the list is sorted, if that's what we've been told
if (!unsorted && vals.length >= 2) vals.sliding(2).foreach(l => assert(l(0) <= l(1))) else {}
// NIST method; data to be sorted in ascending order
val r =
if (unsorted) vals.sorted
else vals
val length = r.length
if (length == 1) r.head
else {
val n = (tile / 100d) * (length - 1)
val k = math.floor(n).toInt
val d = n - k
if (k <= 0 && d ==0 ) r.head // different here...
else {
val last = length
if (k + 1 >= length) {
r.last
} else {
r(k) + d * (r(k + 1) - r(k))
}
}
}
}
}
I believe there are some off by one errors in the above.
I found them using this style of test:
Here is my updated code (but for Scala Seqs, not RDDs) that works: