Skip to content

Instantly share code, notes, and snippets.

@srnghn
Created October 20, 2016 23:16
Show Gist options
  • Save srnghn/4b10aa0c623a4b246509ffb2e8ac3bb2 to your computer and use it in GitHub Desktop.
Save srnghn/4b10aa0c623a4b246509ffb2e8ac3bb2 to your computer and use it in GitHub Desktop.
ANOVA Test for Spark 2.0 using PySpark. The function returns 5 values: degrees of freedom between (numerator), degrees of freedom within (denominator), F-value, eta squared and omega squared.
from pyspark.sql.functions import *
# Implementation of ANOVA function: calculates the degrees of freedom, F-value, eta squared and omega squared values.
# Expects 'categoryData' to be a DataFrame with exactly two columns: the first is the categorical independent variable and the second is the scale (numeric) dependent variable.
def getAnovaStats(categoryData):
    """Compute one-way ANOVA statistics for a two-column Spark DataFrame.

    Args:
        categoryData: DataFrame with exactly two columns. The first column is
            the categorical (grouping) variable and the second is the numeric
            dependent variable. Rows whose category or value is null are
            ignored, so they do not inflate the observation count.

    Returns:
        A 5-tuple ``(dfb, dfw, F, etaSq, omegaSq)``:
        degrees of freedom between groups (numerator), degrees of freedom
        within groups (denominator), the F statistic, eta squared and omega
        squared. No p-value is returned; it can be derived from the F
        distribution with ``(dfb, dfw)`` degrees of freedom, e.g.
        ``scipy.stats.f.sf(F, dfb, dfw)``.

    Raises:
        ValueError: if there are no usable observations, fewer than two
            categories, or no within-group degrees of freedom (every
            category has a single observation).
    """
    # Function-scope imports: the file's star import of pyspark.sql.functions
    # shadows builtins such as sum/min/max, so every name is referenced
    # explicitly here to avoid picking up the wrong one.
    import math
    from pyspark.sql import functions as sf

    observations = (
        categoryData.toDF("cat", "value")
        .select("cat", sf.col("value").cast("double").alias("value"))
        .where(sf.col("cat").isNotNull() & sf.col("value").isNotNull())
    )

    # One Spark job: per-category count, sum, mean and population variance.
    # n * var_pop is the within-group sum of squared deviations, so no
    # self-join against the group means (and no temp view) is required.
    group_rows = (
        observations.groupBy("cat")
        .agg(
            sf.count("value").alias("n"),
            sf.sum("value").alias("total"),
            sf.avg("value").alias("mean"),
            sf.var_pop("value").alias("var_pop"),
        )
        .collect()
    )

    numCats = len(group_rows)
    totN = 0
    for row in group_rows:
        totN += row["n"]

    if totN == 0:
        raise ValueError("getAnovaStats: no non-null observations to analyse")
    if numCats < 2:
        raise ValueError(
            "getAnovaStats: at least two categories are required, got %d" % numCats
        )

    dfb = numCats - 1
    dfw = totN - numCats
    if dfw < 1:
        raise ValueError(
            "getAnovaStats: no within-group degrees of freedom "
            "(every category has a single observation)"
        )

    totMean = math.fsum(row["total"] for row in group_rows) / totN

    # Between-group and within-group sums of squares.
    ssb = math.fsum(
        row["n"] * (row["mean"] - totMean) ** 2 for row in group_rows
    )
    ssw = math.fsum(row["n"] * row["var_pop"] for row in group_rows)
    sst = ssb + ssw

    msb = ssb / dfb
    msw = ssw / dfw

    # Zero within-group variance makes the ratios undefined; report inf/nan
    # instead of crashing with ZeroDivisionError. The `== 0` comparisons are
    # deliberately written so that NaN inputs fall through and propagate.
    if msw == 0:
        F = float("inf") if msb > 0 else float("nan")
    else:
        F = msb / msw

    etaSq = float("nan") if sst == 0 else ssb / sst

    omegaDenominator = sst + msw
    if omegaDenominator == 0:
        omegaSq = float("nan")
    else:
        omegaSq = (ssb - (dfb * msw)) / omegaDenominator

    return (dfb, dfw, F, etaSq, omegaSq)
@thentangler
Copy link

Which one is the p value?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment