Skip to content

Instantly share code, notes, and snippets.

View izeigerman's full-sized avatar

Iaroslav Zeigerman izeigerman

View GitHub Profile

Tobiko Data, Inc.

INDIVIDUAL CONTRIBUTOR LICENSE AGREEMENT

Thank you for your interest in Tobiko Data, Inc. (the "Company"). To clarify the intellectual property license granted with Contributions from any person or entity, the Company must have on file a signed Contributor License Agreement ("CLA") from each Contributor, indicating agreement with the license terms below. This agreement is for your protection as a Contributor as well as the protection of the Company and its users. It does not change your rights to use your own Contributions for any other purpose.

Read this document carefully before signing and keep a copy for your records.

You accept and agree to the following terms and conditions for Your Contributions (present and future) that you submit to the Company. In return, the Company shall not use Your Contributions in a way that is inconsistent with its bylaws in effect at the time of the Contribution. Except for the license granted herein to the Company and recipients of software distri

// Typed handles for the two inputs; each one is obtained through `DataSource[T].read`.
val deviceModel: AnnotatedDataFrame[DeviceModel] = DataSource[DeviceModel].read
val deviceMeasurement: AnnotatedDataFrame[DeviceMeasurement] = DataSource[DeviceMeasurement].read
import org.apache.spark.sql.{Column, DataFrame, SparkSession}
import org.apache.spark.sql.functions.broadcast
import shapeless.ops.hlist.Prepend
import shapeless.{::, HList, HNil}
object flow {
/** Type-level list used to tag an [[AnnotatedDataFrame]]; a plain alias for shapeless `HList`. */
type JoinList = HList

/**
 * A Spark `DataFrame` wrapped together with two compile-time tags.
 *
 * Neither `D` nor `J` appears in a field, so both exist purely at the type level.
 *
 * @param toDF the wrapped `DataFrame`
 * @tparam D tag for the data the frame holds
 * @tparam J list tag bounded by [[JoinList]]
 */
case class AnnotatedDataFrame[D, J <: JoinList](
  toDF: DataFrame
) extends Serializable
import shapeless.ops.hlist.Selector
/**
 * Accepts a `DeviceMeasurement` frame only when its join list `J` contains `DeviceModel`.
 *
 * The constraint is carried by the implicit shapeless `Selector[J, DeviceModel]`, so a call
 * with a join list lacking `DeviceModel` does not compile. The body is an unimplemented
 * stub (`???`).
 *
 * @param input the annotated measurement frame
 * @param S     evidence that `J` has an element of type `DeviceModel`
 * @tparam J    join list of `input`
 */
def mustHaveDeviceModel[J <: JoinList](input: AnnotatedDataFrame[DeviceMeasurement, J])(
  implicit S: Selector[J, DeviceModel]
) = ???
// `country` starts out with an empty join list (`HNil`).
val country: AnnotatedDataFrame[Country, HNil] = DataSource[Country].read

// Two chained joins: the annotated join list names `Country` (joined last) first,
// followed by `DeviceModel`.
val joined: AnnotatedDataFrame[DeviceMeasurement, Country :: DeviceModel :: HNil] =
  deviceMeasurement.join(deviceModel).join(country)
import shapeless.{ ::, HNil }
import implicits._
// Both inputs are read with an empty join list (`HNil`).
val deviceModel: AnnotatedDataFrame[DeviceModel, HNil] = DataSource[DeviceModel].read
val deviceMeasurement: AnnotatedDataFrame[DeviceMeasurement, HNil] =
  DataSource[DeviceMeasurement].read

// After the join, the annotated join list records `DeviceModel`.
val joined: AnnotatedDataFrame[DeviceMeasurement, DeviceModel :: HNil] =
  deviceMeasurement.join(deviceModel)
object DeviceModel {
  // DataSource definition.
  // ...

  /**
   * Implicit [[Join]] instance for joining `DeviceMeasurement` (left) with `DeviceModel`
   * (right) on the `device_model_id` column, for any join lists `LJ` and `RJ`.
   *
   * The result type is written out explicitly: an implicit definition whose type is left
   * to inference can be skipped during implicit search in Scala 2 and is rejected by
   * Scala 3.
   *
   * NOTE(review): assumes `Join.usingColumns` yields a `Join` with the same four type
   * arguments it is given here; its definition is not visible in this snippet.
   *
   * @tparam LJ join list of the left frame
   * @tparam RJ join list of the right frame
   */
  implicit def deviceModelToMeasurementJoin[LJ <: JoinList, RJ <: JoinList]
    : Join[DeviceMeasurement, LJ, DeviceModel, RJ] =
    Join.usingColumns[DeviceMeasurement, LJ, DeviceModel, RJ](Seq("device_model_id"))
}
import shapeless.::
import shapeless.ops.hlist.Prepend
object implicits {
/**
 * Extension syntax that adds a type-tracked `join` to [[AnnotatedDataFrame]].
 *
 * @param left the frame on the left-hand side of the join
 * @tparam L  data tag of the left frame
 * @tparam LJ join list of the left frame
 */
implicit class AnnotatedDataFrameSyntax[L, LJ <: JoinList](left: AnnotatedDataFrame[L, LJ]) {

  /**
   * Joins `left` with `right` by delegating to the implicit [[Join]] instance.
   *
   * The resulting join list is `R` prepended to the concatenation of both input join
   * lists (`P.Out`).
   *
   * @param right the frame on the right-hand side of the join
   * @param J     the join definition for this pair of frames
   * @param P     shapeless evidence concatenating `LJ` and `RJ`; it must be
   *              `Prepend[LJ, RJ]` because that is what `Join.join` itself requires
   *              (`J` is a value name and cannot be used as a type argument)
   * @tparam R    data tag of the right frame
   * @tparam RJ   join list of the right frame
   */
  def join[R, RJ <: JoinList](
    right: AnnotatedDataFrame[R, RJ]
  )(implicit J: Join[L, LJ, R, RJ], P: Prepend[LJ, RJ]): AnnotatedDataFrame[L, R :: P.Out] =
    J.join(left, right)
}
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.broadcast
import shapeless.::
import shapeless.ops.hlist.Prepend
sealed trait Join[L, LJ <: JoinList, R, RJ <: JoinList] {
/**
 * Joins two annotated frames.
 *
 * @param left  the left-hand frame
 * @param right the right-hand frame
 * @param P     shapeless evidence concatenating the join lists `LJ` and `RJ`
 * @return a frame that keeps the left data tag `L`; its join list is `R` followed by `P.Out`
 */
def join(left: AnnotatedDataFrame[L, LJ], right: AnnotatedDataFrame[R, RJ])(
  implicit P: Prepend[LJ, RJ]
): AnnotatedDataFrame[L, R :: P.Out]
import shapeless.::
import shapeless.ops.hlist.Prepend
/**
 * Describes how a frame tagged `L` is joined with a frame tagged `R`.
 *
 * The trait is sealed, so implementations can only be defined in this source file.
 *
 * @tparam L  data tag of the left frame
 * @tparam LJ join list of the left frame
 * @tparam R  data tag of the right frame
 * @tparam RJ join list of the right frame
 */
sealed trait Join[L, LJ <: JoinList, R, RJ <: JoinList] {

  /**
   * Joins `left` with `right`.
   *
   * @param P shapeless evidence concatenating the join lists `LJ` and `RJ`
   * @return a frame that keeps the left data tag `L`; its join list is `R` followed by `P.Out`
   */
  def join(left: AnnotatedDataFrame[L, LJ], right: AnnotatedDataFrame[R, RJ])(
    implicit P: Prepend[LJ, RJ]
  ): AnnotatedDataFrame[L, R :: P.Out]
}