val deviceModel: AnnotatedDataFrame[DeviceModel] =
val deviceMeasurement: AnnotatedDataFrame[DeviceMeasurement] =
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast
import shapeless.ops.hlist.Prepend
import shapeless.{::, HList, HNil}
object flow {
// Alias for shapeless' HList. In this file it is the upper bound of every
// "join list" type parameter (J, LJ, RJ) on AnnotatedDataFrame and Join.
type JoinList = HList
/**
 * A Spark `DataFrame` tagged with two type parameters.
 *
 * Neither `D` nor `J` appears in a field: the only runtime content is `toDF`,
 * so both parameters exist purely at the type level.
 *
 * @tparam D tag for the data the frame holds (other snippets in this file use
 *           types such as `DeviceMeasurement`, `DeviceModel`, `Country`)
 * @tparam J type-level list (a [[JoinList]]); judging by the `joined` values in
 *           this file it records which other data types have been joined in,
 *           with `HNil` meaning none
 * @param toDF the wrapped `DataFrame`
 */
case class AnnotatedDataFrame[D, J <: JoinList](toDF: DataFrame) extends Serializable
import shapeless.ops.hlist.Selector
/**
 * Example of a function that only accepts a `DeviceMeasurement` frame whose join
 * list contains `DeviceModel`.
 *
 * The constraint is expressed by the implicit shapeless `Selector[J, DeviceModel]`:
 * a call only compiles when such evidence can be found for the caller's `J`.
 *
 * The body is the `???` placeholder, so calling it throws `NotImplementedError`.
 *
 * @tparam J     join list carried by `input`
 * @param input  the annotated `DeviceMeasurement` frame
 * @param S      evidence that `J` contains an element of type `DeviceModel`
 */
def mustHaveDeviceModel[J <: JoinList](
input: AnnotatedDataFrame[DeviceMeasurement, J]
)(implicit S: Selector[J, DeviceModel]) = ???
val country: AnnotatedDataFrame[Country, HNil] =
val joined: AnnotatedDataFrame[DeviceMeasurement, Country :: DeviceModel :: HNil] =
import shapeless.{ ::, HNil }
import implicits._
val deviceModel: AnnotatedDataFrame[DeviceModel, HNil] =
val deviceMeasurement: AnnotatedDataFrame[DeviceMeasurement, HNil] =
val joined: AnnotatedDataFrame[DeviceMeasurement, DeviceModel :: HNil] =
object DeviceModel {
// DataSource definition.
// ...
/**
 * Implicit [[Join]] instance with `DeviceMeasurement` on the left and
 * `DeviceModel` on the right, available for any pair of join lists.
 *
 * The instance is built by `Join.usingColumns` with the single column name
 * `"device_model_id"`. `Join.usingColumns` is not shown here; presumably it
 * joins on equality of the listed columns (confirm against its definition).
 *
 * The result type is spelled out explicitly: implicit definitions should not
 * rely on inference, and `Join[DeviceMeasurement, LJ, DeviceModel, RJ]` is
 * exactly the type the `join` syntax searches for.
 *
 * @tparam LJ join list already carried by the left (`DeviceMeasurement`) frame
 * @tparam RJ join list already carried by the right (`DeviceModel`) frame
 */
implicit def deviceModelToMeasurementJoin[LJ <: JoinList, RJ <: JoinList]
  : Join[DeviceMeasurement, LJ, DeviceModel, RJ] =
  Join.usingColumns[DeviceMeasurement, LJ, DeviceModel, RJ](Seq("device_model_id"))
import shapeless.::
import shapeless.ops.hlist.Prepend
object implicits {
implicit class AnnotatedDataFrameSyntax[L, LJ <: JoinList](left: AnnotatedDataFrame[L, LJ]) {
/**
 * Joins the wrapped `left` frame with `right`, delegating to the implicit
 * [[Join]] instance for the two data types.
 *
 * The result keeps the left data type `L`; its join list is `R` followed by
 * the concatenation of `LJ` and `RJ` (`P.Out`).
 *
 * @tparam R    data type of the right frame
 * @tparam RJ   join list carried by the right frame
 * @param right the frame to join onto `left`
 * @param J     the join definition for `L` and `R`; no instance in scope means
 *              the call does not compile
 * @param P     shapeless `Prepend` evidence for `LJ` and `RJ`. It must be
 *              indexed by the join-list *types* `LJ`/`RJ` (not by the value
 *              parameter `J`) so that it matches what `Join.join` requires.
 * @return `left` joined with `right`, as produced by `J.join`
 */
def join[R, RJ <: JoinList](
  right: AnnotatedDataFrame[R, RJ]
)(implicit J: Join[L, LJ, R, RJ], P: Prepend[LJ, RJ]): AnnotatedDataFrame[L, R :: P.Out] =
  J.join(left, right)
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.broadcast
import shapeless.::
import shapeless.ops.hlist.Prepend
sealed trait Join[L, LJ <: JoinList, R, RJ <: JoinList] {
/**
 * Joins `left` with `right`.
 *
 * The result keeps the left data type `L`. Its join list is `R` followed by
 * `P.Out`, the type-level concatenation of `LJ` and `RJ`.
 *
 * @param left  the left-hand annotated frame
 * @param right the right-hand annotated frame
 * @param P     shapeless `Prepend` evidence used to compute the combined join list
 * @return the joined frame, typed as `AnnotatedDataFrame[L, R :: P.Out]`
 */
def join(
left: AnnotatedDataFrame[L, LJ],
right: AnnotatedDataFrame[R, RJ]
)(implicit P: Prepend[LJ, RJ]): AnnotatedDataFrame[L, R :: P.Out]
import shapeless.::
import shapeless.ops.hlist.Prepend
sealed trait Join[L, LJ <: JoinList, R, RJ <: JoinList] {
/**
 * Abstract join operation of this `Join[L, LJ, R, RJ]` instance.
 *
 * Takes the two annotated frames and returns a frame still tagged with the
 * left data type `L`, whose join list is `R :: P.Out` (`R` followed by the
 * concatenation of `LJ` and `RJ` as computed by shapeless `Prepend`).
 *
 * @param left  frame tagged with `L` and join list `LJ`
 * @param right frame tagged with `R` and join list `RJ`
 * @param P     `Prepend[LJ, RJ]` evidence; its `Out` member appears in the result type
 * @return the joined frame
 */
def join(
left: AnnotatedDataFrame[L, LJ],
right: AnnotatedDataFrame[R, RJ]
)(implicit P: Prepend[LJ, RJ]): AnnotatedDataFrame[L, R :: P.Out]