@randerzander
Created January 28, 2017 19:57
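// Run in spark-shell (Spark 2.x+), where `sc` and `spark` are predefined.
import spark.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// The first line of the CSV is the header.
val schemaString = sc.textFile("/data.csv").first()
// Data lines: everything except the header.
val rdd = sc.textFile("/data.csv").filter(line => line != schemaString)

// Output schema: the first 9 header columns as strings, plus a "stat" label
// column and a numeric "value" column. Only the added "value" column is numeric.
val fields = (schemaString.split(",").slice(0, 9) ++ Array("stat", "value"))
  .map(fieldName => {
    if (fieldName == "value")
      StructField(fieldName, DoubleType, nullable = true)
    else
      StructField(fieldName, StringType, nullable = true)
  })
val schema = StructType(fields)

// Unpivot (wide to long): each input line becomes one row per stat column
// (index 10 onward), carrying the 9 key columns along. Column index 9 is not emitted.
val rowRDD = rdd.flatMap(line => {
  // limit -1 keeps trailing empty fields instead of silently dropping them
  val fields = line.split(",", -1)
  val keyFields = fields.slice(0, 9)
  fields.slice(10, fields.size).zipWithIndex.map(x => {
    // Labels are "stat_type1" followed by the 0-based offset (stat_type10, stat_type11, ...).
    // Empty cells become null, matching the nullable "value" column.
    val value: Any = if (x._1.trim.isEmpty) null else x._1.trim.toDouble
    Row(keyFields ++ Array("stat_type1" + x._2.toString, value): _*)
  })
})

// Build the DataFrame and register it for SQL queries as "base".
val df = spark.createDataFrame(rowRDD, schema)
df.createOrReplaceTempView("base")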
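The core of the snippet is the `flatMap` that turns each wide CSV line into several long rows. Below is a minimal, Spark-free Scala sketch of that per-line step, handy for checking the column offsets against a sample line before running on a cluster. It keeps the gist's offsets (9 key columns, stats from index 10) and its `"stat_type1" + index` labels. Assumptions not in the original: the hypothetical object `UnpivotSketch` and helper `unpivotLine`, and treating empty stat cells as missing (`None`) rather than failing on `toDouble`.

```scala
// Spark-free sketch of the per-line unpivot. UnpivotSketch and unpivotLine are
// hypothetical names introduced for illustration only.
object UnpivotSketch {
  // Number of leading key columns copied onto every output row (9 in the gist).
  val NumKeys = 9
  // Index of the first stat column (10 in the gist, so column 9 is skipped).
  val FirstStat = 10

  // Turns one wide CSV line into (keys, statLabel, value) tuples, one per stat column.
  // Assumption: empty cells map to None instead of throwing NumberFormatException.
  def unpivotLine(line: String): Seq[(Seq[String], String, Option[Double])] = {
    // limit -1 keeps trailing empty fields
    val cols = line.split(",", -1).toSeq
    val keys = cols.take(NumKeys)
    cols.drop(FirstStat).zipWithIndex.map { case (raw, i) =>
      val value = if (raw.trim.isEmpty) None else Some(raw.trim.toDouble)
      (keys, "stat_type1" + i, value)
    }
  }

  def main(args: Array[String]): Unit = {
    val sample = "a,b,c,d,e,f,g,h,i,skipped,1.5,,3"
    unpivotLine(sample).foreach(println)
  }
}
```

For the sample line in `main`, the sketch yields three rows labelled `stat_type10`, `stat_type11` and `stat_type12` with values `Some(1.5)`, `None` and `Some(3.0)`; the `skipped` column at index 9 never appears, mirroring the gist's `slice(10, ...)`.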