@srnghn
Created October 5, 2016 00:22
Pearson's r correlation for Spark 2.0, written after getting inconsistent results with Statistics.corr. Select the two scale columns to be evaluated from a DataFrame, convert them to Dataset[ScaleTuple] (a case class defined in this code), and pass the result to the correlation function.
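For reference, the quantity computed below is the sample Pearson correlation coefficient. Writing $x_i$ for var1, $y_i$ for var2, and $\bar{x}$, $\bar{y}$ for their means (Avg1 and Avg2 in the code), the numerator is the sum of the Multiple column and the two square roots in the denominator correspond to totSqRt1 and totSqRt2:

```latex
r = \frac{\sum_{i} (x_i - \bar{x})(y_i - \bar{y})}
         {\sqrt{\sum_{i} (x_i - \bar{x})^2}\,\sqrt{\sum_{i} (y_i - \bar{y})^2}}
```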
// Assumes a spark-shell session, where `spark` (the SparkSession) is predefined
// and spark.implicits._ is already imported (needed for .as[ScaleTuple]).
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.{sqrt, sum}

// Create a class, ScaleTuple, to pass to the Pearson's R function so that columns can be referred to by specific names.
final case class ScaleTuple(var1: Double, var2: Double)
// Column names to use when converting to ScaleTuple
val colnames = Seq("var1", "var2")
/**
 * Implementation of Pearson's R function: calculates r, the measurement of linear dependence between two variables
 * Utilizes Dataset's 'agg' function
 **/
def getPearsonsR(scaleData: Dataset[ScaleTuple]): Double = {
  scaleData.createOrReplaceTempView("scaleD")
  // Means of both columns (a single-row table)
  val avg = spark.sql("select avg(var1) as Avg1, avg(var2) as Avg2 from scaleD")
  avg.createOrReplaceTempView("avg")
  // Deviations from the means; CROSS JOIN attaches the single row of means to every row
  // (a plain JOIN with no condition is rejected as an implicit cartesian product in later Spark 2.x)
  val dif = spark.sql("select A.var1, A.var2, (A.var1 - B.Avg1) as Diff1, (A.var2 - B.Avg2) as Diff2 from scaleD A cross join avg B")
  dif.createOrReplaceTempView("dif")
  val mult = spark.sql("select (Diff1 * Diff2) as Multiple, (Diff1 * Diff1) as Diff1Sq, (Diff2 * Diff2) as Diff2Sq from dif")
  // All three totals in one aggregation: the sum of cross products and the
  // square roots of the two sums of squared deviations (all Double columns)
  val totals = mult.agg(sum("Multiple"), sqrt(sum("Diff1Sq")), sqrt(sum("Diff2Sq"))).first()
  val totMult = totals.getDouble(0)
  val totSqRt1 = totals.getDouble(1)
  val totSqRt2 = totals.getDouble(2)
  // r = sum of cross products / (product of the square-rooted sums of squares)
  totMult / (totSqRt1 * totSqRt2)
}
// Example of how to convert to ScaleTuple and call the Pearson's R function
val scaleTuple = spark.sql("select height, weight from metrics").toDF(colnames: _*).as[ScaleTuple]
val r = getPearsonsR(scaleTuple)
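To sanity-check the Spark result (for example, when comparing against Statistics.corr), a small local computation can help. The following is a minimal plain-Scala sketch, not part of the original gist, that mirrors getPearsonsR step by step on an in-memory collection; the object name `PearsonLocal` and the sample data are hypothetical.

```scala
object PearsonLocal {
  // Local (non-Spark) reference for Pearson's r, following the same steps as
  // getPearsonsR: means, deviations, cross products, and sums of squares.
  def pearsonsR(pairs: Seq[(Double, Double)]): Double = {
    require(pairs.nonEmpty, "need at least one pair")
    val n = pairs.size.toDouble
    // Means of both variables (Avg1 and Avg2 in the Spark version)
    val avg1 = pairs.map(_._1).sum / n
    val avg2 = pairs.map(_._2).sum / n
    // Deviations from the means (Diff1 and Diff2 in the Spark version)
    val diffs = pairs.map { case (x, y) => (x - avg1, y - avg2) }
    val totMult = diffs.map { case (d1, d2) => d1 * d2 }.sum
    val totSqRt1 = math.sqrt(diffs.map { case (d1, _) => d1 * d1 }.sum)
    val totSqRt2 = math.sqrt(diffs.map { case (_, d2) => d2 * d2 }.sum)
    totMult / (totSqRt1 * totSqRt2)
  }

  def main(args: Array[String]): Unit = {
    // Hypothetical, perfectly linear sample data: r should be 1.0 up to rounding
    val sample = Seq((1.0, 2.0), (2.0, 4.0), (3.0, 6.0))
    println(pearsonsR(sample))
  }
}
```

Because this version holds everything in memory, it is only suitable for a small sample pulled to the driver (for instance with `collect()`), where it can confirm that the distributed computation agrees with a direct evaluation of the formula.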