Iaroslav Zeigerman (izeigerman)

import org.apache.spark.sql.SparkSession
import shapeless.HNil

// A data source reads dataset D using parameters P. A freshly read frame has
// an empty join list (HNil): nothing has been joined into it yet.
trait DataSource[D, P] {
  def read(parameters: P)(implicit spark: SparkSession): AnnotatedDataFrame[D, HNil]
}

import org.apache.spark.sql.DataFrame

// Wraps a plain DataFrame with a phantom type D identifying the dataset and a
// type-level list J recording which datasets have been joined into it.
case class AnnotatedDataFrame[D, J <: JoinList](toDF: DataFrame) extends Serializable

import shapeless.HList

// A join list is simply a type-level list (HList) of dataset marker types.
type JoinList = HList

import shapeless.{ ::, HNil }

// An HList is a heterogeneous list: unlike a regular List, every element
// keeps its own static type.
case class Foo(bar: String)
val list: Int :: String :: Foo :: HNil = 1 :: "Hello" :: Foo("World") :: HNil
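
// Hedged illustration of what a join list buys (the marker types are the ones
// defined in the snippets further down; tagJoined is a hypothetical helper):
// after joining measurements with models, the frame's type itself records
// the DeviceModel join.
import org.apache.spark.sql.DataFrame
def tagJoined(df: DataFrame): AnnotatedDataFrame[DeviceMeasurement, DeviceModel :: HNil] =
  AnnotatedDataFrame(df)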

// Joining preserves the left-hand side's data type: the result is still a
// DeviceMeasurement frame, now enriched with DeviceModel columns. (This and
// the following snippets use the single-parameter AnnotatedDataFrame, i.e.
// the variant without the join list.)
val enrichedDeviceMeasurement: AnnotatedDataFrame[DeviceMeasurement] =
  Join[DeviceMeasurement, DeviceModel].join(deviceMeasurement, deviceModel)

// A Join instance encodes how to join dataset L with dataset R. This first
// version keeps the left-hand side's type as the result.
trait Join[L, R] {
  def join(
    left: AnnotatedDataFrame[L],
    right: AnnotatedDataFrame[R]
  ): AnnotatedDataFrame[L]
}
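
// A minimal sketch of a concrete instance for this version of Join; the
// "device_id" join key is an assumption for illustration, not taken from
// the source snippets.
implicit val measurementWithModelJoin: Join[DeviceMeasurement, DeviceModel] =
  new Join[DeviceMeasurement, DeviceModel] {
    override def join(
      left: AnnotatedDataFrame[DeviceMeasurement],
      right: AnnotatedDataFrame[DeviceModel]
    ): AnnotatedDataFrame[DeviceMeasurement] =
      AnnotatedDataFrame(left.toDF.join(right.toDF, "device_id"))
  }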

// The signature above loses information: after the join, the frame also
// carries R's columns. What should the result type say about that? The `?`
// below marks that open question; it is a placeholder, not working code.
trait Join[L, R] {
  def join(
    left: AnnotatedDataFrame[L],
    right: AnnotatedDataFrame[R]
  ): AnnotatedDataFrame[?]
}
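
import shapeless.{ ::, HNil }

// One hedged way to answer that question, consistent with the JoinList
// parameter introduced earlier: keep L as the data type and prepend R to the
// left frame's join list, so the result type records every join. The name
// TrackedJoin and the exact shape are a sketch, not necessarily the
// signature used in the original write-up.
trait TrackedJoin[L, R, J <: JoinList] {
  def join(
    left: AnnotatedDataFrame[L, J],
    right: AnnotatedDataFrame[R, HNil]
  ): AnnotatedDataFrame[L, R :: J]
}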

// Marker type and data source for device measurements.
sealed trait DeviceMeasurement
object DeviceMeasurement {
  implicit val deviceMeasurementDataSource = new DataSource[DeviceMeasurement, Unit] {
    override def read(parameters: Unit)(implicit spark: SparkSession) =
      AnnotatedDataFrame[DeviceMeasurement](
        spark.createDataFrame(Seq(
          (0, 1.0),
          (0, 2.0),
          // rows and column names below are assumed; the source cuts off here:
          (1, 3.0)
        )).toDF("device_id", "measurement")
      )
  }
}

// Marker type and data source for device model reference data.
sealed trait DeviceModel
object DeviceModel {
  implicit val deviceModelDataSource = new DataSource[DeviceModel, Unit] {
    override def read(parameters: Unit)(implicit spark: SparkSession) =
      AnnotatedDataFrame[DeviceModel](
        spark
          .createDataFrame(Seq(
            (0, "model_0"),
            // the source cuts off here; assumed completion:
            (1, "model_1")
          ))
          .toDF("device_id", "model_name")
      )
  }
}

import org.apache.spark.sql.SparkSession

trait DataSource[D, P] {
  def read(parameters: P)(implicit spark: SparkSession): AnnotatedDataFrame[D]
}

object DataSource {
  def apply[D]: Helper[D] = new Helper[D]

  // Helper used to improve the type inference and make the reading API cleaner.
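  // The Helper definition is cut off in the source. A minimal sketch of the
  // usual "partially applied type" pattern it appears to follow: fixing D
  // first lets the compiler infer P and the matching DataSource instance at
  // the call site.
  final class Helper[D] {
    def read[P](parameters: P)(
      implicit source: DataSource[D, P],
      spark: SparkSession
    ): AnnotatedDataFrame[D] = source.read(parameters)
  }
}

// Hypothetical usage with the helper in scope:
// val measurements = DataSource[DeviceMeasurement].read(())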