Last active
September 26, 2019 08:21
-
-
Save prodeezy/b2cc35b87fca7d43ae681d45b3d7cab3 to your computer and use it in GitHub Desktop.
Iceberg Schema Evolution Scenarios
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// *** Sample Data
bash-3.2$ cat people_flat.json
{"name":"Michael", "grade": 4.0}
{"name":"Andy", "age":30, "grade": 3.5}
{"name":"Justin", "age":19}
bash-3.2$ cat people_reordered.json
{"age":65, "name":"Biswa", "grade": 4.0}
bash-3.2$ cat people_added_fields.json
{"age":5, "name":"Uhuru", "grade": 4.5, "location": { "lat": 101.5, "lon": 68.0 } }
bash-3.2$ cat people_nested_fields_reordered.json
{"age":25, "name":"Jolie", "grade": 3.2, "location": { "lon": 42.0, "lat": 189.3} }
// Test schema evolution in Iceberg (spark-shell session).
// Step 1: create an Iceberg table from a flat JSON dataset and append the
// initial rows. Subsequent sections append data with reordered / added fields.
import org.apache.spark.sql.types._
import org.apache.iceberg.hadoop.HadoopTables
import org.apache.iceberg.Schema
import org.apache.iceberg.spark.SparkSchemaUtil

// Location of the Hadoop-backed Iceberg table used throughout this session.
val tableLocation = "./iceberg-schema-evolution-test"

// Explicit read schema for the flat sample: name (string), age (int), grade (double).
// Rows missing a field (see people_flat.json) come through as nulls.
val sparkSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)
  .add("grade", DoubleType)
val jsonDf = spark.read.schema(sparkSchema).json("people_flat.json")

// Create the Iceberg table with the converted schema, append the data, and
// read it back through the Iceberg source to confirm the round trip.
val tables = new HadoopTables()
val iceSchema = SparkSchemaUtil.convert(sparkSchema)
val iceTable = tables.create(iceSchema, tableLocation)
jsonDf.write.format("iceberg").mode("append").save(tableLocation)
spark.read.format("iceberg").load(tableLocation).show()
// *** Step 2: append data whose top-level JSON keys are re-ordered
// (people_reordered.json lists age before name). Reading with the table's
// own schema lets the JSON reader resolve fields by name, so the key order
// in the file should not matter.
val originalTable = tables.load("./iceberg-schema-evolution-test")
val originalSchema = originalTable.schema
val originalSparkSchema = SparkSchemaUtil.convert(originalSchema)
val jsonDfReordered = spark.read.schema(originalSparkSchema).json("people_reordered.json")
jsonDfReordered.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test")
spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show()
// *** Step 3: evolve the table schema by adding a nested "location" struct
// column through Iceberg's schema-evolution API, then append data that
// carries the new field.
val originalTable = tables.load("./iceberg-schema-evolution-test")
val locationType = new StructType().add("lat", DoubleType).add("lon", DoubleType)
// Commit the new column to table metadata, then refresh so this handle sees
// the updated schema before converting it back to a Spark schema.
originalTable.updateSchema().addColumn("location", SparkSchemaUtil.convert(locationType).asStruct).commit()
originalTable.refresh()
val sparkSchemaAddedField = SparkSchemaUtil.convert(originalTable.schema)
val jsonDfAddedFields = spark.read.schema(sparkSchemaAddedField).json("people_added_fields.json")
jsonDfAddedFields.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test")
spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show()
// *** Step 4: append data whose NESTED struct fields are re-ordered
// (people_nested_fields_reordered.json lists location.lon before
// location.lat). As in step 2, reading with the current table schema should
// resolve the nested fields by name.
val originalTable = tables.load("./iceberg-schema-evolution-test")
val originalSparkSchema = SparkSchemaUtil.convert(originalTable.schema)
val jsonDfNestedReordered = spark.read.schema(originalSparkSchema).json("people_nested_fields_reordered.json")
jsonDfNestedReordered.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test")
spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show()
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.