@mjradwin
Last active August 29, 2015 14:24
Winsorize an RDD of Doubles for Apache Spark
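import org.apache.spark.rdd.RDD

// Winsorize an RDD of Doubles: values below the lower cut-off are raised to it,
// values above the upper cut-off are lowered to it.
// `limits` holds the lower and upper quantiles, e.g. (0.02, 0.98).
// Note: the result comes back sorted in ascending order.
def winsorize(data: RDD[Double], limits: (Double, Double)): RDD[Double] = {
  val r = data.sortBy(x => x)
  val c = r.count()
  if (c <= 2) r
  else {
    // Convert the quantiles to 1-based ranks in the sorted data.
    val k0 = (limits._1 * (c + 1d)).toLong
    val k1 = (limits._2 * (c + 1d)).toLong
    // Key each value by its 0-based position so it can be fetched by rank.
    val index = r.zipWithIndex().map(_.swap)
    // Clamp both ranks to [1, c]; an out-of-range key would make lookup
    // return an empty Seq and .head would throw.
    val max = index.lookup(math.min(math.max(k1, 1L), c) - 1).head
    val min = if (k0 <= 0) r.first() else index.lookup(math.min(k0, c) - 1).head
    r.map(x => if (x < min) min else if (x > max) max else x)
  }
}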
mjradwin commented Jul 6, 2015

Usage: val rdd1 = winsorize(rdd0, (0.02, 0.98))
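
To check by hand which values a given pair of limits clamps, the same rank arithmetic can be run on a small local collection. This is only a minimal sketch and not part of the gist: `WinsorizeLocalDemo` and `winsorizeLocal` are hypothetical names, the data is assumed to fit in memory, and clamping the ranks to the range [1, c] is an added assumption for degenerate limits.

```scala
object WinsorizeLocalDemo {
  // Hypothetical local counterpart of winsorize; uses no Spark APIs.
  def winsorizeLocal(data: Seq[Double], limits: (Double, Double)): Seq[Double] = {
    val sorted = data.sorted.toVector
    val c = sorted.length.toLong
    if (c <= 2) sorted
    else {
      // Same rank formula as the gist: quantile * (count + 1), truncated.
      val k0 = (limits._1 * (c + 1d)).toLong
      val k1 = (limits._2 * (c + 1d)).toLong
      // Assumption: keep both ranks inside [1, c] so indexing cannot fail.
      val lo = math.min(math.max(k0, 1L), c)
      val hi = math.min(math.max(k1, 1L), c)
      val min = sorted((lo - 1).toInt)
      val max = sorted((hi - 1).toInt)
      sorted.map(x => if (x < min) min else if (x > max) max else x)
    }
  }

  // Ten values with one low and one high outlier.
  val sample: Seq[Double] = Seq(100.0, 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, -50.0)
  val result: Seq[Double] = winsorizeLocal(sample, (0.2, 0.8))

  def main(args: Array[String]): Unit =
    println(result.mkString(", "))
}
```

With ten values and limits (0.2, 0.8), the ranks come out as 2 and 8, so the cut-offs are 1.0 and 7.0: the value -50.0 is raised to 1.0, while 8.0 and 100.0 are both lowered to 7.0. The truncation in the rank formula is why the two tails are not clamped symmetrically.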
