Skip to content

Instantly share code, notes, and snippets.

Last active March 26, 2019 16:09
Show Gist options
  • Save RobColeman/7a5ebcb7c155c94b0a62 to your computer and use it in GitHub Desktop.
Save RobColeman/7a5ebcb7c155c94b0a62 to your computer and use it in GitHub Desktop.
Helpers for Ted Dunning's Java t-digest library
package com.preact.platform.math.models
import java.lang.System._
import java.nio.ByteBuffer
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkContext, SparkConf}
import org.apache.commons.math3.distribution.ExponentialDistribution
import org.apache.commons.math3.distribution.NormalDistribution
import com.tdunning.math.stats.TreeDigest
import scala.collection.immutable.IndexedSeq
/**
 * Helpers around t-digest's `TreeDigest`: construction, byte-array
 * (de)serialization, merging of serialized digests, and sampling the
 * digest's CDF / per-bin probability mass on an evenly spaced grid.
 *
 * The byte-array helpers exist so digests can be shipped between Spark
 * tasks as plain `Array[Byte]` values.
 */
object TreeDigestHelper {

  /** Creates an empty digest with the given compression factor. */
  def apply(compression: Double): TreeDigest = new TreeDigest(compression)

  /**
   * Serializes a digest into a freshly allocated byte array.
   *
   * The array is sized with `byteSize` and filled with `asBytes`, the
   * matching (verbose) encoding.
   */
  def serialize(tdigest: TreeDigest): Array[Byte] = {
    val arr = new Array[Byte](tdigest.byteSize)
    tdigest.asBytes(ByteBuffer.wrap(arr))
    arr
  }

  /** Reconstructs a digest from bytes produced by [[serialize]]. */
  def deserialize(arr: Array[Byte]): TreeDigest =
    TreeDigest.fromBytes(ByteBuffer.wrap(arr))

  /**
   * Merges two serialized digests and returns the serialized result.
   *
   * Suitable as the function passed to `RDD.reduce`; neither input
   * array is modified.
   */
  def reduceMergeDigests(lBytes: Array[Byte], rBytes: Array[Byte]): Array[Byte] = {
    val lTD = deserialize(lBytes)
    val rTD = deserialize(rBytes)
    lTD.add(rTD)
    serialize(lTD)
  }

  /** Builds a single-point digest (weight 1) for `x` and returns it serialized. */
  def mapToDigest(compression: Double)(x: Double): Array[Byte] = {
    val TD = new TreeDigest(compression)
    TD.add(x, 1)
    serialize(TD)
  }

  /** Builds a digest from all values of `X` (weight 1 each) and returns it serialized. */
  def mapArrayToDigest(compression: Double)(X: Seq[Double]): Array[Byte] = {
    val TD = new TreeDigest(compression)
    X.foreach { x => TD.add(x, 1) }
    serialize(TD)
  }

  /**
   * Samples the digest's CDF at `points + 1` evenly spaced locations from
   * `supportMin` to `supportMax` (both inclusive).
   *
   * @param supportMin first grid point
   * @param supportMax last grid point; must be strictly greater than `supportMin`
   * @param points     number of intervals in the grid; must be positive
   * @param digest     digest whose CDF is evaluated
   * @return `(x, cdf(x))` pairs in increasing `x` order
   * @throws IllegalArgumentException if `points <= 0` or `supportMax <= supportMin` (or either is NaN)
   */
  def genCDF(supportMin: Double, supportMax: Double, points: Int = 100)(digest: TreeDigest): Seq[(Double, Double)] = {
    require(points > 0, s"points must be positive, got $points")
    require(supportMax > supportMin, s"supportMax ($supportMax) must exceed supportMin ($supportMin)")
    val stepSize = (supportMax - supportMin) / points.toDouble
    // Index-based grid instead of a Double Range: no deprecated API, no accumulated
    // rounding error, and the final point is exactly supportMax.
    (0 to points).map { i =>
      val x = if (i == points) supportMax else supportMin + i * stepSize
      (x, digest.cdf(x))
    }
  }

  /**
   * Per-bin probability mass derived from [[genCDF]] on the same grid.
   *
   * The first entry is `(supportMin, cdf(supportMin))`, i.e. the mass at or
   * below the grid. Each later entry is `(x_i, cdf(x_i) - cdf(x_{i-1}))`.
   * Values are masses, not densities: they are not divided by the bin width.
   */
  def genPDF(supportMin: Double, supportMax: Double, points: Int = 100)(digest: TreeDigest): Seq[(Double, Double)] = {
    val cdf = genCDF(supportMin, supportMax, points)(digest)
    val binMass = cdf.zip(cdf.tail).map { case ((_, lowerCdf), (x, upperCdf)) =>
      (x, upperCdf - lowerCdf)
    }
    cdf.head +: binMass
  }
}
/** Local demo: fills a digest from an exponential sample and checks a serialize/deserialize round trip. */
object TDigestExample {
  def main(arg: Array[String]): Unit = {
    val trueDist0: ExponentialDistribution = new ExponentialDistribution(15)
    val TD0: TreeDigest = new TreeDigest(25.0)
    // Add samples directly; no intermediate collection is needed.
    (0 until 10000).foreach { _ => TD0.add(trueDist0.sample(), 1) }

    val b: Array[Byte] = TreeDigestHelper.serialize(TD0)
    val TD00: TreeDigest = TreeDigestHelper.deserialize(b)

    // The round-tripped digest should report the same quantile information.
    val testX = 15.0
    println(s"Serialized size : ${b.length} bytes")
    println(s"Original cdf($testX)     : ${TD0.cdf(testX)}")
    println(s"Round-tripped cdf($testX): ${TD00.cdf(testX)}")
  }
}
/**
 * Spark demo: builds one digest per partition, ships them as serialized
 * byte arrays, merges them with `TreeDigestHelper.reduceMergeDigests`, and
 * compares the result with a digest built locally from the same data.
 */
object TDigestSparkExample {
  def main(arg: Array[String]): Unit = {
    val appName: String = "TDigest-Test"
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val trueDist0: ExponentialDistribution = new ExponentialDistribution(15)
      val compression = 25.0
      val data = (0 until 100000).map { _ => trueDist0.sample() }

      val TDlocal = new TreeDigest(compression)
      data.foreach { x => TDlocal.add(x, 1) }

      val startTime = currentTimeMillis()
      val dataRDD: RDD[Double] = sc.parallelize(data)
      // One digest per partition rather than one per element keeps the number of
      // serialize/deserialize/merge steps proportional to the partition count.
      val tdBytes: Array[Byte] = dataRDD
        .glom()
        .map { partition => TreeDigestHelper.mapArrayToDigest(compression)(partition.toSeq) }
        .reduce(TreeDigestHelper.reduceMergeDigests)
      val TDspark = TreeDigestHelper.deserialize(tdBytes)

      println(s"Took ${(currentTimeMillis() - startTime) / 1000d} seconds.")
      println(s"From spark : ${TDspark.cdf(15.0)}")
      println(s"From local : ${TDlocal.cdf(15.0)}")
    } finally {
      // Release executors/threads even if the job fails.
      sc.stop()
    }
  }
}
/**
 * Spark demo that passes `TreeDigest` objects themselves between tasks
 * (serialized by Spark's configured Kryo serializer). It compares:
 *   - one digest per element, merged with `reduce`;
 *   - one digest per partition (via `glom`), merged with `reduce`;
 *   - a digest built locally from the same data.
 */
object TDigestSparkKryoExample {
  def main(arg: Array[String]): Unit = {
    val appName: String = "TDigest-Test"
    val conf: SparkConf = new SparkConf().setAppName(appName).setMaster("local[16]")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc: SparkContext = new SparkContext(conf)
    try {
      val trueDist0: NormalDistribution = new NormalDistribution(100.0, 15)
      val compression = 300.0
      val data: IndexedSeq[Double] = (0 until 100000).map { _ => trueDist0.sample() }

      val TDlocal = new TreeDigest(compression)
      data.foreach { x => TDlocal.add(x, 1) }

      // The reported time covers both Spark jobs below.
      val startTime = currentTimeMillis()
      val dataRDD: RDD[Double] = sc.parallelize(data)
      val TDspark = dataRDD.map(toTD(compression)).reduce(reduceTD)
      val TDGlommedspark = dataRDD.glom().map(arrayToTD(compression)).reduce(reduceTD)

      val testX = 100.0
      println("Running TreeDigest in spark using Kryo")
      println(s"Took ${(currentTimeMillis() - startTime) / 1000d} seconds.")
      println(s"From spark : ${TDspark.cdf(testX)}")
      println(s"From spark glommed : ${TDGlommedspark.cdf(testX)}")
      println(s"From local : ${TDlocal.cdf(testX)}")
    } finally {
      // Release executors/threads even if a job fails.
      sc.stop()
    }
  }

  /**
   * Merges `r` into `l` and returns `l` (the left digest is mutated).
   *
   * Mutating the left operand is acceptable here because the inputs are
   * freshly built by the preceding `map` stage, not cached data.
   */
  def reduceTD(l: TreeDigest, r: TreeDigest): TreeDigest = {
    l.add(r)
    l
  }

  /** Builds a digest containing the single value `x`. */
  def toTD(compression: Double)(x: Double): TreeDigest = {
    val TD = new TreeDigest(compression)
    TD.add(x)
    TD
  }

  /** Builds a digest containing every value of `X`. */
  def arrayToTD(compression: Double)(X: Array[Double]): TreeDigest = {
    val TD = new TreeDigest(compression)
    X.foreach { x => TD.add(x) }
    TD
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment