Skip to content

Instantly share code, notes, and snippets.

@ugoenyioha
Created January 30, 2017 18:27
Show Gist options
  • Save ugoenyioha/4547de653d70decc56eedfac3d5a790f to your computer and use it in GitHub Desktop.
Save ugoenyioha/4547de653d70decc56eedfac3d5a790f to your computer and use it in GitHub Desktop.
Avro Schema Parser - WIP
package com.example
import _root_.avro_schema_diff._
import org.apache.avro.{JsonProperties, Schema}
import org.apache.avro.Schema.{Field, Type}
import org.apache.avro.generic.GenericData.Record
object diff {

  /** Extension methods on Avro [[Schema]] used by the schema-diffing helpers. */
  implicit class RichAvroSchema(schema: Schema) {
    import collection.JavaConverters._

    /** The schema's top-level fields in declaration order; `schema` is expected to be a RECORD. */
    def fields: List[Field] = schema.getFields.asScala.toList

    /**
     * Flattens this RECORD schema into leaf field names, in declaration order.
     *
     * Naming rules (each nesting level adds a `fieldName_RecordName_` segment to the prefix):
     *  - a RECORD-typed value is expanded into its children;
     *  - an ARRAY is expanded through its element type, so an array of records `R` under field `f`
     *    yields `f_R_child`, while an array of primitives yields just `f`;
     *  - a UNION is expanded branch by branch; `null` branches contribute nothing (they only make
     *    the field optional) and duplicate names from several leaf branches are collapsed;
     *  - a NULL-typed value contributes nothing;
     *  - anything else (primitives, ENUM, FIXED, MAP) is a leaf and contributes its prefixed name.
     *
     * A record that is already being expanded further up the current path (a recursive schema) is
     * treated as a leaf instead of being expanded again, so recursion always terminates.
     */
    def flattenFields: List[String] = {
      // Expands the value called `name`, of schema `s`, found under `prefix`.
      // `active` holds the full names of the records currently being expanded on this path.
      def expand(name: String, s: Schema, prefix: String, active: Set[String]): List[String] =
        s.getType match {
          case Type.RECORD if !active.contains(s.getFullName) =>
            val childPrefix = s"$prefix${name}_${s.getName}_"
            val childActive = active + s.getFullName
            s.getFields.asScala.toList.flatMap { child =>
              expand(child.name, child.schema, childPrefix, childActive)
            }
          case Type.ARRAY =>
            // The element type decides the naming: records add their name, leaves do not.
            expand(name, s.getElementType, prefix, active)
          case Type.UNION =>
            // Several leaf branches (e.g. ["null", "int", "string"]) must not yield duplicates.
            s.getTypes.asScala.toList.flatMap(branch => expand(name, branch, prefix, active)).distinct
          case Type.NULL =>
            Nil
          case _ =>
            // Leaf type, or a recursive record reference that was already being expanded.
            List(prefix + name)
        }

      val rootActive = Set(schema.getFullName)
      fields.flatMap(field => expand(field.name, field.schema, "", rootActive))
    }

    /**
     * Maps each top-level field to its Avro [[Type]].
     *
     * Despite the name, the keys are `Field` objects, not field names.
     * NOTE(review): Avro's `Field.equals` also compares the field's schema, so two fields with the
     * same name but different types are different keys. Match on `name` when comparing schemas.
     */
    def fNamesTofTypes: Map[Field, Type] =
      fields.map((f: Field) => f -> f.schema.getType).toMap
  }

  /** Extension giving an Avro [[Type]] a `fieldName` accessor (its `getName`). */
  implicit class RichAvroType(schemaType: Type) {
    def fieldName: String = schemaType.getName
  }

  /** Extension giving an Avro [[Schema.Field]] a `fieldName` accessor (its `name()`). */
  implicit class RichAvroField(field: Schema.Field) {
    def fieldName: String = field.name()
  }
}
object test {
  import diff._

  /**
   * Returns the top-level fields of `rec2` whose name does not appear in `rec1`, in `rec2`'s
   * declaration order.
   *
   * Fields are matched by name, not by `Field` equality. This way a field whose type changed is
   * reported by [[fieldTypeChanges]] instead of as both added and missing.
   */
  def fieldsAdded(rec1: Schema, rec2: Schema): List[Field] = {
    val baseFieldNames: Set[String] = rec1.fields.map(_.name).toSet
    rec2.fields.filterNot(field => baseFieldNames.contains(field.name))
  }

  /** Returns the top-level fields of `rec1` whose name does not appear in `rec2`, in `rec1`'s order. */
  def fieldsMissing(rec1: Schema, rec2: Schema): List[Field] =
    fieldsAdded(rec2, rec1)

  /**
   * Describes each top-level field that exists in both schemas (matched by name) but whose Avro
   * `Type` differs. Produces one message per field, in `rec1`'s declaration order.
   *
   * Only the top-level `Type` is compared. For example, two UNIONs or two RECORDs whose members
   * differ are not reported.
   */
  def fieldTypeChanges(rec1: Schema, rec2: Schema): List[String] = {
    def typeChangeNote(field: Field, was: String, is: String): String =
      s"${field.fieldName} type was $was and is now $is"

    val newTypesByName: Map[String, Type] =
      rec2.fields.map(f => f.name -> f.schema.getType).toMap

    rec1.fields.flatMap { field =>
      val oldType = field.schema.getType
      newTypesByName.get(field.name).collect {
        case newType if newType != oldType =>
          typeChangeNote(field, oldType.toString, newType.toString)
      }
    }
  }
}
object Hello {
  import diff._

  /** Example schema with an array of records, a nested record, a primitive and two unions. */
  private val ExampleSchemaJson: String =
    """
      |{
      | "type": "record",
      | "name": "PlayerExtensionQOFSEvent",
      | "namespace": "intelmedia.ws.client",
      | "doc": "...",
      | "fields": [{
      | "name": "foo",
      | "type": {
      | "type": "array",
      | "items": {
      | "type": "record",
      | "name": "componentVersion2",
      | "fields": [{
      | "name": "component",
      | "type": "string",
      | "doc": "..."
      | }, {
      | "name": "build",
      | "type": "string",
      | "doc": "..."
      | }]
      | }
      | }
      | }, {
      | "name": "address",
      | "type": {
      | "type": "record",
      | "name": "AddressUSRecord",
      | "fields": [{
      | "name": "streetaddress",
      | "type": "string"
      | }, {
      | "name": "city",
      | "type": "string"
      | }]
      | }
      | }, {
      | "name": "deviceTimestamp",
      | "type": "long",
      | "doc": "..."
      | }, {
      | "name": "serviceTimestamp",
      | "type": [
      | "null",
      | "long"
      | ],
      | "doc": "device should always pass null",
      | "default": null
      | }, {
      | "name": "deviceVersion",
      | "type": [
      | "null", {
      | "type": "array",
      | "items": {
      | "type": "record",
      | "name": "componentVersion",
      | "fields": [{
      | "name": "component",
      | "type": "string",
      | "doc": "..."
      | }, {
      | "name": "build",
      | "type": "string",
      | "doc": "..."
      | }]
      | }
      | }
      | ],
      | "doc": "...",
      | "default": null
      | }],
      | "event.version": 22
      |}
      |
      |""".stripMargin

  /** Parses the example schema and prints its flattened field names to stdout. */
  def main(args: Array[String]): Unit = {
    val parsed: Schema = new Schema.Parser().parse(ExampleSchemaJson)
    val flattened = parsed.flattenFields
    println(flattened)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment