Skip to content

Instantly share code, notes, and snippets.

@graham-thomson
Created October 3, 2019 19:09
Show Gist options
  • Save graham-thomson/81ac5ee4ac9c3b18a37122fd2261b21d to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.expressions.UserDefinedFunction
/** Computes approximate quantiles for every element of an ML feature vector column.
  *
  * Each feature vector (`org.apache.spark.ml.linalg.Vector`) is unpacked into one
  * DataFrame column per feature, then `DataFrameStatFunctions.approxQuantile` is
  * applied across all feature columns at once.
  */
object FeatureVectorQuantiles {
  /** UDF converting an ML `Vector` to `array<double>` so its elements can be
    * addressed with `getItem(i)` in plain SQL expressions.
    */
  val vecToSeq: UserDefinedFunction = udf((v: Vector) => v.toArray)

  /** Returns `n + 1` evenly spaced probabilities spanning [0.0, 1.0].
    *
    * Uses `x.toDouble / n` (rather than accumulating `(1.0 / n) * x`) so the
    * endpoints are exactly 0.0 and 1.0 — `approxQuantile` rejects probabilities
    * outside [0, 1], and floating-point accumulation could push the last value
    * slightly above 1.0 for some `n`.
    *
    * @param n number of buckets; must be >= 1
    */
  def calculateBuckets(n: Int): Array[Double] = {
    require(n >= 1, s"numBuckets must be >= 1, got $n")
    (0 to n).map(_.toDouble / n).toArray
  }

  /** Computes approximate quantiles for each feature in `featureColName`.
    *
    * The number of features is inferred from the first row, so `df` must be
    * non-empty and all vectors are assumed to have the same length.
    *
    * @param df             input DataFrame containing a vector column
    * @param featureColName name of the `Vector`-typed column (default "features")
    * @param numBuckets     number of quantile buckets (default 2 -> min/median/max)
    * @param relativeError  target relative error for `approxQuantile`
    * @return one `Array[Double]` of quantiles per feature, in feature order
    */
  def Calculate(df: DataFrame, featureColName: String = "features",
                numBuckets: Int = 2, relativeError: Double = 0.001): Array[Array[Double]] = {
    val spark: SparkSession = SparkSession.builder().getOrCreate()
    import spark.implicits._
    // Infer vector length from the first row; assumes all rows share this size.
    val nFeatures = df.select(featureColName).first.getAs[Vector](0).size
    // One column per vector element: feature_0, feature_1, ...
    val exprs = (0 until nFeatures).map(i => $"_tmp".getItem(i).alias(s"feature_$i"))
    val unpackedDF = df.select(vecToSeq(col(featureColName)).alias("_tmp")).select(exprs: _*)
    unpackedDF.stat.approxQuantile(unpackedDF.columns, calculateBuckets(numBuckets), relativeError)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment