Created
February 25, 2011 04:42
-
-
Save lamdor/843369 to your computer and use it in GitHub Desktop.
Using ScalaSig to dynamically create an Avro schema and serialize an instance of a case class. It's very rough around the edges.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package eg | |
import org.apache.avro._ | |
import generic.{GenericDatumWriter} | |
import reflect.{ReflectDatumWriter} | |
import specific.{SpecificDatumWriter} | |
import io.{BinaryEncoder, JsonEncoder} | |
import java.io.ByteArrayOutputStream | |
import java.lang.reflect.{Method => JMethod} | |
import scala.tools.scalap.scalax.rules.scalasig._ | |
/**
 * Sample record used by the demo in this file.
 *
 * @param name     the pet's name
 * @param quantity how many of this pet there are
 */
case class Pet(
  name: String,
  quantity: Int
)
/** Helpers that recover the fields of a case class from its pickled Scala signature (ScalaSig). */
object ScalaSigFields {

  /**
   * One case-class field as described by the ScalaSig.
   *
   * @param name     accessor name as recorded in the signature
   * @param index    position among the non-private case accessors, in signature order
   * @param infoType scalap type information attached to the accessor
   * @param method   Java reflection handle for the accessor, looked up by name on the class
   */
  case class CaseClassField(name: String, index: Int, infoType: Type, method: JMethod)

  /**
   * Lists the non-private case accessors of `A`, in the order they appear in the signature.
   *
   * @tparam A the case class to inspect; its runtime class is taken from the implicit manifest
   * @return one [[CaseClassField]] per non-private case accessor
   * @throws RuntimeException if the class carries no ScalaSig, or the signature declares
   *                          no top-level class
   */
  def caseClassFields[A](implicit manifest: Manifest[A]): Seq[CaseClassField] = {
    val clazz = manifest.erasure

    val sig: ScalaSig = ScalaSigParser.parse(clazz).getOrElse(
      throw new RuntimeException("No ScalaSig found for class " + clazz.getName))

    // headOption instead of head so an empty signature reports which class was at fault.
    val topLevel = sig.topLevelClasses.headOption.getOrElse(
      throw new RuntimeException("ScalaSig of " + clazz.getName + " declares no top-level class"))

    // Type-safe selection: only method symbols can be turned into accessors, so match on
    // the symbol type rather than casting after the fact.
    val accessors = topLevel.children.collect {
      case m: MethodSymbol if m.isCaseAccessor && !m.isPrivate => m
    }

    accessors.zipWithIndex.map {
      case (meth, i) => CaseClassField(meth.name, i, meth.infoType, clazz.getMethod(meth.name))
    }
  }
}
/** Derives an Avro record schema for a case class from its ScalaSig field information. */
object ScalaSigSchema {
  import ScalaSigFields._

  /**
   * Builds an Avro record schema with one field per case accessor of `A`.
   *
   * Every field gets an empty doc string and no default value.
   *
   * @tparam A the case class to describe
   * @return the record schema produced by `Schema.createRecord`
   * @throws RuntimeException if a field's type cannot be mapped to an Avro type
   */
  def schema[A : Manifest]: Schema = {
    val fields = new java.util.ArrayList[Schema.Field]
    caseClassFields[A].foreach { f =>
      fields.add(new Schema.Field(f.name, Schema.create(typeForField(f)), "", null))
    }
    Schema.createRecord(fields)
  }

  /**
   * Resolves the Avro type of a single case-class field.
   *
   * Only accessors whose type info is a `PolyType` wrapping a `TypeRefType` are understood.
   * NOTE(review): newer scalap releases may encode parameterless accessors differently
   * (e.g. as a nullary method type) -- confirm against the scalap version in use.
   *
   * @param f the field to inspect
   * @return the Avro type for the field
   * @throws RuntimeException if the field's type info has an unsupported shape or names an
   *                          unsupported Scala type
   */
  def typeForField(f: CaseClassField): Schema.Type = f.infoType match {
    case PolyType(TypeRefType(_, t, _), _) => schemaTypeForSymbol(t)
    case other =>
      throw new RuntimeException(
        "Unsupported type information for field '" + f.name + "': " + other)
  }

  /**
   * Maps a type symbol to an Avro primitive type, keyed on the symbol's string form.
   *
   * @param s the type symbol referenced by a field
   * @return the matching Avro primitive type
   * @throws RuntimeException if the symbol does not name one of the recognised types
   */
  def schemaTypeForSymbol(s: Symbol): Schema.Type = s.toString match {
    case "scala.Predef.String" | "java.lang.String" => Schema.Type.STRING
    case "scala.Int"                                => Schema.Type.INT
    case "scala.Long"                               => Schema.Type.LONG
    case "scala.Float"                              => Schema.Type.FLOAT
    case "scala.Double"                             => Schema.Type.DOUBLE
    case "scala.Boolean"                            => Schema.Type.BOOLEAN
    case other =>
      throw new RuntimeException("No Avro schema type known for Scala type: " + other)
  }
}
/**
 * Avro datum writer for case classes: the schema is derived from the ScalaSig of `A`, and
 * field values are read by invoking the case accessors reflectively.
 *
 * @tparam A the case class being written
 */
class ScalaAvroDatumWriter[A : Manifest] extends GenericDatumWriter[A](ScalaSigSchema.schema[A]) {
  import ScalaSigFields._

  // Resolved once per writer. Re-parsing the ScalaSig and re-doing the reflective method
  // lookup on every getField call (i.e. for every field of every record) is invariant work.
  private val accessors: IndexedSeq[JMethod] = caseClassFields[A].map(_.method).toIndexedSeq

  /**
   * Reads a field value from `record` by invoking the accessor at `position`.
   *
   * @param record    the case-class instance being serialized
   * @param fieldName unused; the accessor is selected by position
   * @param position  index of the field among the case accessors of `A`
   * @return the value returned by the accessor
   */
  override def getField(record: Object, fieldName: String, position: Int): Object =
    accessors(position).invoke(record)
}
/** Demo entry point: serializes a [[Pet]] to Avro JSON and prints it before and after. */
object App {

  /**
   * Prints a sample `Pet`, writes it through [[ScalaAvroDatumWriter]] with a JSON encoder
   * into an in-memory buffer, then prints the serialized text.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    val lou = Pet("Lou", 1)
    println("before")
    println(lou)

    val baos = new ByteArrayOutputStream
    val writer = new ScalaAvroDatumWriter[Pet]
    val encoder = new JsonEncoder(ScalaSigSchema.schema[Pet], baos)
    writer.write(lou, encoder)
    // Explicit call syntax: postfix `encoder flush` directly followed by another expression
    // line is parsed as `encoder.flush(println(...))`.
    encoder.flush()

    println("after")
    // Decode explicitly as UTF-8 rather than with the platform default charset.
    val serialized = baos.toString("UTF-8")
    println(serialized)
  }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment