Skip to content

Instantly share code, notes, and snippets.

@afsalthaj
Last active August 8, 2019 02:30
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save afsalthaj/5de5dac58343f8dd560277817d5152f0 to your computer and use it in GitHub Desktop.
Save afsalthaj/5de5dac58343f8dd560277817d5152f0 to your computer and use it in GitHub Desktop.
```scala
scala> list.collect
res0: Array[org.apache.spark.sql.Row] = Array([1.0], [1.0], [1.0], [2.0], [2.0], [0.0])
scala> list
res1: org.apache.spark.sql.DataFrame = [a_indexed: double]
scala> val table1 = list
table1: org.apache.spark.sql.DataFrame = [a_indexed: double]
scala> val table2 = list
table2: org.apache.spark.sql.DataFrame = [a_indexed: double]
scala> val table3 = list
table3: org.apache.spark.sql.DataFrame = [a_indexed: double]
scala> val table4 = list
table4: org.apache.spark.sql.DataFrame = [a_indexed: double]
// Chaining the joins one after another produces a left-deep plan:
// each SortMergeJoin depends on the previous one, so the optimiser
// cannot rebalance the tree and the joins run sequentially.
scala> table1.join(table2, "a_indexed").join(table3, "a_indexed").join(table4, "a_indexed").explain
== Physical Plan ==
*(7) Project [a_indexed#3]
+- *(7) SortMergeJoin [a_indexed#3], [a_indexed#33], Inner
:- *(5) Project [a_indexed#3]
: +- *(5) SortMergeJoin [a_indexed#3], [a_indexed#31], Inner
: :- *(3) Project [a_indexed#3]
: : +- *(3) SortMergeJoin [a_indexed#3], [a_indexed#29], Inner
: : :- *(1) Sort [a_indexed#3 ASC NULLS FIRST], false, 0
: : : +- Exchange hashpartitioning(a_indexed#3, 200)
: : : +- LocalTableScan [a_indexed#3]
: : +- *(2) Sort [a_indexed#29 ASC NULLS FIRST], false, 0
: : +- ReusedExchange [a_indexed#29], Exchange hashpartitioning(a_indexed#3, 200)
: +- *(4) Sort [a_indexed#31 ASC NULLS FIRST], false, 0
: +- ReusedExchange [a_indexed#31], Exchange hashpartitioning(a_indexed#3, 200)
+- *(6) Sort [a_indexed#33 ASC NULLS FIRST], false, 0
+- ReusedExchange [a_indexed#33], Exchange hashpartitioning(a_indexed#3, 200)
// Pairing the joins explicitly ((t1⋈t2) ⋈ (t3⋈t4)) yields a bushy plan:
// the two inner joins are independent and can run in parallel. Structure
// the query yourself rather than relying on the optimiser to do it.
scala> val join1 = table1.join(table2, "a_indexed")
join1: org.apache.spark.sql.DataFrame = [a_indexed: double]
scala> val join2 = table3.join(table4, "a_indexed")
join2: org.apache.spark.sql.DataFrame = [a_indexed: double]
scala> val join3 = join1.join(join2, "a_indexed")
join3: org.apache.spark.sql.DataFrame = [a_indexed: double]
scala> join3.explain
== Physical Plan ==
*(7) Project [a_indexed#3]
+- *(7) SortMergeJoin [a_indexed#3], [a_indexed#40], Inner
:- *(3) Project [a_indexed#3]
: +- *(3) SortMergeJoin [a_indexed#3], [a_indexed#36], Inner
: :- *(1) Sort [a_indexed#3 ASC NULLS FIRST], false, 0
: : +- Exchange hashpartitioning(a_indexed#3, 200)
: : +- LocalTableScan [a_indexed#3]
: +- *(2) Sort [a_indexed#36 ASC NULLS FIRST], false, 0
: +- ReusedExchange [a_indexed#36], Exchange hashpartitioning(a_indexed#3, 200)
+- *(6) Project [a_indexed#40]
+- *(6) SortMergeJoin [a_indexed#40], [a_indexed#38], Inner
:- *(4) Sort [a_indexed#40 ASC NULLS FIRST], false, 0
: +- ReusedExchange [a_indexed#40], Exchange hashpartitioning(a_indexed#3, 200)
+- *(5) Sort [a_indexed#38 ASC NULLS FIRST], false, 0
+- ReusedExchange [a_indexed#38], Exchange hashpartitioning(a_indexed#3, 200)
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment