Skip to content

Instantly share code, notes, and snippets.

@eduardorost
Last active December 19, 2023 09:36
Show Gist options
  • Save eduardorost/82f0640d6eed50406285c3d393897414 to your computer and use it in GitHub Desktop.
Save eduardorost/82f0640d6eed50406285c3d393897414 to your computer and use it in GitHub Desktop.
Merge Schema with structs
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.slf4j.{Logger, LoggerFactory}
/**
 * Demo of merging two Spark SQL schemas that contain nested structs and arrays.
 *
 * `main` infers a schema from each of two sample JSON documents, merges the two
 * schemas with [[mergeStructTypes]], and then reads both documents back using the
 * merged schema.
 */
object Main {

  val logger: Logger = LoggerFactory.getLogger(this.getClass)

  private lazy val sparkConf: SparkConf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("test")

  private lazy val spark: SparkSession = SparkSession
    .builder()
    .config(sparkConf)
    .getOrCreate()

  // The two sample documents deliberately disagree: "y" is a string in one and a
  // number in the other, "x" exists only in the second, and the nested
  // "document" / "array" values carry different keys.
  // The literal bodies are kept flush-left so the runtime strings are unchanged.
  val obj1: String =
    """
{
"y": "dummy",
"document": {
"a": "dummy",
"c": "dummy",
"d": { "f": "dummy" }
},
"array": [{"q": "dummy"}]
}
"""

  val obj2: String =
    """
{
"y": 123,
"x": "dummy",
"document": {
"a": "dummy",
"b": "dummy",
"d": { "e": "dummy" }
},
"array": [{"p": "dummy"}]
}
"""

  /**
   * Infers the schema of each sample document, merges them, reads both documents
   * with the merged schema, and prints the resulting schema and rows.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // Materialise the lazy session once, outside the try, so that a start-up
    // failure is not retried (and masked) by the cleanup in `finally`.
    val session = spark
    import session.implicits._

    def inferSchema(json: String): StructType =
      session.read.option("dropFieldIfAllNull", "true").json(Seq(json).toDS()).schema

    try {
      val schemaMerged = mergeStructTypes(inferSchema(obj1), inferSchema(obj2))

      // Both documents are wrapped in one top-level JSON array and read together.
      val jsonUnion = s"[$obj1, $obj2]"
      val dfUnion = session.read.schema(schemaMerged).json(Seq(jsonUnion).toDS())

      dfUnion.printSchema()
      dfUnion.show()
    } finally {
      // Always release the session, even if inference or reading fails.
      session.stop()
    }
  }

  /**
   * Merges two struct schemas field by field.
   *
   * Fields are matched by exact name. The result keeps the order in which names
   * first appear (all of `left`'s fields, then the fields only `right` has).
   * Every name is passed through [[mergeFields]], including names present on one
   * side only, so every resulting field is nullable.
   *
   * @param left  first schema
   * @param right second schema; its metadata wins when a name appears in both
   * @return the merged schema
   */
  def mergeStructTypes(left: StructType, right: StructType): StructType = {
    val allFields = (left ++ right).toList
    val fieldsByName = allFields.groupBy(_.name)

    // Iterate over names in first-seen order rather than over the grouped Map,
    // whose iteration order is unrelated to the order of the input fields.
    val mergedFields = allFields.map(_.name).distinct.map { name =>
      val sameName = fieldsByName(name) // non-empty: every name comes from allFields
      mergeFields(name, sameName.head.dataType, sameName.last.dataType, sameName.last.metadata)
    }

    StructType(mergedFields)
  }

  /**
   * Builds the merged field for one name from the two candidate data types.
   *
   *  - struct + struct: merged recursively with [[mergeStructTypes]]
   *  - array + array: element types merged recursively, `containsNull = true`
   *  - anything else (including struct/array paired with a different kind):
   *    resolved by [[mergeDataTypes]]
   *
   * @param name     field name
   * @param left     data type from the first schema
   * @param right    data type from the second schema
   * @param metadata metadata to attach to the resulting field
   * @return a nullable field carrying the merged data type
   */
  def mergeFields(name: String, left: DataType, right: DataType, metadata: Metadata): StructField = {
    // Match on the pair so a recursive merge only happens when BOTH sides have
    // the same container kind; no casts are needed and mixed kinds cannot throw.
    val mergedType: DataType = (left, right) match {
      case (leftStruct: StructType, rightStruct: StructType) =>
        mergeStructTypes(leftStruct, rightStruct)
      case (leftArray: ArrayType, rightArray: ArrayType) =>
        val elementType =
          mergeFields(name, leftArray.elementType, rightArray.elementType, metadata).dataType
        ArrayType(elementType, containsNull = true)
      case _ =>
        mergeDataTypes(left, right)
    }

    StructField(name, mergedType, nullable = true, metadata)
  }

  /**
   * Picks one of two conflicting data types.
   *
   * @param left  data type from the first schema
   * @param right data type from the second schema
   * @return `left` when it is `StringType`, otherwise `right`
   */
  def mergeDataTypes(left: DataType, right: DataType): DataType =
    if (left == StringType) left else right
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment