Skip to content

Instantly share code, notes, and snippets.

@alexeykudinkin
Created June 9, 2022 20:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexeykudinkin/ac931aaafec934b1d307cbd380c825cf to your computer and use it in GitHub Desktop.
Save alexeykudinkin/ac931aaafec934b1d307cbd380c825cf to your computer and use it in GitHub Desktop.
Querying previously ingested Amazon Reviews dataset
// NOTE: This relies on the dataset having been previously ingested by the following gist:
// https://gist.github.com/alexeykudinkin/233ce2d365ae4a9833f557de7ed5d1b9
////////////////////////////////////////////////////////////////
// Preparing tables for Querying
////////////////////////////////////////////////////////////////
// Temp view over the Hudi table at `outputPath`, read w/ Data Skipping DISABLED
val readDF: DataFrame = {
  // Reader options: HoodieMetadataConfig.ENABLE set to "true", data skipping set to "false"
  val readerOptions = Map(
    HoodieMetadataConfig.ENABLE.key -> "true",
    "hoodie.enable.data.skipping" -> "false"
  )
  spark.read.format("hudi").options(readerOptions).load(outputPath)
}
// View name is derived from `tableName` via `safeTableName` (both defined outside this snippet)
val rawSnapshotTableName = safeTableName(s"${tableName}_sql_snapshot")
readDF.createOrReplaceTempView(rawSnapshotTableName)
// Temp view over the same Hudi table at `outputPath`, read w/ Data Skipping ENABLED
val readDFSkip: DataFrame = {
  // Reader options: HoodieMetadataConfig.ENABLE set to "true", data skipping set to "true"
  val readerOptions = Map(
    HoodieMetadataConfig.ENABLE.key -> "true",
    "hoodie.enable.data.skipping" -> "true"
  )
  spark.read.format("hudi").options(readerOptions).load(outputPath)
}
// View name is derived from `tableName` via `safeTableName` (both defined outside this snippet)
val dataSkippingSnapshotTableName = safeTableName(s"${tableName}_sql_snapshot_skipping")
readDFSkip.createOrReplaceTempView(dataSkippingSnapshotTableName)
////////////////////////////////////////////////////////////////
// Querying
////////////////////////////////////////////////////////////////
// Prints the extended `explain` output for `df`, followed by the default `show()`
// rendering of its rows. Executed purely for its console output; returns nothing.
def show(df: DataFrame): Unit = {
  // Plan is printed first, so it can be read next to the rows printed below it
  df.explain(extended = true)
  df.show()
}
// Query 1A: Fetch (review_id, customer_id) rows matching one given customer_id
//           (rows are projected as-is, no aggregation is performed)
// NOTE: This query has _lower_ "specificity", since given customer_id overlaps with at least
//       10% of the ranges of individual parquet files in this table
// NOTE: Parameter `tableName` shadows the script-level `tableName`; it is the name of the
//       table/view to query and is interpolated into the SQL text as-is
def runQuery1a(tableName: String): Unit = {
  val query = s"SELECT review_id, customer_id FROM $tableName WHERE customer_id in ('10000464')"
  show(spark.sql(query))
}
// Query 1B: Fetch (review_id, customer_id) rows matching one specific customer_id
//           (rows are projected as-is, no aggregation is performed)
// NOTE: This query has _higher_ "specificity", since given customer_id overlaps with just ~1%
//       of the ranges of individual parquet files in this table
// NOTE: Parameter `tableName` shadows the script-level `tableName`; it is the name of the
//       table/view to query and is interpolated into the SQL text as-is
def runQuery1b(tableName: String): Unit = {
  val query = s"SELECT review_id, customer_id FROM $tableName WHERE customer_id in ('10000004')"
  show(spark.sql(query))
}
// Run Query 1A against both temp views, in order: first the one read w/ data skipping
// disabled, then the one read w/ data skipping enabled
Seq(rawSnapshotTableName, dataSkippingSnapshotTableName).foreach(runQuery1a)
// …
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment