Skip to content

Instantly share code, notes, and snippets.

@rcmiii
Created June 15, 2021 13:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rcmiii/bbfa23efd6272f5c6c92b4d232f8853c to your computer and use it in GitHub Desktop.
Save rcmiii/bbfa23efd6272f5c6c92b4d232f8853c to your computer and use it in GitHub Desktop.
Typesafe Joins in Spark 2.4
import org.apache.spark.sql.{Dataset, Encoder, Encoders, SparkSession}
import scala.reflect.ClassTag
import scala.reflect.runtime.universe.TypeTag
/** Extension methods for joining a [[Dataset]] on keys computed by plain Scala functions,
  * so that the joined rows stay statically typed instead of going through untyped columns.
  *
  * Every join first pairs each element with its key as a `(key, element)` tuple. It then joins
  * the two keyed datasets on their `_1` columns using Spark SQL equality (`===`), so a `null` key
  * never matches anything. Finally it drops the keys again.
  *
  * @param ds the left-hand dataset
  * @tparam T element type of the left-hand dataset
  */
implicit class TypedDataset[T: TypeTag: Encoder](ds: Dataset[T]) {

  /** Session owning `ds`; its implicits supply the tuple encoders used below. */
  val spark: SparkSession = ds.sparkSession
  import spark.implicits._

  /** Keys both sides with the given functions and joins them on equal keys.
    *
    * @param right            right-hand dataset
    * @param leftKeyFunction  extracts the join key from a left element
    * @param rightKeyFunction extracts the join key from a right element
    * @param joinType         join type passed through to `Dataset.joinWith`
    * @return `((key, left), (key, right))` pairs; for outer join types the side that had no
    *         match is `null`
    */
  private def keyedJoinWith[R: TypeTag, K: TypeTag](
      right: Dataset[R],
      leftKeyFunction: T => K,
      rightKeyFunction: R => K,
      joinType: String
  ): Dataset[((K, T), (K, R))] = {
    val keyedLeft = ds.map[(K, T)]((d: T) => leftKeyFunction(d) -> d)
    val keyedRight = right.map[(K, R)]((d: R) => rightKeyFunction(d) -> d)
    keyedLeft.joinWith(keyedRight, keyedLeft("_1") === keyedRight("_1"), joinType)
  }

  /** Left anti-join: keeps the elements of this dataset whose key matches no key in `right`.
    *
    * Left elements with a `null` key are always kept, because `null` never compares equal.
    *
    * @param right            right-hand dataset whose keys exclude left elements
    * @param leftKeyFunction  extracts the join key from a left element
    * @param rightKeyFunction extracts the join key from a right element
    * @return the left elements with no matching right key, each appearing once per input occurrence
    */
  def typedLeftAntiJoin[R: TypeTag, K: TypeTag: Encoder](
      right: Dataset[R],
      leftKeyFunction: T => K,
      rightKeyFunction: R => K
  ): Dataset[T] = {
    val keyedLeft = ds.map[(K, T)]((d: T) => leftKeyFunction(d) -> d)
    val keyedRight = right.map[(K, R)]((d: R) => rightKeyFunction(d) -> d)
    // `joinWith` rejects left_anti, so use the untyped join. A left_anti join outputs only
    // the left-hand (_1, _2) columns and never deserializes or multiplies right-hand rows,
    // unlike a left_outer join followed by a null filter.
    keyedLeft
      .join(keyedRight, keyedLeft("_1") === keyedRight("_1"), "left_anti")
      .as[(K, T)]
      .map((pair: (K, T)) => pair._2)
  }

  /** Full outer join on extracted keys.
    *
    * @param right            right-hand dataset
    * @param leftKeyFunction  extracts the join key from a left element
    * @param rightKeyFunction extracts the join key from a right element
    * @return one row per matching pair as `(Some(left), Some(right))`, plus one row for each
    *         unmatched element of either side, with `None` on the side that had no match
    */
  def typedFullOuterJoin[R: TypeTag, K: TypeTag](
      right: Dataset[R],
      leftKeyFunction: T => K,
      rightKeyFunction: R => K
  ): Dataset[(Option[T], Option[R])] =
    keyedJoinWith(right, leftKeyFunction, rightKeyFunction, "full_outer")
      // the unmatched side comes back as a null tuple; Option(...) turns it into None
      .map(d => Option(d._1).map(_._2) -> Option(d._2).map(_._2))

  /** Inner join on extracted keys.
    *
    * @param right            right-hand dataset
    * @param leftKeyFunction  extracts the join key from a left element
    * @param rightKeyFunction extracts the join key from a right element
    * @return one `(left, right)` row per pair of elements with equal, non-null keys
    */
  def typedJoin[R: TypeTag, K: TypeTag](
      right: Dataset[R],
      leftKeyFunction: T => K,
      rightKeyFunction: R => K
  ): Dataset[(T, R)] =
    keyedJoinWith(right, leftKeyFunction, rightKeyFunction, "inner")
      .map(d => d._1._2 -> d._2._2)

  /** Left outer join on extracted keys.
    *
    * @param right            right-hand dataset
    * @param leftKeyFunction  extracts the join key from a left element
    * @param rightKeyFunction extracts the join key from a right element
    * @return one `(left, Some(right))` row per matching pair, and `(left, None)` for each left
    *         element with no matching right key
    */
  def typedLeftOuterJoin[R: TypeTag, K: TypeTag](
      right: Dataset[R],
      leftKeyFunction: T => K,
      rightKeyFunction: R => K
  ): Dataset[(T, Option[R])] =
    keyedJoinWith(right, leftKeyFunction, rightKeyFunction, "left_outer")
      // an unmatched right side comes back as a null tuple; Option(...) turns it into None
      .map(d => d._1._2 -> Option(d._2).map(_._2))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment