Skip to content

Instantly share code, notes, and snippets.

@lamdor
Created February 25, 2011 04:42
Show Gist options
  • Save lamdor/843369 to your computer and use it in GitHub Desktop.
Save lamdor/843369 to your computer and use it in GitHub Desktop.
Using ScalaSig to dynamically create an Avro schema and serialize an instance of a case class. It's very rough around the edges.
package eg
import org.apache.avro._
import generic.{GenericDatumWriter}
import reflect.{ReflectDatumWriter}
import specific.{SpecificDatumWriter}
import io.{BinaryEncoder, JsonEncoder}
import java.io.ByteArrayOutputStream
import java.lang.reflect.{Method => JMethod}
import scala.tools.scalap.scalax.rules.scalasig._
/**
 * Sample record serialized by the demo in `App`.
 *
 * @param name     string field (maps to Avro `string`)
 * @param quantity integer field (maps to Avro `int`)
 */
case class Pet(
  name: String,
  quantity: Int
)
/**
 * Reads the pickled Scala signature (ScalaSig) of a case class to discover its
 * non-private case accessors, in the order they appear in the signature.
 */
object ScalaSigFields {

  /**
   * One non-private case-class field.
   *
   * @param name     accessor name as recorded in the ScalaSig
   * @param index    zero-based position among the non-private case accessors
   * @param infoType the accessor's type as recorded in the ScalaSig
   * @param method   Java reflection handle for the accessor, used to read the value
   */
  case class CaseClassField(name: String, index: Int, infoType: Type, method: JMethod)

  /**
   * Lists the non-private case accessors of `A`.
   *
   * @tparam A the case class to inspect
   * @return one [[CaseClassField]] per non-private case accessor, indexed from 0
   * @throws RuntimeException if `A`'s runtime class has no ScalaSig, or the ScalaSig
   *         declares no top-level class
   * @throws NoSuchMethodException if an accessor has no matching public Java method
   */
  def caseClassFields[A](implicit manifest: Manifest[A]): Seq[CaseClassField] = {
    val clazz = manifest.erasure

    val sig: ScalaSig = ScalaSigParser.parse(clazz).getOrElse(
      throw new RuntimeException("no ScalaSig found for class " + clazz.getName))

    // Partial `.head` replaced so an empty signature reports which class was at fault.
    val topLevel = sig.topLevelClasses.headOption.getOrElse(
      throw new RuntimeException("ScalaSig of " + clazz.getName + " declares no top-level class"))

    // Keep only non-private case accessors; a typed pattern replaces the unchecked
    // asInstanceOf downcast, so a non-method symbol cannot cause a ClassCastException.
    val accessors = topLevel.children.collect {
      case m: MethodSymbol if m.isCaseAccessor && !m.isPrivate => m
    }

    accessors.zipWithIndex.map { case (meth, i) =>
      CaseClassField(meth.name, i, meth.infoType, clazz.getMethod(meth.name))
    }
  }
}
/** Derives an Avro record schema from a case class's ScalaSig. */
object ScalaSigSchema {
  import ScalaSigFields._

  /**
   * Builds an Avro record schema with one field per non-private case accessor of `A`.
   * Field order follows `ScalaSigFields.caseClassFields`.
   *
   * @tparam A the case class to describe
   * @throws RuntimeException if a field's type has no Avro mapping
   */
  def schema[A : Manifest]: Schema = {
    val fields = new java.util.ArrayList[Schema.Field]
    caseClassFields[A].foreach { f =>
      // Empty doc string and no default value for every field.
      fields.add(new Schema.Field(f.name, Schema.create(typeForField(f)), "", null))
    }
    Schema.createRecord(fields)
  }

  /**
   * Maps a field's accessor type to an Avro primitive type.
   *
   * @throws RuntimeException if the accessor type is not of the shape
   *         `PolyType(TypeRefType(...))`, or its symbol has no Avro mapping
   */
  def typeForField(f: CaseClassField): Schema.Type = f.infoType match {
    case PolyType(TypeRefType(_, t, _), _) => schemaTypeForSymbol(t)
    case other =>
      throw new RuntimeException("unsupported type for field '" + f.name + "': " + other)
  }

  /**
   * Maps a fully qualified Scala type symbol to an Avro primitive type.
   * Each supported type boxes to the Java type GenericDatumWriter expects for the
   * chosen Avro type (e.g. scala.Long -> java.lang.Long for LONG).
   *
   * @throws RuntimeException for any symbol not listed below
   */
  def schemaTypeForSymbol(s: Symbol): Schema.Type = s.toString match {
    case "scala.Predef.String" | "java.lang.String" => Schema.Type.STRING
    case "scala.Int"     => Schema.Type.INT
    case "scala.Long"    => Schema.Type.LONG
    case "scala.Float"   => Schema.Type.FLOAT
    case "scala.Double"  => Schema.Type.DOUBLE
    case "scala.Boolean" => Schema.Type.BOOLEAN
    case other => throw new RuntimeException("no Avro type mapping for " + other)
  }
}
/**
 * A GenericDatumWriter for case class `A`.
 * It uses the schema from `ScalaSigSchema.schema[A]` and reads each field by
 * reflectively invoking the corresponding case accessor.
 */
class ScalaAvroDatumWriter[A : Manifest] extends GenericDatumWriter[A](ScalaSigSchema.schema[A]) {
  import ScalaSigFields._

  // Accessor methods resolved once per writer instead of re-parsing the ScalaSig on
  // every getField call; an IndexedSeq makes the positional lookup constant-time.
  // Lazy so initialisation does not depend on constructor ordering with the superclass.
  private lazy val accessors: IndexedSeq[JMethod] =
    caseClassFields[A].map(_.method).toIndexedSeq

  /**
   * Returns the value of the field at `position` in the record being written.
   *
   * @param record    the record being written
   * @param fieldName the Avro field name (unused; lookup is by position)
   * @param position  index of the field in the schema
   * @throws IndexOutOfBoundsException if `position` is not a valid field index
   */
  override def getField(record: Object, fieldName: String, position: Int): Object =
    accessors(position).invoke(record)
}
/** Demo entry point: serializes a `Pet` to Avro JSON and prints it. */
object App {
  def main(args: Array[String]): Unit = {
    val lou = Pet("Lou", 1)
    println("before")
    println(lou)

    val baos = new ByteArrayOutputStream
    val writer = new ScalaAvroDatumWriter[Pet]
    val encoder = new JsonEncoder(ScalaSigSchema.schema[Pet], baos)
    writer.write(lou, encoder)
    // Push any buffered output into baos before reading it back.
    encoder.flush()

    println("after")
    // Decode as UTF-8 explicitly rather than with the platform-default charset;
    // Avro's JsonEncoder writes UTF-8.
    val serialized = new String(baos.toByteArray, "UTF-8")
    println(serialized)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment