Created June 9, 2022 20:39
Save alexeykudinkin/ac931aaafec934b1d307cbd380c825cf to your computer and use it in GitHub Desktop.
Querying previously ingested Amazon Reviews dataset
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// NOTE: This relies on the dataset previously ingested in here:
// https://gist.github.com/alexeykudinkin/233ce2d365ae4a9833f557de7ed5d1b9

////////////////////////////////////////////////////////////////
// Preparing tables for Querying
////////////////////////////////////////////////////////////////

/**
 * Loads a snapshot of the previously ingested Hudi table from `outputPath`,
 * toggling the data-skipping read path.
 *
 * @param dataSkippingEnabled whether `hoodie.enable.data.skipping` is set on the read
 * @return the loaded snapshot as a DataFrame
 */
def loadHudiSnapshot(dataSkippingEnabled: Boolean): DataFrame =
  spark.read.format("hudi")
    // Metadata table has to be enabled on the read path as well,
    // since data skipping relies on the column stats stored in it
    .option(HoodieMetadataConfig.ENABLE.key, "true")
    .option("hoodie.enable.data.skipping", dataSkippingEnabled.toString)
    .load(outputPath)

// Temp Table w/ Data Skipping DISABLED (baseline for comparison)
val readDF: DataFrame = loadHudiSnapshot(dataSkippingEnabled = false)

val rawSnapshotTableName = safeTableName(s"${tableName}_sql_snapshot")
readDF.createOrReplaceTempView(rawSnapshotTableName)

// Temp Table w/ Data Skipping ENABLED
val readDFSkip: DataFrame = loadHudiSnapshot(dataSkippingEnabled = true)

val dataSkippingSnapshotTableName = safeTableName(s"${tableName}_sql_snapshot_skipping")
readDFSkip.createOrReplaceTempView(dataSkippingSnapshotTableName)
////////////////////////////////////////////////////////////////
// Querying
////////////////////////////////////////////////////////////////
/** Prints the query plans and then the result rows for the given DataFrame. */
def show(df: DataFrame): Unit = {
  // Dump logical + physical plans first, so the effect of data skipping
  // (files pruned from the scan) is visible alongside the results
  df.explain(extended = true)
  // Render the leading rows of the result set
  df.show()
}
/** Query 1A: fetch review ids for one particular customer. */
def runQuery1a(tableName: String): Unit = {
  // NOTE: This query has _lower_ "specificity", since given customer_id overlaps with at least 10% of the ranges of individual parquet files in this table
  val query =
    s"SELECT review_id, customer_id FROM $tableName WHERE customer_id in ('10000464')"
  show(spark.sql(query))
}
/** Query 1B: fetch review ids for a different, narrowly-distributed customer. */
def runQuery1b(tableName: String): Unit = {
  // NOTE: This query has _higher_ "specificity", since given customer_id overlaps with just ~1% of the ranges of individual parquet files in this table
  val query =
    s"SELECT review_id, customer_id FROM $tableName WHERE customer_id in ('10000004')"
  show(spark.sql(query))
}
// Run Query 1A against both variants: baseline first, then with data skipping,
// so the two plans/row counts can be compared side by side
Seq(rawSnapshotTableName, dataSkippingSnapshotTableName).foreach(runQuery1a)
// … |
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.