Skip to content

Instantly share code, notes, and snippets.

@belablotski
Last active December 14, 2015 19:45
Show Gist options
  • Save belablotski/453e9b49e250beea7c14 to your computer and use it in GitHub Desktop.
Save belablotski/453e9b49e250beea7c14 to your computer and use it in GitHub Desktop.
Spark DataFrame tabular representation
/**
* Tabular (ASCII-table) representation of a Spark dataset.
* The idea and initial implementation are from http://stackoverflow.com/questions/7539831/scala-draw-table-to-console.
* Usage:
* 1. Import source to spark-shell:
* set HADOOP_HOME=D:\Java\extra_conf
* cd D:\Java\spark-1.4.1-bin-hadoop2.6\bin
* spark-shell.cmd --master local[2] --packages com.databricks:spark-csv_2.10:1.3.0 -i /path/to/AvbTabulator.scala
* 2. Tabulator usage:
* import org.apache.spark.sql.hive.HiveContext
* val hiveContext = new HiveContext(sc)
* val stat = hiveContext.read.format("com.databricks.spark.csv").option("header", "true").option("inferSchema", "true").option("delimiter", "\t").load("D:\\data\\stats-belablotski.tsv")
* stat.registerTempTable("stat")
* AvbTabulator(hiveContext.sql("SELECT * FROM stat").take(20))
* AvbTabulator(hiveContext.sql("SELECT * FROM stat"))
* @author Aliaksei Belablotski
* @author Duncan McGregor
*
* Testing:
* With "Auto-Mpg" dataset (see LoadAutoMpgDataset.scala):
* statAutoMpg.registerTempTable("stat_auto_mpg")
* AvbTabulator(sqlContext.sql("SELECT cylinders, `car name`, `model year`, weight, horsepower FROM stat_auto_mpg WHERE cylinders in (3,5) ORDER BY cylinders"), 10, false)
*/
object AvbTabulator {

  /**
   * Formats a table (a sequence of rows, each a sequence of cells) as an
   * ASCII table with `+---+` separators and right-aligned cells.
   *
   * @param table          Rows to render. When `isHeaderNeeded` is true the
   *                       first row is treated as the header and set off by
   *                       an extra separator line.
   * @param isHeaderNeeded Whether the first row is a header row.
   * @return The rendered table, or "" for an empty table.
   */
  def format(table: Seq[Seq[Any]], isHeaderNeeded: Boolean): String = table match {
    case Seq() => ""
    case _ =>
      // Per-cell widths; a null cell contributes 0 so an all-null column collapses.
      val cellWidths = for (row <- table)
        yield row.map(cell => if (cell == null) 0 else cell.toString.length)
      // Each column is padded to the widest cell in that column.
      val colSizes = cellWidths.transpose.map(_.max)
      val rows = table.map(formatRow(_, colSizes))
      formatRows(rowSeparator(colSizes), rows, isHeaderNeeded)
  }

  /**
   * Formats an already-collected query result (e.g. `df.take(20)`).
   * No header row is rendered because `Row` carries no column names here.
   *
   * @param table Collected rows.
   * @return The rendered table.
   */
  def formatRes(table: Array[org.apache.spark.sql.Row]): String = {
    val res: Seq[Seq[Any]] = table.map(_.toSeq).toSeq
    // BUGFIX: previously format(res, false) dropped the first row, because
    // formatRows unconditionally skipped rows.head. All rows are kept now.
    format(res, isHeaderNeeded = false)
  }

  /**
   * Formats the first `n` rows of a DataFrame, optionally with a header row
   * built from the schema's column names.
   *
   * @param df             Data frame to render.
   * @param n              How many rows to take for tabular printing.
   * @param isHeaderNeeded Whether to render the column-name header.
   * @return The rendered table.
   */
  def formatDf(df: org.apache.spark.sql.DataFrame, n: Int = 20, isHeaderNeeded: Boolean = true): String = {
    val body: Seq[Seq[Any]] = df.take(n).map(_.toSeq).toSeq
    // The schema row is prepended only when it will actually be shown.
    if (isHeaderNeeded) format(df.schema.map(_.name).toSeq +: body, isHeaderNeeded = true)
    else format(body, isHeaderNeeded = false)
  }

  /** Prints a collected result set (no header) to stdout. */
  def apply(table: Array[org.apache.spark.sql.Row]): Unit =
    println(formatRes(table))

  /**
   * Prints a DataFrame in a formatted manner.
   *
   * @param df             Data frame.
   * @param n              How many rows to take for tabular printing.
   * @param isHeaderNeeded Whether to render the column-name header.
   */
  def apply(df: org.apache.spark.sql.DataFrame, n: Int = 20, isHeaderNeeded: Boolean = true): Unit =
    println(formatDf(df, n, isHeaderNeeded))

  /**
   * Joins pre-rendered rows with separator lines.
   *
   * With a header: separator, header row, separator, body rows, separator.
   * Without: separator, all rows, separator (no row is dropped).
   *
   * @param rowSeparator   The `+---+` separator line.
   * @param rows           Pre-rendered `|...|` row strings (non-empty).
   * @param isHeaderNeeded Whether `rows.head` is a header row.
   */
  def formatRows(rowSeparator: String, rows: Seq[String], isHeaderNeeded: Boolean): String = {
    val (top, body) =
      if (isHeaderNeeded) (Seq(rowSeparator, rows.head, rowSeparator), rows.tail)
      else (Seq(rowSeparator), rows)
    ((top ++ body) :+ rowSeparator).mkString("\n")
  }

  /**
   * Renders one row as `|cell|cell|...|`, right-aligning each cell to its
   * column width. A null cell renders as blanks (BUGFIX: previously `"%Ns"`
   * printed the literal string "null", misaligning columns whose width was
   * computed as if null were empty).
   */
  def formatRow(row: Seq[Any], colSizes: Seq[Int]): String = {
    val cells = row.zip(colSizes).map { case (item, size) =>
      if (size == 0) ""
      else ("%" + size + "s").format(if (item == null) "" else item)
    }
    cells.mkString("|", "|", "|")
  }

  /** Builds the `+---+---+` separator line for the given column widths. */
  def rowSeparator(colSizes: Seq[Int]): String =
    colSizes.map("-" * _).mkString("+", "+", "+")

  /*
  def main(args: Array[String]): Unit = {
    println(format(List(List("head1", "head2", "head3"), List("one", "two", "three"), List("four", "five", "six")), true))
  }
  */
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment