broadcast_join_medium_size
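// Collect the join keys of the medium-size table, use them to filter the large table
// down to only the matching rows, and then join the two: the final join is planned as a
// broadcast hash join (see the explain() output below).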
import org.apache.spark.sql.functions._
// Session implicits are needed for .toDF and .as[Int] outside the spark-shell,
// assuming a SparkSession value named spark is in scope
import spark.implicits._
val mediumDf = Seq((0, "zero"), (4, "one")).toDF("id", "value")
val largeDf = Seq((0, "zero"), (2, "two"), (3, "three"), (4, "four"), (5, "five")).toDF("id", "value")
mediumDf.show()
largeDf.show()
/*
+---+-----+
| id|value|
+---+-----+
| 0| zero|
| 4| one|
+---+-----+
+---+-----+
| id|value|
+---+-----+
| 0| zero|
| 2| two|
| 3|three|
| 4| four|
| 5| five|
+---+-----+
*/
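// Collect the medium table's join keys to the driver; this assumes the key set fits in driver memory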
val keys = mediumDf.select("id").as[Int].collect().toSeq
print(keys)
/*
keys: Seq[Int] = WrappedArray(0, 4)
*/
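// Keep only the rows of the large table whose id appears in the collected keys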
val reducedDataFrame = largeDf.filter(col("id").isin(keys:_*))
reducedDataFrame.show()
/*
+---+-----+
| id|value|
+---+-----+
| 0| zero|
| 4| four|
+---+-----+
*/
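// Join the reduced large table with the medium one; as explain() shows below,
// Spark plans a BroadcastHashJoin with mediumDf broadcast as the build side (BuildRight)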
val result = reducedDataFrame.join(mediumDf, Seq("id"))
result.explain()
result.show()
/*
== Physical Plan ==
*(1) Project [id#246, value#247, value#238]
+- *(1) BroadcastHashJoin [id#246], [id#237], Inner, BuildRight
   :- LocalTableScan [id#246, value#247]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint))), [id=#234]
      +- LocalTableScan [id#237, value#238]
+---+-----+-----+
| id|value|value|
+---+-----+-----+
| 0| zero| zero|
| 4| four| one|
+---+-----+-----+
*/
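// If the small side sits just above spark.sql.autoBroadcastJoinThreshold, the broadcast
// can also be requested explicitly with the broadcast() hint from
// org.apache.spark.sql.functions (already imported above). A minimal sketch reusing the
// DataFrames defined above; hintedResult is an illustrative name, not used elsewhere:
val hintedResult = reducedDataFrame.join(broadcast(mediumDf), Seq("id"))
hintedResult.explain()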