Skip to content

Instantly share code, notes, and snippets.

@izeigerman
Last active January 30, 2021 22:22
Show Gist options
  • Save izeigerman/636f7cfbad36556c47b249c4e3d0bdd9 to your computer and use it in GitHub Desktop.
Save izeigerman/636f7cfbad36556c47b249c4e3d0bdd9 to your computer and use it in GitHub Desktop.
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.broadcast
import shapeless.::
import shapeless.ops.hlist.Prepend
/** A strategy for joining two [[AnnotatedDataFrame]]s while tracking, at the type level,
  * which annotated frames have been joined together.
  *
  * @tparam L  annotation of the left (primary) frame; it stays the primary annotation of the result
  * @tparam LJ join list already carried by the left frame
  * @tparam R  annotation of the right frame
  * @tparam RJ join list already carried by the right frame
  */
sealed trait Join[L, LJ <: JoinList, R, RJ <: JoinList] {

  /** Joins `right` onto `left`.
    *
    * The result keeps `L` as its annotation. Its join list is `R` followed by `P.Out`,
    * which shapeless' `Prepend` computes as `LJ` followed by `RJ`.
    *
    * @param left  the left-hand annotated frame
    * @param right the right-hand annotated frame
    * @param P     type-level witness that concatenates the two join lists
    * @return the joined frame with the combined type-level join list
    */
  def join(left: AnnotatedDataFrame[L, LJ], right: AnnotatedDataFrame[R, RJ])(
    implicit P: Prepend[LJ, RJ]
  ): AnnotatedDataFrame[L, R :: P.Out]
}
/** Factories for [[Join]] strategies, plus the join kinds they support. */
object Join {

  /** Creates a [[Join]] that matches rows on columns with the same name on both sides.
    *
    * The underlying frames are combined with Spark's `Dataset.join(right, usingColumns, joinType)`.
    *
    * @param joinKeys    names of the columns to join on (Spark requires them to exist on both sides)
    * @param joinType    which kind of join to perform; defaults to [[Inner]]
    * @param isBroadcast when true, the right-hand frame is wrapped in Spark's `broadcast` hint
    * @return a join strategy for the given key columns and join kind
    */
  def usingColumns[L, LJ <: JoinList, R, RJ <: JoinList](
    joinKeys: Seq[String],
    joinType: JoinType = Inner,
    isBroadcast: Boolean = false
  ): Join[L, LJ, R, RJ] =
    new ColumnJoin[L, LJ, R, RJ](joinKeys, joinType, isBroadcast)

  // Concrete strategy behind `usingColumns`; private so the sealed `Join` hierarchy stays closed.
  private final class ColumnJoin[L, LJ <: JoinList, R, RJ <: JoinList](
    keys: Seq[String],
    kind: JoinType,
    hintBroadcast: Boolean
  ) extends Join[L, LJ, R, RJ] {

    override def join(
      left: AnnotatedDataFrame[L, LJ],
      right: AnnotatedDataFrame[R, RJ]
    )(implicit P: Prepend[LJ, RJ]): AnnotatedDataFrame[L, R :: P.Out] = {
      val leftFrame = left.toDF
      val rawRight = right.toDF
      // Only the right side ever receives the broadcast hint; the left side is joined as-is.
      val rightFrame = if (hintBroadcast) broadcast(rawRight) else rawRight
      AnnotatedDataFrame(leftFrame.join(rightFrame, keys, kind.sparkName))
    }
  }

  /** Kind of join to perform; `sparkName` is the join-type string handed to Spark. */
  sealed abstract class JoinType(val sparkName: String)

  /** Keeps only rows whose keys match on both sides. */
  case object Inner extends JoinType("inner")

  /** Keeps every left row, with right-side columns empty where no match exists. */
  case object LeftOuter extends JoinType("left_outer")

  /** Keeps every row from both sides, matching them where keys agree. */
  case object FullOuter extends JoinType("full_outer")
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment