Last active
May 5, 2017 13:43
-
-
Save dreadedsoftware/2d8dbbd5829b7360354c5b98f8035d3d to your computer and use it in GitHub Desktop.
Takes an HList of Task[Dataset] and joins them on a column, producing a Task[DataFrame]
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Polymorphic function tag (shapeless `Poly1`).
 *
 * `join` requires a `Mapper.Aux[P.type, L, AsDF]`, and `caseDSasDF` supplies
 * the case of `P` for `Task[Dataset[A]]` inputs.
 */
object P extends Poly1
// Name of the scratch column `join` uses to hold the merged key before it is
// renamed back to the real join column.
val dummy: String = "_dummy_"
/**
 * Converts every element of `in` to a `Task[DataFrame]` (via the `P` cases),
 * runs the tasks one after another, and folds the resulting frames into a
 * single frame joined on the column `joinOn`.
 *
 * Notes on observable behaviour:
 *  - Frames are accumulated by prepending, so the pairwise reduction walks
 *    them in the reverse of their order in `in`.
 *  - For every pair, non-key columns present on both sides are removed from
 *    the right-hand frame before joining, so the left-hand copy is kept.
 *  - The schema of every intermediate result is printed.
 *  - Rows whose join key is null are removed after each pairwise join.
 *
 * @param in     HList whose elements have a `P` case yielding `Task[DataFrame]`
 * @param joinOn name of the key column shared by all frames
 * @return a task producing the joined frame
 */
def join[N <: Nat, L <: HList, AsDF <: HList](
    in: L, joinOn: String
)(implicit
    length: Length.Aux[L, N],
    notZero: N GT nat._0,
    fill: Fill.Aux[N, Task[DataFrame], AsDF],
    mapper: Mapper.Aux[P.type, L, AsDF],
    toList: ToList[AsDF, Task[DataFrame]],
    sqlCtx: SQLContext): Task[DataFrame] = {
  import sqlCtx.implicits._

  // Joins two frames on `joinOn` using Spark's "outer" join type and collapses
  // the two key columns into one.
  def joinPair(left: DataFrame, right: DataFrame): DataFrame = {
    // Non-key column names that occur on both sides.
    val shared = left.columns.toSet intersect (right.columns.toSet - joinOn)
    val slimRight = drop(right, shared.map(right(_)).toSeq).distinct

    val leftKey = left(joinOn)
    val rightKey = slimRight(joinOn)
    val joined = left.join(slimRight, leftKey === rightKey, "outer")

    // Take the left key when present, otherwise fall back to the right key.
    val mergedKey = when(leftKey.isNull, rightKey).otherwise(leftKey)
    val withMergedKey = joined.withColumn(dummy, mergedKey)

    val result =
      drop(withMergedKey, Seq(leftKey, rightKey)).withColumnRenamed(dummy, joinOn)
    result.printSchema
    notNull(result, joinOn).distinct
  }

  val pending = toList(mapper(in))
  // Sequence the tasks; each finished frame is prepended to the accumulator.
  val collected = pending.foldLeft(Task.now { List[DataFrame]() }) { (doneTask, nextTask) =>
    doneTask.flatMap { done =>
      nextTask.map { frame => frame +: done }
    }
  }
  // The list is non-empty as long as the `N GT nat._0` evidence holds.
  collected.map { frames => frames.reduce(joinPair _) }
}
/**
 * Keeps only the rows of `df` whose `joinOn` column is non-null.
 *
 * The check is expressed as a column predicate rather than a row-level
 * lambda, so it does not cast the key to `String` and therefore works for
 * join columns of any type.
 *
 * @param df     frame to filter
 * @param joinOn name of the column that must be non-null
 * @return `df` without the rows where `joinOn` is null
 */
def notNull(df: DataFrame, joinOn: String): DataFrame =
  df.filter(df(joinOn).isNotNull)
/**
 * Removes every column in `cols` from `df`, one at a time, in sequence order.
 * An empty `cols` returns `df` unchanged.
 *
 * @param df   frame to remove columns from
 * @param cols columns to remove
 * @return `df` without the given columns
 */
def drop(df: DataFrame, cols: Seq[Column]): DataFrame =
  cols.foldLeft(df) { (remaining, column) => remaining.drop(column) }
/**
 * Case of `P` for `Task[Dataset[A]]`: maps the task's dataset to a
 * `DataFrame` with `toDF`, yielding a `Task[DataFrame]`.
 */
implicit def caseDSasDF[A]: Case1.Aux[P.type, Task[Dataset[A]], Task[DataFrame]] =
  new Case1[P.type, Task[Dataset[A]]] {
    type Result = Task[DataFrame]

    // The argument arrives as a one-element HList; its head is the task to convert.
    override val value: (Task[Dataset[A]] :: HNil) => Task[DataFrame] = { args =>
      val pending = args.head
      pending.map { dataset => dataset.toDF() }
    }
  }
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
import fs2._
import shapeless._
import shapeless.poly._
import shapeless.ops.hlist._
import shapeless.ops.nat._