Last active
September 17, 2019 18:19
-
-
Save MDIB/387e0a9c8667ef6b54759e43addfb449 to your computer and use it in GitHub Desktop.
Spark Schema Comparison
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.spark.sql.types._ | |
import cats._ | |
import cats.implicits._ | |
/**
 * Outcome of comparing one column between two schemas.
 *
 * Extending `Product with Serializable` keeps the least upper bound of the
 * two cases equal to `ComparationResult`, so expressions that mix
 * `Validated` and `Invalid` do not infer the noisy
 * `Product with Serializable with ComparationResult` type.
 */
sealed trait ComparationResult extends Product with Serializable

/** The compared column is acceptable. */
case object Validated extends ComparationResult

/**
 * The compared column is not acceptable.
 *
 * @param msg human-readable description of the problem
 */
final case class Invalid(msg: String) extends ComparationResult
/**
 * Looks up a field of `dfSchema` by name without throwing.
 *
 * @param dfSchema   schema to search
 * @param columnName name of the field to look up
 * @return `Some(field)` when the schema has a field called `columnName`,
 *         `None` otherwise
 */
def safeGetColumn(dfSchema: StructType, columnName: String): Option[StructField] =
  // Test membership up front so a missing column is handled as a normal
  // branch instead of by catching the IllegalArgumentException that
  // `dfSchema(columnName)` raises for unknown names.
  if (dfSchema.fieldNames.contains(columnName)) Some(dfSchema(columnName))
  else None
/**
 * Compares the schemas of two DataFrames column by column.
 *
 * Every column name that appears in either schema is checked:
 *  - present in both with equal data types      -> `Validated`
 *  - present in both with different data types  -> `Invalid` naming both types
 *  - present in only one of the two schemas     -> `Invalid`
 *
 * The outcomes are collected into a `Set`, so equal outcomes collapse: any
 * number of matching columns contributes a single `Validated` element.
 *
 * @param df1 first DataFrame
 * @param df2 second DataFrame
 * @return the distinct comparison outcomes over the union of column names
 */
def matchDFSchemas(df1: DataFrame, df2: DataFrame): Set[ComparationResult] = {
  // Read each schema once instead of once per column.
  val schema1 = df1.schema
  val schema2 = df2.schema
  val allColumnNames = schema1.fieldNames.toSet union schema2.fieldNames.toSet

  allColumnNames.map { columnName =>
    val outcome: ComparationResult =
      (safeGetColumn(schema1, columnName), safeGetColumn(schema2, columnName)) match {
        case (Some(col1), Some(col2)) if col1.dataType != col2.dataType =>
          Invalid(msg =
            s"Different types on Column: $columnName. " +
              s"df1 type: ${col1.dataType} and df2 type: ${col2.dataType}")
        case (Some(_), Some(_)) =>
          Validated
        case _ =>
          // The column is missing from at least one of the two schemas.
          Invalid(msg = s"Column doesn't exists on both DataFrames: $columnName")
      }
    outcome
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment