Created October 10, 2015 04:38
Gist: InvisibleTech/cc6eac19d7871b71ec27
An example of pivoting a Spark RDD, based on an answer to a Stack Overflow question about how to do this. The answer was written in Python, so I translated it to Scala.
This pivot sample is based on the fifth answer to the question at:
The answer above was written in Python, which I don't know very well. In addition, my Spark-fu
is still introductory in some areas. To help with other aspects of translating the Python
sample, I used these references:
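```scala
import scala.collection._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}

// Input: (ID, Age, column key, value) tuples; add more tuples of the same shape as needed
val rdd = sc.parallelize(List(("X01", 41, "US", 3)))
// Convert each tuple to a Row and key it by ID (field 0)
val krows = rdd.map(t => Row.fromSeq(t.productIterator.toList)).keyBy(_(0).toString)
// seqOp: add one Row to its ID's map (column key -> value, plus "Age")
def seqPivot(m: mutable.Map[String, Any], r: Row): mutable.Map[String, Any] = {
  m += (r(2).toString -> r(3))
  m += ("Age" -> r(1))
  m
}
// combOp: merge two partial maps for the same ID built on different partitions
def cmbPivot(m1: mutable.Map[String, Any], m2: mutable.Map[String, Any]): mutable.Map[String, Any] = {
  m1 ++= m2
  m1
}
val pivoted = krows.aggregateByKey(mutable.Map.empty[String, Any])(seqPivot, cmbPivot)
// All column names seen for any ID, sorted so the schema order is stable
val orderedColnames = pivoted.values.map(v => v.keys.toSet).reduce((s, t) => s.union(t)).toSeq.sortWith(_ < _)
val schema = StructType(List(StructField("ID", StringType, true)) ++ (for (c <- orderedColnames) yield StructField(c, IntegerType, true)))
// One Row per ID; columns the ID never had are filled with null
val keyedRows = pivoted.map(t => List(t._1) ++ (for (c <- orderedColnames) yield t._2.getOrElse(c, null))).map(row => Row.fromSeq(row))
val result = sqlContext.createDataFrame(keyedRows, schema)
result.show()
```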
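For readers who want to see what aggregateByKey does with seqPivot and cmbPivot without starting Spark, here is a minimal plain-Scala sketch of the same pivot. It is not Spark's implementation. It assumes (ID, Age, column key, value) tuples like the one above and stands in for RDD partitions with a Seq of Seqs. The object PivotSketch, its aggregateByKey and rows helpers, and every sample row except ("X01", 41, "US", 3) are hypothetical. The pattern it shows is the one aggregateByKey relies on: fold the records in each partition starting from the zero value, then merge the per-partition results for each key.

```scala
import scala.collection.mutable

// Hypothetical, Spark-free sketch of the pivot. Records are assumed to be
// (ID, Age, column key, value) tuples; each inner Seq stands in for one partition.
object PivotSketch {
  type Rec = (String, Int, String, Int)
  type Acc = mutable.Map[String, Any]

  // Plays the role of seqPivot: fold one record into its ID's map.
  def seqPivot(m: Acc, r: Rec): Acc = {
    m += (r._3 -> r._4)
    m += ("Age" -> r._2)
    m
  }

  // Plays the role of cmbPivot: merge two partial maps for the same ID.
  def cmbPivot(m1: Acc, m2: Acc): Acc = {
    m1 ++= m2
    m1
  }

  // Emulates aggregateByKey: fold each partition per ID from a fresh empty
  // map, then merge the per-partition results for each ID with cmbPivot.
  def aggregateByKey(partitions: Seq[Seq[Rec]]): Map[String, Acc] = {
    val partials: Seq[Map[String, Acc]] = partitions.map { part =>
      part.groupBy(_._1).map { case (id, recs) =>
        id -> recs.foldLeft(mutable.Map.empty[String, Any])(seqPivot)
      }
    }
    partials.flatten.groupBy(_._1).map { case (id, pairs) =>
      id -> pairs.map(_._2).reduce(cmbPivot)
    }
  }

  // Union of all column names, sorted (mirrors orderedColnames).
  def orderedColnames(pivoted: Map[String, Acc]): Seq[String] =
    pivoted.values.map(_.keys.toSet).reduce(_ union _).toSeq.sorted

  // One row per ID, sorted by ID; a column the ID never had becomes null.
  def rows(pivoted: Map[String, Acc], cols: Seq[String]): Seq[List[Any]] =
    pivoted.toSeq.sortBy(_._1).map { case (id, m) =>
      id :: cols.map(c => m.getOrElse(c, null)).toList
    }

  def main(args: Array[String]): Unit = {
    // Only ("X01", 41, "US", 3) comes from the gist; the other rows are made up.
    val partitions: Seq[Seq[Rec]] = Seq(
      Seq(("X01", 41, "US", 3), ("X02", 72, "UK", 6)),
      Seq(("X01", 41, "UK", 1), ("X02", 72, "US", 4), ("X02", 72, "CA", 7))
    )
    val pivoted = aggregateByKey(partitions)
    val cols = orderedColnames(pivoted)
    println(("ID" +: cols).mkString("\t"))
    rows(pivoted, cols).foreach(r => println(r.mkString("\t")))
  }
}
```

Running PivotSketch.main prints a tab-separated table whose header is ID, Age, CA, UK, US. X01 gets null under CA because none of its records carried that key, which is the same effect getOrElse(c, null) has in the Spark version. Because cmbPivot writes m2 into m1, if two partitions ever held different values for the same ID and column, the map merged in last would win.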