Skip to content

Instantly share code, notes, and snippets.

@graham-thomson
Last active September 7, 2018 14:16
Show Gist options
  • Save graham-thomson/e5969da36db76f2c21c80b0494debc97 to your computer and use it in GitHub Desktop.
Save graham-thomson/e5969da36db76f2c21c80b0494debc97 to your computer and use it in GitHub Desktop.
import org.apache.spark.ml.linalg.{SparseVector, Vectors}
import org.apache.spark.ml.feature.StandardScaler
import org.apache.spark.sql.SparkSession
/**
 * Spark job that sums per-platform action vectors for each device and writes
 * the totals as sparse feature vectors in Parquet format.
 *
 * Invocation: `censusAggregation pathToCensus outputPath`.
 */
object censusAggregation {

  /** Help text printed when the required command-line arguments are missing. */
  val usage: String = """
Usage: censusAggregation pathToCensus outputPath
"""

  /**
   * Element-wise sum of two arrays.
   *
   * The result is as long as the shorter input, because `zip` drops the
   * unmatched tail of the longer one.
   */
  def addArrays(a: Array[Double], b: Array[Double]): Array[Double] =
    a.zip(b).map { case (x, y) => x + y }

  /**
   * Appends three per-platform total slots (PC, Mobile, Tablet, in that order)
   * to `v`. The slot matching `p` receives `v.sum`; the other two are 0.0.
   *
   * @param p platform name; only "PC", "Mobile" and "Tablet" are handled
   * @param v action counts for one record
   * @return `v` followed by the three total slots
   * @throws scala.MatchError if `p` is any other value (including null)
   */
  def totalPlatformActions(p: String, v: Array[Double]): Array[Double] = p match {
    case "PC"     => v ++ Array(v.sum, 0.0, 0.0)
    case "Mobile" => v ++ Array(0.0, v.sum, 0.0)
    case "Tablet" => v ++ Array(0.0, 0.0, v.sum)
  }

  /**
   * Divides every element by the sum of the array.
   *
   * The sum is computed once rather than once per element. When the sum is
   * zero the result is all zeros, so no NaN/Infinity values are produced.
   */
  def percent(v: Array[Double]): Array[Double] = {
    val total = v.sum
    if (total == 0.0) Array.fill(v.length)(0.0) else v.map(_ / total)
  }

  /** Maps every non-zero element to 1.0 and every zero element to 0.0. */
  def binary(v: Array[Double]): Array[Double] =
    v.map(x => if (x != 0.0) 1.0 else 0.0)

  /** Converts a dense array of doubles into a Spark ML sparse vector. */
  def arrayToSparse(v: Array[Double]): SparseVector = Vectors.dense(v).toSparse

  /**
   * Entry point.
   *
   * Reads the Parquet data set at `args(0)`, keys each row by column 0, extends
   * the vector in column 5 with per-platform totals chosen by column 4, sums the
   * extended vectors per key and writes `(device_id, features)` to `args(1)`.
   *
   * @param args `pathToCensus` followed by `outputPath`
   */
  def main(args: Array[String]): Unit = {
    // Both positional arguments are read below, so require both of them up front.
    if (args.length < 2) {
      println(usage)
      sys.exit(1)
    }
    val filename = args(0)
    val output = args(1)

    val spark = SparkSession.builder().config("spark.log.level", "ERROR").getOrCreate()
    try {
      // NOTE(review): the "spark.log.level" setting appears to be honoured only by
      // recent Spark releases (3.5+); setting the level explicitly makes the
      // intended ERROR level effective on older versions as well.
      spark.sparkContext.setLogLevel("ERROR")

      import spark.implicits._

      val census = spark.read.parquet(filename)

      val reducedFeatures = census.rdd
        .map { row =>
          val deviceId = row.getString(0)
          val platform = row.getString(4)
          // Read through the Vector supertype so dense and sparse columns both work.
          val actions = row.getAs[org.apache.spark.ml.linalg.Vector](5).toArray
          (deviceId, totalPlatformActions(platform, actions))
        }
        .reduceByKey(addArrays)

      reducedFeatures
        .map { case (deviceId, totals) => (deviceId, arrayToSparse(totals)) }
        .toDF("device_id", "features")
        .write
        .parquet(output)
    } finally {
      // Release the session's resources whether or not the job succeeded.
      spark.stop()
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment