Skip to content

Instantly share code, notes, and snippets.

@gasparms
Created November 6, 2017 20:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gasparms/5d0740bd61a500357e0230756be963e1 to your computer and use it in GitHub Desktop.
Save gasparms/5d0740bd61a500357e0230756be963e1 to your computer and use it in GitHub Desktop.
package me.gasparms.pocs.avro
import java.io.File

import org.apache.avro.Schema
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

import com.databricks.spark.avro._
/**
* Schema evolution tests.
* We have defined Avro schemas that are variants of one another (extra fields,
* renamed fields with aliases) to verify how spark-avro resolves each reader
* schema against previously written partitions.
*/
object SchemaEvolution {

  /** Root path under which every test partition is written. */
  private val BasePath = "/tmp/avro/"

  /**
   * Loads an Avro schema from a classpath resource.
   *
   * @param resourcePath classpath-relative path of the `.avsc` file
   * @throws IllegalArgumentException if the resource is not on the classpath,
   *         instead of the opaque NPE `getResource` would otherwise cause
   */
  private def loadSchema(resourcePath: String): Schema = {
    val url = getClass.getClassLoader.getResource(resourcePath)
    require(url != null, s"Avro schema resource not found on classpath: $resourcePath")
    new Schema.Parser().parse(new File(url.getFile))
  }

  /** Writes `df` to the shared base path, partitioned by `data_date`, with the given save mode. */
  private def writePartition(df: DataFrame, mode: SaveMode): Unit =
    df.write
      .mode(mode)
      .partitionBy("data_date")
      .avro(BasePath)

  /**
   * Reads everything under the base path using the reader schema found at
   * `schemaResource` and prints up to `numRows` rows.
   */
  private def readAndShow(spark: SparkSession, schemaResource: String, numRows: Int = 20): Unit = {
    val schema = loadSchema(schemaResource)
    spark
      .read
      .format("com.databricks.spark.avro")
      .option("avroSchema", schema.toString)
      .load(BasePath)
      .show(numRows)
  }

  /**
   * Runs three write/read rounds to exercise Avro schema evolution:
   * v1 baseline, v2 with a new field, v3 with a renamed field (alias).
   */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local").getOrCreate()
    try {
      // Write one partition with the Data1 version, then read with the Prueba1 schema.
      writePartition(spark.createDataFrame(AvroData.getData1("01")), SaveMode.Overwrite)
      readAndShow(spark, "avro/Prueba1_Avro.avsc")

      // Append a second partition (Data2 adds a new field); read both with the Prueba2 schema.
      writePartition(spark.createDataFrame(AvroData.getData2("02")), SaveMode.Append)
      readAndShow(spark, "avro/Prueba2_Avro.avsc")

      // Append a third partition (Data3 renames a field); read all with the Prueba3 schema.
      writePartition(spark.createDataFrame(AvroData.getData3("03")), SaveMode.Append)
      readAndShow(spark, "avro/Prueba3_Avro.avsc", numRows = 30)
    } finally {
      // Always release the SparkSession, even if a round fails.
      spark.stop()
    }
  }
}
{
"namespace": "com.kdar",
"name": "t_kdar_exchange_rate",
"doc": "kdar",
"type": "record",
"fields": [ {
"name": "data_date",
"type": "string",
"default": ""
}, {
"name": "xxtc_id",
"type": "string",
"default": ""
}, {
"name": "currency_id",
"type": "string"
}, {
"name": "zero_number",
"type": "int",
"default": 0
}, {
"name": "mid_amount",
"type": "double",
"default": 0
}, {
"name": "ask_amount",
"type": "float",
"default": 0
}, {
"name": "bid_amount",
"type": "double",
"default": 0
}, {
"name": "npp_id",
"aliases": ["npp_id_renamed"],
"type": "string",
"default": ""
}, {
"name": "new_field",
"type": "string",
"default": ""
}]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment