@felixcheung
Last active March 28, 2018 08:26
Spark compute percentile with RDD in Scala
import org.apache.spark.rdd.RDD

/**
 * Compute a percentile from an unsorted Spark RDD.
 * @param data input data set of Long integers (must be non-empty)
 * @param tile percentile to compute (e.g. 85 for the 85th percentile)
 * @return value of the input data at the specified percentile
 */
def computePercentile(data: RDD[Long], tile: Double): Double = {
  // NIST method; data must be sorted in ascending order
  val r = data.sortBy(x => x)
  val c = r.count()
  if (c == 1) r.first()
  else {
    // 1-based fractional rank: integer part k, fractional part d
    val n = (tile / 100d) * (c + 1d)
    val k = math.floor(n).toLong
    val d = n - k
    if (k <= 0) r.first()
    else {
      // key each value by its 0-based position in the sorted order
      val index = r.zipWithIndex().map(_.swap)
      if (k >= c) {
        index.lookup(c - 1).head
      } else {
        // interpolate between the k-th and (k+1)-th smallest values
        val lower = index.lookup(k - 1).head
        val upper = index.lookup(k).head
        lower + d * (upper - lower)
      }
    }
  }
}
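The rank arithmetic above can be checked without a Spark cluster. The following is a minimal sketch that applies the same steps to an in-memory `Seq`; the name `nistPercentileLocal` is hypothetical and not part of the gist, and the sketch assumes the data fits in memory.

```scala
// Hypothetical local helper (not part of the gist): the same NIST rank
// arithmetic as computePercentile, applied to a Seq instead of an RDD.
def nistPercentileLocal(data: Seq[Long], tile: Double): Double = {
  require(data.nonEmpty, "data must not be empty")
  val sorted = data.sorted
  val c = sorted.length
  val n = (tile / 100d) * (c + 1d) // 1-based fractional rank
  val k = math.floor(n).toInt      // integer part of the rank
  val d = n - k                    // fractional part of the rank
  if (k <= 0) sorted.head.toDouble          // below the first rank: minimum
  else if (k >= c) sorted.last.toDouble     // at or past the last rank: maximum
  else sorted(k - 1) + d * (sorted(k) - sorted(k - 1))
}

val sample = Seq(50L, 10L, 40L, 20L, 30L)
println(nistPercentileLocal(sample, 25)) // rank 1.5: halfway between 10 and 20
println(nistPercentileLocal(sample, 50)) // rank 3.0: the middle value
```

For five values, the 25th percentile has rank 0.25 * 6 = 1.5, so the result lies halfway between the first and second smallest values.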
@chinesejie

@turian Hi, I believe there is still an error in the above.
Here is my updated code:
def computePercentile(vals: Seq[Double], tile: Double, unsorted: Boolean = true): Double = {
  assert(tile >= 0 && tile <= 100)
  if (vals.isEmpty) Double.NaN
  else {
    // If the caller says the input is already sorted, verify it
    if (!unsorted && vals.length >= 2) vals.sliding(2).foreach(l => assert(l(0) <= l(1)))
    // Data must be sorted in ascending order
    val r =
      if (unsorted) vals.sorted
      else vals
    val length = r.length
    if (length == 1) r.head
    else {
      // 0-based fractional rank using (length - 1) rather than the NIST (N + 1):
      // integer part k, fractional part d
      val n = (tile / 100d) * (length - 1)
      val k = math.floor(n).toInt
      val d = n - k
      if (k <= 0 && d == 0) r.head // different here...
      else if (k + 1 >= length) r.last
      else r(k) + d * (r(k + 1) - r(k))
    }
  }
}
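The two versions in this thread use different rank formulas, so they can return different values for the same input. The sketch below places both formulas side by side as 0-based positions into the sorted data; the helper names `nistPosition`, `inclusivePosition`, and `interpolate` are hypothetical and appear nowhere in the gist.

```scala
// Hypothetical helpers for illustration only; none of these names are in the gist.

// Gist version: 1-based rank (tile / 100) * (N + 1), shifted to a 0-based position.
def nistPosition(tile: Double, length: Int): Double =
  (tile / 100d) * (length + 1d) - 1d

// Comment version: 0-based rank (tile / 100) * (N - 1).
def inclusivePosition(tile: Double, length: Int): Double =
  (tile / 100d) * (length - 1d)

// Clamp the position to the ends of the data, otherwise interpolate linearly
// between the two neighbouring values.
def interpolate(sorted: IndexedSeq[Double], pos: Double): Double =
  if (pos <= 0) sorted.head
  else if (pos >= sorted.length - 1) sorted.last
  else {
    val k = math.floor(pos).toInt
    sorted(k) + (pos - k) * (sorted(k + 1) - sorted(k))
  }

val sorted = Vector(10d, 20d, 30d, 40d, 50d)
println(interpolate(sorted, nistPosition(25, sorted.length)))      // position 0.5
println(interpolate(sorted, inclusivePosition(25, sorted.length))) // position 1.0
```

On this sample the two formulas agree at the median (both give position 2.0) but differ at the 25th percentile, where the positions are 0.5 and 1.0 respectively.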
