Skip to content

Instantly share code, notes, and snippets.

@dreadedsoftware
Last active May 5, 2017 13:43
Show Gist options
  • Save dreadedsoftware/2d8dbbd5829b7360354c5b98f8035d3d to your computer and use it in GitHub Desktop.
Save dreadedsoftware/2d8dbbd5829b7360354c5b98f8035d3d to your computer and use it in GitHub Desktop.
Takes an HList of Task[Dataset[_]] values and outer-joins them on a shared column, producing a single Task[DataFrame].
/**
 * Polymorphic function that `join` maps over its input HList (through `Mapper`).
 * The only case for it visible in this file is `caseDSasDF`, which turns a
 * `Task[Dataset[A]]` into a `Task[DataFrame]`.
 */
object P extends Poly1
/** Name of the temporary column `join` uses to hold the merged key before renaming it to the join column. */
val dummy = "_dummy_"
/**
 * Converts every element of `in` to a `Task[DataFrame]` (using the cases of `P`),
 * runs those tasks one after another, and outer-joins the resulting frames on the
 * `joinOn` column.
 *
 * For each pairwise join:
 *  - columns of the right frame whose names also occur in the left frame (other
 *    than `joinOn`) are dropped from the right frame, so the left frame's values
 *    are the ones kept;
 *  - the two key columns are merged into a single `joinOn` column that takes the
 *    left key when it is non-null and the right key otherwise;
 *  - rows whose merged key is null are removed and the result is de-duplicated.
 *
 * Frames are prepended while the tasks are collected, so the pairwise joins start
 * from the LAST frame produced by `toList` and work back towards the first. That
 * makes the last frame the left-most side of the join (presumably the last
 * element of `in` -- confirm against shapeless `ToList` ordering).
 *
 * @param in      HList whose elements `P` can convert to `Task[DataFrame]`
 * @param joinOn  name of the key column; every frame is accessed by this name
 * @param notZero evidence that `in` is non-empty, which keeps the `reduce` below safe
 * @return a task yielding the joined frame
 */
def join[N <: Nat, L <: HList, AsDF <: HList](
  in: L, joinOn: String
)(implicit //
  length: Length.Aux[L, N],
  notZero: N GT nat._0,
  fill: Fill.Aux[N, Task[DataFrame], AsDF],
  mapper: Mapper.Aux[P.type, L, AsDF],
  toList: ToList[AsDF, Task[DataFrame]],
  sqlCtx: SQLContext): Task[DataFrame] = {
  import sqlCtx.implicits._

  // Outer-joins two frames on `joinOn`, keeping a single merged key column.
  def outerJoinPair(left: DataFrame, right: DataFrame): DataFrame = {
    val colLeft = left.columns.toSet
    val colRight = right.columns.toSet
    // Non-key columns present on both sides; the right-hand copies are discarded.
    val sharedColumns = (colLeft intersect colRight) - joinOn
    val reducedRight = drop(right, sharedColumns.map(right(_)).toSeq).distinct
    val joined = left.join(
      reducedRight,
      left(joinOn) === reducedRight(joinOn),
      "outer"
    )
    // An outer join leaves the key null on the side that had no match, so take
    // whichever side is populated and park it in a temporary column.
    val mergedKey = when(left(joinOn).isNull, reducedRight(joinOn))
      .otherwise(left(joinOn))
    val withMergedKey = joined.withColumn(dummy, mergedKey)
    val keysDropped = drop(withMergedKey, Seq(left(joinOn), reducedRight(joinOn)))
    val result = keysDropped.withColumnRenamed(dummy, joinOn)
    // The debug `printSchema` call that used to sit here was removed: it wrote to
    // stdout on every pairwise join.
    notNull(result, joinOn).distinct
  }

  val frameTasks = toList(mapper(in))
  // Sequence the tasks into a single task of frames. Prepending reverses the
  // order, which is preserved here on purpose (see the Scaladoc above).
  val collected = frameTasks.foldLeft(Task.now { List[DataFrame]() }) { (accTask, nextTask) =>
    for {
      acc <- accTask
      next <- nextTask
    } yield next +: acc
  }
  collected.map { frames =>
    frames.reduce { (left, right) => outerJoinPair(left, right) }
  }
}
/**
 * Keeps only the rows of `df` whose `joinOn` column is non-null.
 *
 * Uses a column predicate instead of a typed `Row => Boolean` lambda, so it does
 * not cast the value to `String` and therefore works for a key column of any type.
 *
 * @param df     frame to filter
 * @param joinOn name of the column that must be non-null
 * @return `df` restricted to rows with a non-null `joinOn` value
 */
def notNull(df: DataFrame, joinOn: String): DataFrame =
  df.filter(df(joinOn).isNotNull)
/**
 * Removes each of `cols` from `df`, one after another, in the order given.
 *
 * @param df   frame to remove columns from
 * @param cols columns to drop; an empty sequence returns `df` unchanged
 * @return the frame left after every column in `cols` has been dropped
 */
def drop(df: DataFrame, cols: Seq[Column]): DataFrame =
  cols.foldLeft(df) { (remaining, column) => remaining.drop(column) }
/**
 * Case of `P` for `Task[Dataset[A]]`: maps the task's dataset to a `DataFrame`
 * with `toDF`, giving a `Task[DataFrame]`.
 */
implicit def caseDSasDF[A]: Case1.Aux[P.type, Task[Dataset[A]], Task[DataFrame]] =
  new Case1[P.type, Task[Dataset[A]]] {
    type Result = Task[DataFrame]
    // The single argument arrives wrapped in a one-element HList.
    override val value: (Task[Dataset[A]] :: HNil) => Task[DataFrame] = {
      case datasetTask :: HNil => datasetTask.map { dataset => dataset.toDF }
    }
  }
@dreadedsoftware
Copy link
Author

import fs2._
import shapeless._
import shapeless.poly._
import shapeless.ops.hlist._
import shapeless.ops.nat._

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment