Last active
August 8, 2019 02:30
-
-
Save afsalthaj/5de5dac58343f8dd560277817d5152f0 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
```scala
scala> list.collect
res0: Array[org.apache.spark.sql.Row] = Array([1.0], [1.0], [1.0], [2.0], [2.0], [0.0])

scala> list
res1: org.apache.spark.sql.DataFrame = [a_indexed: double]

scala> val table1 = list
table1: org.apache.spark.sql.DataFrame = [a_indexed: double]

scala> val table2 = list
table2: org.apache.spark.sql.DataFrame = [a_indexed: double]

scala> val table3 = list
table3: org.apache.spark.sql.DataFrame = [a_indexed: double]

scala> val table4 = list
table4: org.apache.spark.sql.DataFrame = [a_indexed: double]

// Sequential (left-deep) joins: each join depends on the previous result,
// so the optimiser produces a 7-stage left-deep plan.
scala> table1.join(table2, "a_indexed").join(table3, "a_indexed").join(table4, "a_indexed").explain
== Physical Plan ==
*(7) Project [a_indexed#3]
+- *(7) SortMergeJoin [a_indexed#3], [a_indexed#33], Inner
   :- *(5) Project [a_indexed#3]
   :  +- *(5) SortMergeJoin [a_indexed#3], [a_indexed#31], Inner
   :     :- *(3) Project [a_indexed#3]
   :     :  +- *(3) SortMergeJoin [a_indexed#3], [a_indexed#29], Inner
   :     :     :- *(1) Sort [a_indexed#3 ASC NULLS FIRST], false, 0
   :     :     :  +- Exchange hashpartitioning(a_indexed#3, 200)
   :     :     :     +- LocalTableScan [a_indexed#3]
   :     :     +- *(2) Sort [a_indexed#29 ASC NULLS FIRST], false, 0
   :     :        +- ReusedExchange [a_indexed#29], Exchange hashpartitioning(a_indexed#3, 200)
   :     +- *(4) Sort [a_indexed#31 ASC NULLS FIRST], false, 0
   :        +- ReusedExchange [a_indexed#31], Exchange hashpartitioning(a_indexed#3, 200)
   +- *(6) Sort [a_indexed#33 ASC NULLS FIRST], false, 0
      +- ReusedExchange [a_indexed#33], Exchange hashpartitioning(a_indexed#3, 200)

// Write well-structured (bushy) queries instead of relying on the optimiser:
// pairing the joins lets the two halves be computed independently.
scala> val join1 = table1.join(table2, "a_indexed")
join1: org.apache.spark.sql.DataFrame = [a_indexed: double]

scala> val join2 = table3.join(table4, "a_indexed")
join2: org.apache.spark.sql.DataFrame = [a_indexed: double]

scala> val join3 = join1.join(join2, "a_indexed")
join3: org.apache.spark.sql.DataFrame = [a_indexed: double]

scala> join3.explain
== Physical Plan ==
*(7) Project [a_indexed#3]
+- *(7) SortMergeJoin [a_indexed#3], [a_indexed#40], Inner
   :- *(3) Project [a_indexed#3]
   :  +- *(3) SortMergeJoin [a_indexed#3], [a_indexed#36], Inner
   :     :- *(1) Sort [a_indexed#3 ASC NULLS FIRST], false, 0
   :     :  +- Exchange hashpartitioning(a_indexed#3, 200)
   :     :     +- LocalTableScan [a_indexed#3]
   :     +- *(2) Sort [a_indexed#36 ASC NULLS FIRST], false, 0
   :        +- ReusedExchange [a_indexed#36], Exchange hashpartitioning(a_indexed#3, 200)
   +- *(6) Project [a_indexed#40]
      +- *(6) SortMergeJoin [a_indexed#40], [a_indexed#38], Inner
         :- *(4) Sort [a_indexed#40 ASC NULLS FIRST], false, 0
         :  +- ReusedExchange [a_indexed#40], Exchange hashpartitioning(a_indexed#3, 200)
         +- *(5) Sort [a_indexed#38 ASC NULLS FIRST], false, 0
            +- ReusedExchange [a_indexed#38], Exchange hashpartitioning(a_indexed#3, 200)
```
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment