@RobColeman · Last active March 26, 2019
Helpers for TDunning's Java T-Digest library: byte serialization round-trips, Spark-friendly merging, and CDF/PDF extraction, with local and Spark usage examples.
package com.preact.platform.math.models
import java.lang.System._
import java.nio.ByteBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.commons.math3.distribution.ExponentialDistribution
import org.apache.commons.math3.distribution.NormalDistribution
import com.tdunning.math.stats.TreeDigest
import scala.collection.immutable.IndexedSeq
object TreeDigestHelper {

  def apply(compression: Double): TreeDigest = new TreeDigest(compression)

  // Writes the digest into the compact byte format TreeDigest provides.
  def serialize(tdigest: TreeDigest): Array[Byte] = {
    val arr = new Array[Byte](tdigest.byteSize)
    tdigest.asBytes(ByteBuffer.wrap(arr))
    arr
  }

  def deserialize(arr: Array[Byte]): TreeDigest =
    TreeDigest.fromBytes(ByteBuffer.wrap(arr))

  // Merges two serialized digests; usable directly as a Spark reduce function.
  def reduceMergeDigests(lBytes: Array[Byte], rBytes: Array[Byte]): Array[Byte] = {
    val lTD = this.deserialize(lBytes)
    val rTD = this.deserialize(rBytes)
    lTD.add(rTD)
    this.serialize(lTD)
  }

  // Wraps a single observation in a one-point digest, serialized for shipping.
  def mapToDigest(compression: Double)(x: Double): Array[Byte] = {
    val TD = new TreeDigest(compression)
    TD.add(x, 1)
    this.serialize(TD)
  }

  // Folds a whole collection of values into one digest before serializing.
  def mapArrayToDigest(compression: Double)(X: Seq[Double]): Array[Byte] = {
    val TD = new TreeDigest(compression)
    X.foreach { x => TD.add(x, 1) }
    this.serialize(TD)
  }
  // Evaluates the digest's empirical CDF at evenly spaced points across the support.
  def genCDF(supportMin: Double, supportMax: Double, points: Int = 100)(digest: TreeDigest): Seq[(Double, Double)] = {
    val stepSize = (supportMax - supportMin) / points.toDouble
    val support = (supportMin to supportMax by stepSize).toSeq
    support.map { x => (x, digest.cdf(x)) }
  }

  // Approximates the PDF as per-bin probability mass: successive differences of the
  // CDF. The first CDF point is kept as-is to carry the mass below supportMin.
  def genPDF(supportMin: Double, supportMax: Double, points: Int = 100)(digest: TreeDigest): Seq[(Double, Double)] = {
    val cdf = this.genCDF(supportMin, supportMax, points)(digest)
    cdf.head +: cdf.sliding(2).map { s =>
      val (l, r) = (s.head, s.last)
      (r._1, r._2 - l._2)
    }.toSeq
  }
}
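
// A minimal sketch of how genCDF and genPDF might be used; the distribution, sample
// size, and support bounds below are illustrative assumptions, not part of the
// helpers above.
object TreeDigestCurveExample {
  def main(arg: Array[String]): Unit = {
    val dist = new NormalDistribution(0.0, 1.0)
    val digest = TreeDigestHelper(100.0)
    (0 until 10000).foreach { _ => digest.add(dist.sample(), 1) }
    // CDF sampled at 100 evenly spaced points across [-4, 4]
    val cdf = TreeDigestHelper.genCDF(-4.0, 4.0)(digest)
    // "PDF" here is per-bin probability mass, i.e. successive CDF differences
    val pdf = TreeDigestHelper.genPDF(-4.0, 4.0)(digest)
    println(cdf.take(5))
    println(pdf.map(_._2).sum) // telescopes to cdf(4.0), so it should be close to 1.0
  }
}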
object TDigestExample {

  def main(arg: Array[String]): Unit = {
    // Fill a digest from 10k exponential samples, then check that
    // serialization round-trips without changing the estimate.
    val trueDist0: ExponentialDistribution = new ExponentialDistribution(15)
    val TD0: TreeDigest = new TreeDigest(25.0)
    (0 until 10000).map { i => trueDist0.sample() }.foreach { x => TD0.add(x, 1) }
    val b: Array[Byte] = TreeDigestHelper.serialize(TD0)
    val TD00: TreeDigest = TreeDigestHelper.deserialize(b)
    println(TD0.cdf(15.0))
    println(TD00.cdf(15.0))
  }
}
object TDigestSparkExample {

  def main(arg: Array[String]): Unit = {
    val appName: String = "TDigest-Test"
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[TreeDigest]))
    val sc: SparkContext = new SparkContext(conf)

    val trueDist0: ExponentialDistribution = new ExponentialDistribution(15)
    val compression = 25.0
    val data = (0 until 100000).map { i => trueDist0.sample() }

    // Baseline: build the digest locally on the driver.
    val TDlocal = new TreeDigest(compression)
    data.foreach { x => TDlocal.add(x, 1) }

    // In Spark: one single-point digest per element, merged pairwise via reduce.
    // Digests travel between tasks as Array[Byte], so no custom serializer is needed.
    val startTime = currentTimeMillis()
    val dataRDD: RDD[Double] = sc.parallelize(data)
    val tdBytes: Array[Byte] = dataRDD.map(TreeDigestHelper.mapToDigest(compression))
      .reduce(TreeDigestHelper.reduceMergeDigests)
    val TDspark = TreeDigestHelper.deserialize(tdBytes)

    println(s"Took ${(currentTimeMillis() - startTime) / 1000d} seconds.")
    println(s"From spark : ${TDspark.cdf(15.0)}")
    println(s"From local : ${TDlocal.cdf(15.0)}")
  }
}
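
// Note the contrast between the two Spark examples: TDigestSparkExample above ships
// digests between tasks as plain Array[Byte] via the serialize/deserialize helpers,
// so it does not depend on how Spark serializes TreeDigest; TDigestSparkKryoExample
// below lets Kryo serialize TreeDigest instances directly, trading the manual byte
// handling for a Kryo class registration.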
object TDigestSparkKryoExample {

  def main(arg: Array[String]): Unit = {
    val appName: String = "TDigest-Test"
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[TreeDigest]))
    val sc: SparkContext = new SparkContext(conf)

    val trueDist0: NormalDistribution = new NormalDistribution(100.0, 15)
    val compression = 300.0
    val data: IndexedSeq[Double] = (0 until 100000).map { i => trueDist0.sample() }

    // Baseline: build the digest locally on the driver.
    val TDlocal = new TreeDigest(compression)
    data.foreach { x => TDlocal.add(x, 1) }

    val startTime = currentTimeMillis()
    val dataRDD: RDD[Double] = sc.parallelize(data)
    // Per-element digests merged by reduce; Kryo handles TreeDigest serialization.
    val TDspark = dataRDD.map(toTD(compression)).reduce(reduceTD)
    // Glommed variant: one digest per partition, so far fewer merges.
    val TDGlommedspark = dataRDD.glom().map(arrayToTD(compression)).reduce(reduceTD)

    val testX = 100.0
    println("Running TreeDigest in spark using Kryo")
    println(s"Took ${(currentTimeMillis() - startTime) / 1000d} seconds.")
    println(s"From spark : ${TDspark.cdf(testX)}")
    println(s"From spark glommed : ${TDGlommedspark.cdf(testX)}")
    println(s"From local : ${TDlocal.cdf(testX)}")
  }

  // Merge the right digest into the left; safe as a Spark reduce function.
  def reduceTD(l: TreeDigest, r: TreeDigest): TreeDigest = {
    l.add(r)
    l
  }

  def toTD(compression: Double)(x: Double): TreeDigest = {
    val TD = new TreeDigest(compression)
    TD.add(x)
    TD
  }

  def arrayToTD(compression: Double)(X: Array[Double]): TreeDigest = {
    val TD = new TreeDigest(compression)
    X.foreach { x => TD.add(x) }
    TD
  }
}
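
// An alternative sketch: RDD.aggregate builds one digest per partition without
// creating a single-point digest per element or materializing partitions via glom().
// The Spark setup, distribution, and compression mirror the Kryo example above and
// are illustrative; like that example, this assumes Kryo can serialize TreeDigest.
object TDigestSparkAggregateExample {
  def main(arg: Array[String]): Unit = {
    val conf: SparkConf = new SparkConf().setAppName("TDigest-Aggregate").setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    conf.registerKryoClasses(Array(classOf[TreeDigest]))
    val sc: SparkContext = new SparkContext(conf)

    val dist = new NormalDistribution(100.0, 15)
    val dataRDD: RDD[Double] = sc.parallelize((0 until 100000).map { i => dist.sample() })

    val compression = 300.0
    // seqOp folds each value into a per-partition digest; combOp merges the digests.
    val TDagg: TreeDigest = dataRDD.aggregate(new TreeDigest(compression))(
      (td, x) => { td.add(x); td },
      (l, r) => { l.add(r); l }
    )
    println(s"From aggregate : ${TDagg.cdf(100.0)}")
  }
}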