@felixcheung
Last active March 28, 2018 08:26
Spark compute percentile with RDD in Scala
import org.apache.spark.rdd.RDD

/**
 * Compute a percentile from an unsorted Spark RDD.
 * @param data input data set of Long integers
 * @param tile percentile to compute (e.g. 85 for the 85th percentile)
 * @return value of the input data at the specified percentile
 */
def computePercentile(data: RDD[Long], tile: Double): Double = {
  // NIST method; data must be sorted in ascending order
  val r = data.sortBy(x => x)
  val c = r.count()
  if (c == 1) r.first()
  else {
    val n = (tile / 100d) * (c + 1d) // 1-based fractional rank
    val k = math.floor(n).toLong
    val d = n - k
    if (k <= 0) r.first()
    else {
      // Key each element by its 0-based position so it can be looked up by rank
      val index = r.zipWithIndex().map(_.swap)
      val last = c
      if (k >= c) {
        index.lookup(last - 1).head
      } else {
        index.lookup(k - 1).head + d * (index.lookup(k).head - index.lookup(k - 1).head)
      }
    }
  }
}
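
For checking small inputs without a Spark context, the same rank formula can be sketched on an in-memory `Seq`. This is only an illustration: `nistPercentile` is a hypothetical name that does not appear in the gist, and the `require` on empty input is an added assumption (the RDD version would fail on `first()` instead).

```scala
// Hypothetical local helper (not part of the gist): the same NIST rank
// formula applied to an in-memory Seq instead of an RDD.
def nistPercentile(data: Seq[Long], tile: Double): Double = {
  require(data.nonEmpty, "data must not be empty") // assumption: reject empty input
  val r = data.sorted
  val c = r.length
  if (c == 1) r.head.toDouble
  else {
    val n = (tile / 100d) * (c + 1d) // 1-based fractional rank
    val k = math.floor(n).toInt
    val d = n - k
    if (k <= 0) r.head.toDouble
    else if (k >= c) r.last.toDouble
    // r(k - 1) is the element at 1-based rank k; interpolate towards rank k + 1
    else r(k - 1) + d * (r(k) - r(k - 1))
  }
}
```

For `Seq(5L, 1L, 3L, 2L, 4L)` the rank at the 25th percentile is 0.25 * 6 = 1.5, so the result lies halfway between the first and second sorted values.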
@turian

turian commented Nov 5, 2015

I believe there are some off-by-one errors in the above.

I found them using this style of test:

    val rng = new Random(0)
    val sorted: Seq[Double] = (0 to 100).map(_.toDouble)
    val unsorted: Seq[Double] = rng.shuffle(sorted)

    MathUtil.computePercentile(Seq(1.0), 0) shouldEqual 1.0 +- eps
    MathUtil.computePercentile(Seq(1.0), 100) shouldEqual 1.0 +- eps

    (0 to 100).foreach { i =>
      MathUtil.computePercentile(sorted, i, unsorted = false) shouldEqual i.toDouble +- eps
      MathUtil.computePercentile(unsorted, i, unsorted = true) shouldEqual i.toDouble +- eps
    }

Here is my updated code (but for Scala Seqs, not RDDs) that works:

  def computePercentile(vals: Seq[Double], tile: Double, unsorted: Boolean = true): Double = {
    assert(tile >= 0 && tile <= 100)
    if (vals.isEmpty) Double.NaN
    else {
      assert(vals.nonEmpty)
      // Make sure the list is sorted, if that's what we've been told
      if (!unsorted && vals.length >= 2) vals.sliding(2).foreach(l => assert(l(0) <= l(1))) else {}
      // NIST method; data to be sorted in ascending order
      val r =
        if (unsorted) vals.sorted
        else vals
      val length = r.length
      if (length == 1) r.head
      else {
        val n = (tile / 100d) * (length - 1)
        val k = math.floor(n).toInt
        val d = n - k
        if (k <= 0) r.head
        else {
          val last = length
          if (k + 1 >= length) {
            r.last
          } else {
            r(k) + d * (r(k + 1) - r(k))
          }
        }
      }
    }
  }
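
The test above expects the i-th percentile of the values 0.0 to 100.0 to be exactly i. That holds for the `(length - 1)` rank used in this version, but not in general for the `(c + 1)` rank of the NIST method in the gist, so part of the difference is one of convention. The sketch below only compares the two rank formulas; `nistRank` and `linearIndex` are hypothetical names introduced here, and the top-level definitions assume a REPL, script, or worksheet.

```scala
// 1-based fractional rank used by the NIST method in the gist: p/100 * (N + 1)
def nistRank(tile: Double, n: Int): Double = (tile / 100d) * (n + 1d)

// 0-based fractional index used in the Seq version: p/100 * (N - 1)
def linearIndex(tile: Double, n: Int): Double = (tile / 100d) * (n - 1)

val size = 101 // the test data 0.0, 1.0, ..., 100.0

// 25th percentile under each convention
val nist = nistRank(25, size)      // 25.5: halfway between 1-based ranks 25 and 26 (values 24.0 and 25.0)
val linear = linearIndex(25, size) // 25.0: exactly the 0-based index 25 (value 25.0)
```

Under the NIST rank the 25th percentile of this data is 24.5, while the `(length - 1)` rank gives 25.0, which is what the test asserts.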

@chinesejie

@turian Hi, I believe there is still an error in the above.
Here is my updated code:

    def computePercentile(vals: Seq[Double], tile: Double, unsorted: Boolean = true): Double = {
      assert(tile >= 0 && tile <= 100)
      if (vals.isEmpty) Double.NaN
      else {
        assert(vals.nonEmpty)
        // Make sure the list is sorted, if that's what we've been told
        if (!unsorted && vals.length >= 2) vals.sliding(2).foreach(l => assert(l(0) <= l(1))) else {}
        // NIST method; data to be sorted in ascending order
        val r =
          if (unsorted) vals.sorted
          else vals
        val length = r.length
        if (length == 1) r.head
        else {
          val n = (tile / 100d) * (length - 1)
          val k = math.floor(n).toInt
          val d = n - k
          if (k <= 0 && d == 0) r.head // different here...
          else {
            val last = length
            if (k + 1 >= length) {
              r.last
            } else {
              r(k) + d * (r(k + 1) - r(k))
            }
          }
        }
      }
    }
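
The only difference between the two `Seq` versions is the guard on `r.head`, and it matters when the fractional index falls between the first and second elements (`k == 0` with `d > 0`). A minimal sketch of that case follows; `percentileSorted` and its `requireExact` flag are hypothetical, introduced here only to switch between the two guards, and the input is assumed to be already sorted.

```scala
// Hypothetical helper: the interpolation logic shared by the two comments,
// with the disputed guard made switchable so both behaviours can be compared.
// Assumes `r` is already sorted in ascending order.
def percentileSorted(r: Seq[Double], tile: Double, requireExact: Boolean): Double = {
  require(r.nonEmpty && tile >= 0 && tile <= 100)
  val n = (tile / 100d) * (r.length - 1) // 0-based fractional index
  val k = math.floor(n).toInt
  val d = n - k
  // requireExact = false: guard `k <= 0`; requireExact = true: guard `k <= 0 && d == 0`
  val takeHead = if (requireExact) k <= 0 && d == 0 else k <= 0
  if (r.length == 1 || takeHead) r.head
  else if (k + 1 >= r.length) r.last
  else r(k) + d * (r(k + 1) - r(k))
}

val data = Seq(0.0, 10.0)
val earlier = percentileSorted(data, 25, requireExact = false) // returns the head, 0.0
val revised = percentileSorted(data, 25, requireExact = true)  // interpolates: 0.0 + 0.25 * 10.0 = 2.5
```

With two elements, every percentile below 100 has `k == 0`, so the earlier guard returns the minimum for all of them, while the revised guard interpolates.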
