Apache Spark: Loading CSV File from HDFS Into SQL Context And Transpose
// Assuming the file is loaded into a localhost HDFS node:
// hadoop fs -ls -R /
// drwxr-xr-x - xxxxxxxxxxxx supergroup 0 2015-02-02 22:26 /spark
// -rw-r--r-- 1 xxxxxxxxxxxx supergroup 78 2015-02-02 22:26 /spark/peopleall.txt
//
// All of this code is from
// http://www.infoobjects.com/spark-sql-schemardd-programmatically-specifying-schema/
// https://github.com/bbnsumanth/transposing
//
// with some tweaks by me to run against my files and HDFS node.
//
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext._
import org.apache.spark.sql._
// Load the data, define a schema, and register a temp table.
// peopleall.txt is assumed to hold comma-separated rows of the form
// firstName,lastName,age (e.g., a hypothetical row: "john,doe,25").
val person = sc.textFile("hdfs://localhost:9000/spark/peopleall.txt")
val schema = StructType(Array(
  StructField("firstName", StringType, true),
  StructField("lastName", StringType, true),
  StructField("age", IntegerType, true)))
// Row is already in scope via the org.apache.spark.sql._ import above.
val rowRDD = person.map(_.split(",")).map(p => Row(p(0), p(1), p(2).toInt))
val personSchemaRDD = sqlContext.applySchema(rowRDD, schema)
personSchemaRDD.registerTempTable("person")
// Note: on Spark 1.3+, applySchema is deprecated in favor of
// sqlContext.createDataFrame(rowRDD, schema), and registerTempTable was
// later superseded by createOrReplaceTempView.
// Spark SQL query. Note: foreach(println) prints on whichever JVM runs the
// task; in local mode that is the driver console, but on a cluster the output
// lands in the executor logs. Use collect() first to print a small result
// on the driver.
sql("select * from person").foreach(println)
// Now transpose the data - based on the GitHub repo: https://github.com/bbnsumanth/transposing
//
val rows = sql("select * from person")
val transposed = rows
  .map(x => x.toArray)              // Row -> Array of column values
  .flatMap(x => x.zipWithIndex)     // (value, columnIndex) pairs
  .map(x => x.swap)                 // (columnIndex, value)
  .groupByKey                       // gather each column's values
  .map(x => (x._1, x._2.toVector))  // (columnIndex, Vector of column values)
// Caveat: groupByKey does not guarantee that values within each column keep
// the original row order when the data spans multiple partitions.
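To see what the transpose pipeline actually computes without a cluster, the same zipWithIndex/swap/group steps can be run on plain Scala collections. This is a sketch with made-up rows (not the HDFS data above); `groupBy` stands in for the RDD's `groupByKey`:

```scala
// Two hypothetical rows in the same (firstName, lastName, age) shape.
val localRows = Seq(Array[Any]("john", "doe", 25), Array[Any]("jane", "roe", 30))

val localTransposed = localRows
  .flatMap(row => row.zipWithIndex)                             // (value, columnIndex)
  .map(_.swap)                                                  // (columnIndex, value)
  .groupBy(_._1)                                                // gather values per column
  .map { case (col, pairs) => (col, pairs.map(_._2).toVector) } // Map(columnIndex -> column)

// localTransposed(0) == Vector("john", "jane")  -- the firstName column
// localTransposed(2) == Vector(25, 30)          -- the age column
```

Unlike the RDD version, the local `groupBy` preserves row order within each column, which makes it handy for checking expectations before running on real data.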