Skip to content

Instantly share code, notes, and snippets.

@agaro1121
Created August 1, 2019 20:35
Show Gist options
  • Save agaro1121/5bb717c39d1e0d6e55b02bfc61c1c937 to your computer and use it in GitHub Desktop.
Save agaro1121/5bb717c39d1e0d6e55b02bfc61c1c937 to your computer and use it in GitHub Desktop.
Spark Notes
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructField
import org.apache.spark.sql.functions._

/**
 Spark doesn't allow you to union DataFrames with different schemas.
 This computes the full set of columns across all inputs, adds any
 missing columns to each DataFrame as null values, and unions
 everything together into a single DataFrame.
*/

/**
 * Unions an arbitrary number of DataFrames by column name, padding each
 * DataFrame with typed null columns for any column it is missing.
 *
 * @param dfs one or more DataFrames (must be non-empty)
 * @return a single DataFrame containing the union of all rows, with the
 *         superset of all input columns
 * @throws IllegalArgumentException if `dfs` is empty
 */
def unionByNameWithDefaultNull(dfs: DataFrame*): DataFrame = {
  require(dfs.nonEmpty, "at least one DataFrame is required")

  // Collect the superset of columns, deduplicated by NAME. StructField
  // equality also compares nullability/metadata, so deduplicating whole
  // StructFields (as `.distinct` would) can keep two entries for the same
  // column name — and the later diff would then wrongly treat an existing
  // column as "missing" and overwrite its data with nulls.
  val allFieldsByName: Seq[StructField] =
    dfs.flatMap(_.schema.fields)
      .groupBy(_.name)
      .map { case (_, fields) => fields.head }
      .toSeq

  // Append each column this DataFrame lacks as a null literal cast to the
  // column's declared type; the cast keeps schemas aligned instead of
  // introducing NullType columns and relying on Spark's type widening.
  def withMissingColumns(df: DataFrame): DataFrame = {
    val existing = df.schema.fieldNames.toSet
    allFieldsByName
      .filterNot(f => existing.contains(f.name))
      .foldLeft(df) { case (acc, f) =>
        acc.withColumn(f.name, lit(null).cast(f.dataType))
      }
  }

  // Every DataFrame now has the same column set, so unionByName is safe.
  dfs.map(withMissingColumns).reduce(_ unionByName _)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment