Skip to content

Instantly share code, notes, and snippets.

@dimastatz
Created May 4, 2020 17:43
Show Gist options
  • Save dimastatz/b990ace9064c6ec0bfd4d47bc2a12dea to your computer and use it in GitHub Desktop.
Save dimastatz/b990ace9064c6ec0bfd4d47bc2a12dea to your computer and use it in GitHub Desktop.
object DataFrameTools {

  /** Extension-method syntax that adds a salted join to a `DataFrame`. */
  implicit class RichDataFrame(df: DataFrame) {

    /**
     * Joins this DataFrame (the "stream" side) with `buildDf` (the "build" side) using salting.
     *
     * Every row of `buildDf` is replicated `salt` times, once per salt value in `[0, salt)`,
     * while every row of this DataFrame is tagged with a single salt value derived from
     * `monotonically_increasing_id() % salt`. The join condition is `joinExpression` combined
     * with equality of the two salt columns, so rows sharing one join key are spread over up
     * to `salt` distinct composite keys.
     *
     * The helper columns `slt_ratio` and `slt_ratio_s` are dropped from the result; inputs
     * that already contain columns with these names will have them overwritten/dropped.
     *
     * @param buildDf        DataFrame to replicate; its row count grows by a factor of `salt`
     * @param joinExpression join condition, expressed on columns of this DataFrame and `buildDf`
     * @param joinType       Spark join type name; right and full outer joins are rejected
     * @param salt           number of salt buckets, must be positive
     * @return the joined DataFrame without the helper salt columns
     * @throws IllegalArgumentException if `salt` is not positive or `joinType` is a right/full
     *                                  outer join
     */
    def saltedJoin(buildDf: DataFrame, joinExpression: Column, joinType: String, salt: Int): DataFrame = {
      import org.apache.spark.sql.functions._

      // salt <= 0 would explode an empty array (dropping every build row) and compute `% 0`.
      require(salt > 0, s"salt must be positive, got $salt")

      // Replicating the build side is only sound when unmatched build rows are never emitted:
      // in a right/full outer join each replica without a partner at its salt value would
      // surface as a spurious null-padded row. Normalization is intended to mirror how Spark
      // parses join type names (case-insensitive, underscores ignored).
      val normalizedJoinType = joinType.toLowerCase(java.util.Locale.ROOT).replace("_", "")
      val unsupportedJoinTypes = Set("outer", "full", "fullouter", "right", "rightouter")
      require(
        !unsupportedJoinTypes.contains(normalizedJoinType),
        s"saltedJoin does not support join type '$joinType': replicating the build side " +
          "would emit spurious unmatched rows"
      )

      // One copy of each build row per salt value.
      val saltValues = array(Range(0, salt).map(lit): _*)
      val tableDf = buildDf.withColumn("slt_ratio_s", explode(saltValues))

      // Exactly one salt value per stream row.
      val streamDf = df.withColumn("slt_ratio", monotonically_increasing_id() % salt)

      val saltedExpr = streamDf("slt_ratio") === tableDf("slt_ratio_s") && joinExpression
      streamDf.join(tableDf, saltedExpr, joinType).drop("slt_ratio_s").drop("slt_ratio")
    }
  }
}
/**
 * Usage sketch for `saltedJoin`: left-joins a DataFrame with a lookup table on `id`,
 * spreading the join over 200 salt buckets. Both inputs are `???` placeholders, so the
 * method throws `NotImplementedError` if actually invoked.
 */
def runExample(): Unit = {
  import DataFrameTools.RichDataFrame

  val facts: DataFrame = ???
  val lookup: DataFrame = ???

  val sameId = facts("id") === lookup("id")
  facts.saltedJoin(lookup, sameId, joinType = "left", salt = 200)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment