@vinothchandar
Last active July 20, 2020 03:12
Spark SQL Plans on Amazon Reviews Dataset

Dataset

https://s3.amazonaws.com/amazon-reviews-pds/readme.html
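
The queries below assume the dataset was downloaded locally as Parquet and registered as a temp table. A minimal setup sketch (the local path is taken from the FileScan location in the physical plan further down):

// Read the locally downloaded Parquet copy of the reviews dataset and expose
// it to SQL as `amazon_reviews`; the path matches the InMemoryFileIndex
// location shown in the plans below.
val df = spark.read.parquet("file:///Volumes/HUDIDATA/input-data/amazon-reviews")
df.registerTempTable("amazon_reviews")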

scala> spark.sql("select count(*) from amazon_reviews").show
+---------+
| count(1)|
+---------+
|160796570|
+---------+


scala> spark.sql("select count(*) from amazon_reviews where review_date > '2007' and review_date < '2009'").show
+--------+
|count(1)|
+--------+
| 4683044|
+--------+



Query with no filter

select sum(total_votes), product_category from amazon_reviews group by product_category 

takes 15 seconds.
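
The four plan sections below were captured via explain; in spark-shell:

// Print the parsed, analyzed, optimized and physical plans for the aggregation.
// Note: the GlobalLimit/LocalLimit 1001 nodes in the captured plans likely come
// from the UI that displayed the result; a plain explain(true) may not show them.
spark.sql("select sum(total_votes), product_category from amazon_reviews group by product_category").explain(true)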

== Parsed Logical Plan ==
GlobalLimit 1001
+- LocalLimit 1001
   +- Aggregate [product_category#332], [sum(cast(total_votes#325 as bigint)) AS sum(total_votes)#434L, product_category#332]
      +- SubqueryAlias `amazon_reviews`
         +- Relation[marketplace#317,customer_id#318,review_id#319,product_id#320,product_parent#321,product_title#322,star_rating#323,helpful_votes#324,total_votes#325,vine#326,verified_purchase#327,review_headline#328,review_body#329,review_date#330,year#331,product_category#332] parquet

== Analyzed Logical Plan ==
sum(total_votes): bigint, product_category: string
GlobalLimit 1001
+- LocalLimit 1001
   +- Aggregate [product_category#332], [sum(cast(total_votes#325 as bigint)) AS sum(total_votes)#434L, product_category#332]
      +- SubqueryAlias `amazon_reviews`
         +- Relation[marketplace#317,customer_id#318,review_id#319,product_id#320,product_parent#321,product_title#322,star_rating#323,helpful_votes#324,total_votes#325,vine#326,verified_purchase#327,review_headline#328,review_body#329,review_date#330,year#331,product_category#332] parquet

== Optimized Logical Plan ==
GlobalLimit 1001
+- LocalLimit 1001
   +- Aggregate [product_category#332], [sum(cast(total_votes#325 as bigint)) AS sum(total_votes)#434L, product_category#332]
      +- Project [total_votes#325, product_category#332]
         +- Relation[marketplace#317,customer_id#318,review_id#319,product_id#320,product_parent#321,product_title#322,star_rating#323,helpful_votes#324,total_votes#325,vine#326,verified_purchase#327,review_headline#328,review_body#329,review_date#330,year#331,product_category#332] parquet

== Physical Plan ==
CollectLimit 1001
+- *(2) HashAggregate(keys=[product_category#332], functions=[sum(cast(total_votes#325 as bigint))], output=[sum(total_votes)#434L, product_category#332])
   +- Exchange hashpartitioning(product_category#332, 200)
      +- *(1) HashAggregate(keys=[product_category#332], functions=[partial_sum(cast(total_votes#325 as bigint))], output=[product_category#332, sum#440L])
         +- *(1) FileScan parquet [total_votes#325,product_category#332] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/Volumes/HUDIDATA/input-data/amazon-reviews], PartitionCount: 43, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<total_votes:int>


vinothchandar commented Jul 18, 2020

select sum(total_votes), product_category from amazon_reviews where review_date > '2007' and review_date < '2009' group by product_category 

takes 17 seconds.
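
The wall-clock timings quoted in these comments can be measured with spark.time in spark-shell, e.g.:

// Time the filtered aggregation; numbers will of course vary by machine.
spark.time {
  spark.sql("""select sum(total_votes), product_category from amazon_reviews
               where review_date > '2007' and review_date < '2009'
               group by product_category""").show()
}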


vinothchandar commented Jul 18, 2020

Next, let's sort by review_date and ask the same question (no partitioning).

takes 10 seconds.

// Rewrite the dataset globally sorted by review_date and register the result.
val outputPath = "file:///Volumes/HUDIDATA/input-data/amazon-reviews-sorted-by-date"
df.sort("review_date").write.mode("overwrite").parquet(outputPath)
val df2 = spark.read.parquet(outputPath)
df2.registerTempTable("amazon_reviews_sorted_by_date")

select sum(total_votes), product_category from amazon_reviews_sorted_by_date where review_date > '2007' and review_date < '2009' group by product_category 
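
To verify that the sort produced tight, largely non-overlapping review_date ranges per file (which is what lets Parquet footer min/max statistics skip data for the 2007-2009 predicate), one quick check, using the df2 defined above:

import org.apache.spark.sql.functions.{input_file_name, min, max}

// Per-file review_date ranges: after the global sort, each output file should
// cover a narrow slice of dates, so most files fall entirely outside
// '2007'..'2009' and can be skipped.
df2.groupBy(input_file_name().as("file"))
  .agg(min("review_date"), max("review_date"))
  .orderBy("min(review_date)")
  .show(false)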


vinothchandar commented Jul 18, 2020

Next, let's sort by (product_category, review_date) and ask the same question (no partitioning). Let's see whether pruning is still effective even though we are sorting primarily by product_category; within each category, this should still pack review_date values close to each other.

// Rewrite the dataset sorted by (product_category, review_date) and register it.
val outputPath = "file:///Volumes/HUDIDATA/input-data/amazon-reviews-sorted-by-category-date"
df.sort("product_category", "review_date").write.mode("overwrite").parquet(outputPath)
val df2 = spark.read.parquet(outputPath)
df2.registerTempTable("amazon_reviews_sorted_by_category_date")

select sum(total_votes), product_category from amazon_reviews_sorted_by_category_date where review_date > '2007' and review_date < '2009' group by product_category 
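
The same per-file inspection, now over both sort keys, should show each file holding a single product_category (or a narrow range of categories) with review_date packed within it:

import org.apache.spark.sql.functions.{input_file_name, min, max}

// Files should split by category first, with dates clustered inside each category.
df2.groupBy(input_file_name().as("file"))
  .agg(min("product_category"), max("product_category"),
       min("review_date"), max("review_date"))
  .show(false)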


vinothchandar commented

Conclusion: we already lay out data in arrival order, i.e. by ts (time), and that alone is saving a lot of compute for temporal queries.

Further clustering by a secondary field, such as product_category or city_id, should yield good gains.

FelixKJose commented

@vinothchandar great. When you say clustering by secondary fields, does that mean sort(ts, product_category, city_id)? How about Z-Ordering: does HUDI plan to have anything like Z-Ordering?

vinothchandar commented

Clustering is similar to Z-Ordering. You can see the RFC here.

https://cwiki.apache.org/confluence/display/HUDI/RFC+-+19+Clustering+data+for+speed+and+query+performance
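
For intuition only (the actual design is in the RFC above), Z-ordering interleaves the bits of multiple keys into a single sort key, so that records close in either dimension tend to land physically close together. A toy sketch for two Int keys:

// Toy Z-order: interleave the bits of two 32-bit keys into one 64-bit value.
// Sorting by zOrder(a, b) roughly preserves locality in both dimensions,
// unlike a plain lexicographic sort on (a, b).
def zOrder(a: Int, b: Int): Long = {
  var z = 0L
  for (i <- 0 until 32) {
    z |= ((a >> i) & 1L) << (2 * i)
    z |= ((b >> i) & 1L) << (2 * i + 1)
  }
  z
}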

Please keep discussion on the mailing list, since everyone else can chime in as well!
