# export SPARK_HOME=<YOUR_SPARK_V3_0>
$ git clone https://github.com/maropu/spark-tpcds-datagen.git
$ cd spark-tpcds-datagen
$ ./bin/datagen --master=local[*] --conf spark.driver.memory=8g --scale-factor 10 --output-location /tmp/tpcds-sf-10
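# The Scala snippet below runs in a Spark 3.0 shell; one plausible way to
# launch it (the driver-memory value here is an assumption, not from the
# original gist):
$ $SPARK_HOME/bin/spark-shell --driver-memory 8g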
scala> :paste
import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.types.DataType
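// Turn on the cost-based optimizer and histogram support so that the
// `ANALYZE TABLE ... FOR ALL COLUMNS` commands below collect column
// statistics (ndv, min/max, null counts, and optionally histograms).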
sql("SET spark.sql.cbo.enabled=true")
sql("SET spark.sql.cbo.planStats.enabled=true")
sql("SET spark.sql.statistics.histogram.enabled=true")
sql("SET spark.sql.statistics.histogram.numBins=12")
val tmpDir = "/tmp/tpcds-sf-10"
val tables = Seq(
  "customer", "store_sales", "web_sales", "date_dim", "household_demographics",
  "call_center", "catalog_page", "catalog_returns", "catalog_sales",
  "customer_address", "customer_demographics", "income_band", "inventory",
  "item", "promotion", "reason", "ship_mode", "store", "store_returns",
  "time_dim", "warehouse", "web_page", "web_returns", "web_site")
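// Register each generated Parquet directory as a metastore table and compute
// table-level and per-column statistics for it.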
tables.foreach { t =>
  spark.read.parquet(s"$tmpDir/$t").write.saveAsTable(t)
  spark.sql(s"ANALYZE TABLE $t COMPUTE STATISTICS FOR ALL COLUMNS")
}
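// Render an optional min/max value as a quoted string literal for the
// generated source; yields None when the column has no min/max statistic.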
def minMaxStat(v: Option[Any], dt: DataType): Option[String] = v match {
  case Some(v) => Some(s""""${CatalogColumnStat.toExternalString(v, "", dt)}"""")
  case _ => None
}
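// The loop below prints Scala source for a Map[String, CatalogStatistics],
// seemingly intended to be pasted into Spark's TPCDSTableStats object (the
// compilation error mentioned below refers to that generated file).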
println(" // scalastyle:off line.size.limit")
println(" val sf1TableStats = Map(")
tables.foreach { t =>
  val rel = spark.table(t).queryExecution.analyzed.children(0).asInstanceOf[LogicalRelation]
  val relStat = rel.computeStats()
  println(s""" "$t" -> CatalogStatistics(${relStat.sizeInBytes}L, ${relStat.rowCount.map(v => s"${v}L")}, Map(""")
  val colStats = relStat.attributeStats.map { case (attr, colStat) =>
    // This `hist` can lead to a compilation error: `Method too large: org/apache/spark/sql/TPCDSTableStats$.<init> ()V`
    // val hist = colStat.histogram.map { h =>
    //   val bins = h.bins.map { b => s"HistogramBin(${b.lo}, ${b.hi}, ${b.ndv})" }.mkString(", ")
    //   s"Some(Histogram(${h.height}, Array($bins)))"
    // }.getOrElse("None")
    val hist = "None"
    s""" "${attr.name}" -> CatalogColumnStat(${colStat.distinctCount.map(v => s"${v}L")}, ${minMaxStat(colStat.min, attr.dataType)}, ${minMaxStat(colStat.max, attr.dataType)}, ${colStat.nullCount}, ${colStat.avgLen}, ${colStat.maxLen}, $hist, CatalogColumnStat.VERSION)"""
  }
  println(colStats.mkString(",\n"))
  println(" )),")
}
println(" )")
println(" // scalastyle:on line.size.limit")