import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.functions._
/**
 * Unions an arbitrary number of DataFrames by column name even when their
 * schemas differ. Spark's `unionByName` requires identical column sets, so
 * every column missing from a given input is first added as a null column
 * cast to the type declared by whichever input defined it.
 *
 * Columns are de-duplicated by *name*: if two inputs declare the same column
 * name with different types or nullability, the first occurrence wins.
 * (De-duping on the full StructField — as before — let a second same-named
 * field through, and the null-fill then clobbered real data in that column.)
 *
 * @param dfs one or more DataFrames to union; must be non-empty
 * @return a single DataFrame containing every input row, with null in any
 *         column an input did not originally have
 */
def unionByNameWithDefaultNull(dfs: DataFrame*): DataFrame = {
  require(dfs.nonEmpty, "unionByNameWithDefaultNull requires at least one DataFrame")
  // One StructField per distinct column name, first occurrence wins.
  val allFields: Seq[StructField] =
    dfs.flatMap(_.schema.fields).foldLeft(Vector.empty[StructField]) { (acc, f) =>
      if (acc.exists(_.name == f.name)) acc else acc :+ f
    }
  // Enrich each input exactly once up front; re-diffing the accumulated
  // frame on every reduce step (the old approach) was redundant work.
  val enriched = dfs.map { df =>
    val present = df.schema.fieldNames.toSet
    allFields.filterNot(f => present(f.name)).foldLeft(df) { (acc, f) =>
      // Cast the null literal so the new column carries a concrete type
      // rather than NullType, keeping unionByName's type resolution sound.
      acc.withColumn(f.name, lit(null).cast(f.dataType))
    }
  }
  enriched.reduce(_.unionByName(_))
}
Created: August 1, 2019 20:35
-
-
Save agaro1121/5bb717c39d1e0d6e55b02bfc61c1c937 to your computer and use it in GitHub Desktop.
Spark Notes
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment