Skip to content

Instantly share code, notes, and snippets.

@debugger87
Last active July 6, 2017 11:06
Show Gist options
  • Star 3 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save debugger87/981ea021efefe3d0360e to your computer and use it in GitHub Desktop.
Convert Array[org.apache.spark.sql.Row] to Array[Map[String, Any]]
import org.apache.spark.sql.catalyst.expressions.Row
import org.apache.spark.sql.catalyst.types._
import scala.collection.mutable.{ArrayBuffer}
object SparkRowFormatter {

  /** Converts an array of Spark SQL rows into an array of field-name -> value
    * maps, using the supplied schema to recover names and nested structure.
    *
    * @param rowArr rows to format (one output map per row)
    * @param schema the struct type describing each row's fields
    * @return one `Map[String, Any]` per input row, with nested structs
    *         rendered as nested maps and arrays as `Seq`s
    */
  def formatRowsWithSchema(rowArr: Array[Row], schema: StructType): Array[Map[String, Any]] = {
    rowArr.map(r => formatStruct(schema.fields, r))
  }

  /** Pairs each schema field positionally with the corresponding row value and
    * merges the per-field maps into a single map for the whole struct.
    *
    * NOTE(review): `schema.zip(r)` relies on the Spark 1.x catalyst `Row`
    * being a `Seq[Any]`; this does not hold for the modern `sql.Row` API —
    * confirm the target Spark version before reusing.
    */
  private def formatStruct(schema: Seq[StructField], r: Row): Map[String, Any] = {
    schema.zip(r).foldLeft(Map.empty[String, Any])((acc, p) => acc ++ formatItem(p))
  }

  /** Formats one (field, value) pair, recursing into array and struct columns.
    * Null values are passed through untouched in every branch.
    *
    * Fix: the parameter is typed as a plain tuple `(StructField, Any)` —
    * the `Pair` alias was deprecated in Scala 2.11 and removed in 2.13.
    */
  private def formatItem(p: (StructField, Any)): Map[String, Any] = {
    p match {
      case (sf, a) =>
        sf.dataType match {
          case ArrayType(et, _) =>
            // Array columns arrive as ArrayBuffer[Any] in this Spark version.
            Map(sf.name -> (if (a == null) a else formatArray(et, a.asInstanceOf[ArrayBuffer[Any]])))
          case StructType(s) =>
            Map(sf.name -> (if (a == null) a else formatStruct(s, a.asInstanceOf[Row])))
          case _ => Map(sf.name -> a)
        }
    }
  }

  /** Recursively formats the elements of an array column according to its
    * element type; scalar element types are returned unchanged.
    */
  private def formatArray(et: DataType, arr: ArrayBuffer[Any]): Seq[Any] = {
    et match {
      case StructType(s) =>
        arr.map(e => formatStruct(s, e.asInstanceOf[Row]))
      case ArrayType(t, _) =>
        arr.map(e => formatArray(t, e.asInstanceOf[ArrayBuffer[Any]]))
      case _ => arr
    }
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment