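// Load the TPC-DS tables from Parquet, register them as temp views, and run
// each query under tpcds_2_4.orig/ through spark.sql, timing every run.
// Intended to be executed inside a spark-shell session.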
import org.apache.spark.sql.execution.debug._ // for the optional debugCodegen call below
import scala.io.Source
import java.io.File
// Time a block and print the elapsed wall-clock time in milliseconds.
def time[R](block: => R): R = {
  val t0 = System.nanoTime()
  val result = block // call-by-name: evaluated here
  val t1 = System.nanoTime()
  println("Elapsed time: " + (t1 - t0) / 1000000 + "ms")
  result
}
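// Usage example for the helper above (illustrative, not in the original gist):
//   time { spark.sql("select 1").show }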
// Register each TPC-DS table as a temp view over its Parquet directory.
val basePath = "file:///mnt/nvme0/tpcds/parquet_t_tpcds_part_100"
val tables = Seq(
  "call_center", "catalog_page", "catalog_returns", "catalog_sales",
  "customer", "customer_address", "customer_demographics", "date_dim",
  "household_demographics", "income_band", "inventory", "item",
  "promotion", "reason", "ship_mode", "store", "store_returns",
  "store_sales", "time_dim", "warehouse", "web_page", "web_returns",
  "web_sales", "web_site")
for (name <- tables) {
  spark.read.format("parquet").load(s"$basePath/$name").createOrReplaceTempView(name)
}
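// Optional sanity check before the full sweep (illustrative addition, not part
// of the original script): confirm the views are registered.
//   spark.sql("show tables").show(100, false)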
// Build the list of query files, skipping queries known to be unsupported.
val unsupported = Set("ss_max.sql", "ss_maxb.sql")

def getListOfFiles(dir: String): List[File] = {
  val d = new File(dir)
  if (d.exists && d.isDirectory) {
    d.listFiles.filter(_.isFile)
      .filterNot(f => unsupported.contains(f.getName))
      .toList
    // To smoke-test a single query instead:
    // d.listFiles.filter(_.isFile).filter(_.getName == "q1.sql").toList
  } else {
    List[File]()
  }
}
// Directory containing the TPC-DS .sql query files.
val fileLists = getListOfFiles("/home/sparkuser/tpcds/tpcds_2_4.orig/")
//val fileLists = getListOfFiles("/tmp")
// Sort queries in numeric order; variant suffixes sort after the base query
// (q14a -> 14.1, q14b -> 14.2).
val sorted = fileLists.sortBy { f =>
  f.getName
    .replaceFirst("a", ".1")
    .replaceFirst("b", ".2")
    .replaceFirst(".sql", "")
    .replaceFirst("q", "")
    .toDouble
}
for (t <- sorted) {
  println(t)
  // Strip comment lines and trailing LIMIT lines, then flatten the query
  // into a single statement.
  val src = Source.fromFile(t)
  val fileContents =
    try src.getLines.filter(!_.startsWith("--")).filter(!_.startsWith("LIMIT")).mkString(" ")
    finally src.close()
  println(fileContents)
  try {
    time { spark.sql(fileContents).show }
    //spark.sql(fileContents).debugCodegen
    Thread.sleep(5000) // pause between queries
  } catch {
    case e: Exception => println(s"Query failed: ${e.getMessage}")
  }
}
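// A minimal sketch of one way to launch this script (the file name and driver
// memory are placeholder assumptions, not from the original):
//   spark-shell --driver-memory 8g -i run_tpcds.scala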