Skip to content

Instantly share code, notes, and snippets.

@prodeezy
Last active September 26, 2019 08:21
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save prodeezy/b2cc35b87fca7d43ae681d45b3d7cab3 to your computer and use it in GitHub Desktop.
Iceberg Schema Evolution Scenarios
// *** Sample Data
bash-3.2$ cat people_flat.json
{"name":"Michael", "grade": 4.0}
{"name":"Andy", "age":30, "grade": 3.5}
{"name":"Justin", "age":19}
bash-3.2$ cat people_reordered.json
{"age":65, "name":"Biswa", "grade": 4.0}
bash-3.2$ cat people_added_fields.json
{"age":5, "name":"Uhuru", "grade": 4.5, "location": { "lat": 101.5, "lon": 68.0 } }
bash-3.2$ cat people_nested_fields_reordered.json
{"age":25, "name":"Jolie", "grade": 3.2, "location": { "lon": 42.0, "lat": 189.3} }
// Test Schema Evolution in Iceberg
import org.apache.spark.sql.types._ ;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.Schema;
import org.apache.iceberg.spark.SparkSchemaUtil;
// *** Scenario 1: create an Iceberg table from a Spark schema and load the
// initial flat JSON rows into it. `tables` is reused by the later scenarios.
val peopleSchema = new StructType()
  .add("name", StringType)
  .add("age", IntegerType)
  .add("grade", DoubleType)
val peopleDf = spark.read.schema(peopleSchema).json("people_flat.json")
val tables = new HadoopTables()
// Translate the Spark StructType into an Iceberg Schema and create the table
// at a local (HadoopTables) path.
val icebergSchema = SparkSchemaUtil.convert(peopleSchema)
val createdTable = tables.create(icebergSchema, "./iceberg-schema-evolution-test")
peopleDf.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test")
// Read back through Iceberg to confirm the rows landed.
spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show()
// *** Scenario 2: append rows whose top-level JSON keys arrive in a different
// order, reading them with the schema already stored in the Iceberg table.
val tableForReorder = tables.load("./iceberg-schema-evolution-test")
val storedIceSchema = tableForReorder.schema
// Convert the table's Iceberg schema back to a Spark schema for the reader.
val storedSparkSchema = SparkSchemaUtil.convert(tableForReorder.schema)
val reorderedDf = spark.read.schema(storedSparkSchema).json("people_reordered.json")
reorderedDf.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test")
spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show()
// *** Scenario 3: evolve the table schema by adding a nested struct column,
// then append a file that carries the new column.
val tableToEvolve = tables.load("./iceberg-schema-evolution-test")
val locationStruct = new StructType().add("lat", DoubleType).add("lon", DoubleType)
// Commit the new column through Iceberg's schema-evolution API.
tableToEvolve.updateSchema().addColumn("location", SparkSchemaUtil.convert(locationStruct).asStruct).commit()
// Pick up the just-committed table metadata before deriving the read schema.
tableToEvolve.refresh()
val evolvedSparkSchema = SparkSchemaUtil.convert(tableToEvolve.schema)
val addedFieldsDf = spark.read.schema(evolvedSparkSchema).json("people_added_fields.json")
addedFieldsDf.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test")
spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show()
// *** Scenario 4: append rows whose nested struct ("location") has its fields
// re-ordered in the incoming JSON, again reading with the table's own schema.
val tableForNested = tables.load("./iceberg-schema-evolution-test")
val nestedSparkSchema = SparkSchemaUtil.convert(tableForNested.schema)
val nestedReorderedDf = spark.read.schema(nestedSparkSchema).json("people_nested_fields_reordered.json")
nestedReorderedDf.write.format("iceberg").mode("append").save("./iceberg-schema-evolution-test")
spark.read.format("iceberg").load("./iceberg-schema-evolution-test").show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment