An ApproximateDistribution wrapper class for Ted Dunning's Java t-digest library
package com.preact.platform.math.models

import java.lang.System._
import java.util

import com.tdunning.math.stats.{Centroid, TreeDigest}
import org.apache.commons.math3.distribution.NormalDistribution
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

import scala.collection.immutable.IndexedSeq
object ApproximateDistribution {
  val defaultCompression = 100.0
  val outputDistributionPoints = 100

  def apply(compression: Double = defaultCompression): ApproximateDistribution =
    new ApproximateDistribution(new TreeDigest(compression))

  def toApproximateDistribution(compression: Double = defaultCompression)(x: Double): ApproximateDistribution = {
    val tD = ApproximateDistribution(compression)
    tD.add(x)
    tD
  }
  def toApproximateDistribution(compression: Double)(X: Array[Double]): ApproximateDistribution = {
    val TD = ApproximateDistribution(compression)
    X.foreach { x => TD.add(x) } // fold every observation into the digest
    TD
  }
  implicit class DoublesToApproximateDistribution(x: Double) {
    def toApproximateDistribution(compression: Double = defaultCompression): ApproximateDistribution = {
      val ap = ApproximateDistribution(compression)
      ap.add(x)
      ap
    }
  }

  implicit class ArrayDoublesToApproximateDistribution(X: Array[Double]) {
    def toApproximateDistribution(compression: Double = defaultCompression): ApproximateDistribution = {
      val ap = ApproximateDistribution(compression)
      X.foreach { x => ap.add(x) }
      ap
    }
  }
}
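
// Example use of the implicit enrichments defined in the companion object
// above (the values here are illustrative, not from the original gist):
//   import ApproximateDistribution.{DoublesToApproximateDistribution, ArrayDoublesToApproximateDistribution}
//   val one  = 42.0.toApproximateDistribution()
//   val many = Array(1.0, 2.0, 3.0).toApproximateDistribution(compression = 200.0)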
class ApproximateDistribution(protected val digest: TreeDigest) {
  // Track the exact min and max of all observed values; they bound the
  // support used when generating discretized CDF/PDF output below.
  var min: Double = Double.MaxValue
  var max: Double = Double.MinValue

  def size: Long = this.digest.size()

  def updateMinMax(x: Double): Unit = {
    this.min = Math.min(x, this.min)
    this.max = Math.max(x, this.max)
  }

  def add(x: Double, w: Int = 1): Unit = {
    this.updateMinMax(x)
    this.digest.add(x, w)
  }
  def add(other: ApproximateDistribution): Unit = {
    this.min = Math.min(other.min, this.min)
    this.max = Math.max(other.max, this.max)
    this.digest.add(other.digest)
  }

  // Note: ++ merges in place and returns the mutated left operand, which is
  // fine for reduce-style aggregation but is not a pure combinator.
  def ++(other: ApproximateDistribution): ApproximateDistribution = {
    this.add(other)
    this
  }

  def cdf(x: Double): Double = this.digest.cdf(x)
  def quantile(q: Double): Double = this.digest.quantile(q)
  def centroids: util.Collection[Centroid] = this.digest.centroids()
  // By default the support runs from the min to the max of the seen data,
  // but it can be truncated with supportMin/supportMax.
  def genCDF(points: Int = ApproximateDistribution.outputDistributionPoints,
             supportMin: Option[Double] = None,
             supportMax: Option[Double] = None): Seq[(Double, Double)] = {
    val mn = supportMin.getOrElse(this.min)
    val mx = supportMax.getOrElse(this.max)
    val stepSize: Double = (mx - mn) / points.toDouble
    val support: Seq[Double] = (mn to mx by stepSize).toSeq
    support.zip(support.map { x => this.digest.cdf(x) })
  }
  // Approximates the PDF by left-differencing the discretized CDF: each point
  // carries the probability mass between it and its left neighbour. The first
  // point keeps its raw CDF value, i.e. the mass at or below the support minimum.
  def genPDF(points: Int = ApproximateDistribution.outputDistributionPoints,
             supportMin: Option[Double] = None,
             supportMax: Option[Double] = None): Seq[(Double, Double)] = {
    val cdf = this.genCDF(points = points, supportMin = supportMin, supportMax = supportMax)
    cdf.head +: cdf.sliding(2).map { s =>
      val (l, r) = (s.head, s.last)
      (r._1, r._2 - l._2)
    }.toSeq
  }
}
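
// A minimal local usage sketch of the class above; the compression value and
// data are illustrative, not part of the original gist.
object ApproximateDistributionLocalExample {
  def main(args: Array[String]): Unit = {
    val ad = ApproximateDistribution(compression = 200.0)
    (1 to 10000).foreach { i => ad.add(i.toDouble) }
    println(s"median    ~ ${ad.quantile(0.5)}") // approximate 50th percentile
    println(s"cdf(2500) ~ ${ad.cdf(2500.0)}")   // approximate P(x <= 2500)
    val pdf = ad.genPDF(points = 50)            // discretized density over [min, max]
    println(s"pdf has ${pdf.size} points")
  }
}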
object ApproximateDistributionSparkKryoExample {
  import ApproximateDistribution.{DoublesToApproximateDistribution, ArrayDoublesToApproximateDistribution}

  def main(arg: Array[String]): Unit = {
    val appName: String = "TDigest-Test"
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[TreeDigest]))
    val sc: SparkContext = new SparkContext(conf)
    val trueDist0: NormalDistribution = new NormalDistribution(100.0, 15.0)
    val compression = 300.0
    val data: IndexedSeq[Double] = (0 until 100000).map { _ => trueDist0.sample() }

    // Build a digest locally for comparison against the Spark results.
    val TDlocal: ApproximateDistribution = ApproximateDistribution(compression)
    data.foreach { x => TDlocal.add(x, 1) }
    val startTime = currentTimeMillis()
    val dataRDD: RDD[Double] = sc.parallelize(data)

    // One digest per element, merged pairwise by reduce.
    val TDspark = dataRDD.map { _.toApproximateDistribution() }.reduce(_ ++ _)
    // One digest per partition: glom yields an Array[Double] per partition,
    // avoiding a digest allocation for every element.
    val TDGlommedspark = dataRDD.glom().map { _.toApproximateDistribution() }.reduce(_ ++ _)

    val testX = 100.0
    println("Running TreeDigest in Spark using Kryo")
    println(s"Took ${(currentTimeMillis() - startTime) / 1000d} seconds.")
    println(s"From spark         : ${TDspark.cdf(testX)}")
    println(s"From spark glommed : ${TDGlommedspark.cdf(testX)}")
    println(s"From local         : ${TDlocal.cdf(testX)}")

    sc.stop()
  }
}
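
// A hedged alternative sketch: the same aggregation expressed with Spark's
// RDD.treeAggregate, which folds elements into one digest per partition and
// merges the partial digests through a tree of combiners. The object and
// method names here are illustrative, not part of the original gist.
object ApproximateDistributionTreeAggregateExample {
  def aggregate(dataRDD: RDD[Double], compression: Double): ApproximateDistribution =
    dataRDD.treeAggregate(ApproximateDistribution(compression))(
      seqOp = (ad, x) => { ad.add(x); ad }, // fold each element into the partition's digest
      combOp = (a, b) => a ++ b             // merge partial digests
    )
}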