Skip to content

Instantly share code, notes, and snippets.

@massie
Last active May 23, 2016 19:10
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save massie/6373361 to your computer and use it in GitHub Desktop.
Save massie/6373361 to your computer and use it in GitHub Desktop.
A simple Scala class that wraps Avro Generic and Specific objects to make them "Serializable".
package com.zenfractal
import org.apache.avro.generic.{GenericDatumWriter, GenericDatumReader, IndexedRecord}
import java.io.{ObjectOutputStream, IOException, ObjectInputStream}
import org.apache.avro.specific.{SpecificDatumWriter, SpecificRecordBase, SpecificDatumReader}
import org.apache.avro.Schema
import org.apache.avro.io._
/**
 * Makes Avro objects 'Serializable' by delegating to Avro's binary encoding, e.g.
 *
 * {{{
 * val avroThingy = new AvroThingy()
 * val serializableObject = new AvroSerializable[AvroThingy](avroThingy)
 * objectOutputStream.writeObject(serializableObject)
 * val copy = objectInputStream.readObject().asInstanceOf[AvroSerializable[AvroThingy]]
 * }}}
 *
 * The private `writeObject`/`readObject` methods below are the hooks that Java
 * serialization looks up reflectively; they are not meant to be called directly.
 *
 * Serialized form: the JVM class name of the wrapped object (Avro string)
 * followed by the object itself in Avro binary encoding.
 *
 * Writing works with Avro Generic and Specific objects. Reading instantiates
 * the wrapped class through its no-argument constructor, so only classes that
 * provide one can be read back.
 * NOTE(review): generic record classes that need a schema at construction time
 * would not satisfy this -- confirm before relying on it for generic records.
 *
 * @param avroObj The Avro object to be read/written
 * @tparam T The type of Avro object
 */
class AvroSerializable[T <: IndexedRecord](var avroObj: T)
  extends IndexedRecord with Serializable {

  // Reusable Avro encoder/decoder. They are bound to a particular stream, so
  // they are scratch state only and must never be part of the serialized form.
  // `null` is what the Avro factories accept as "nothing to reuse yet".
  @transient var encoder: BinaryEncoder = null
  @transient var decoder: BinaryDecoder = null

  /**
   * Writer matching the wrapped object's flavour (specific or generic).
   *
   * Lazy and transient on purpose: Java deserialization does not run this
   * class's constructor, so an eagerly initialised `val` would be `null` on a
   * deserialized instance and serializing that instance again would fail.
   */
  @transient lazy val writer: DatumWriter[T] =
    avroObj match {
      case specificObj: SpecificRecordBase =>
        new SpecificDatumWriter[T](specificObj.getSchema)
      case genericObj: IndexedRecord =>
        new GenericDatumWriter[T](genericObj.getSchema)
    }

  /**
   * Java serialization hook: writes the wrapped object's class name followed
   * by its Avro binary encoding.
   *
   * @param out stream supplied by the Java serialization machinery
   */
  @throws(classOf[IOException])
  private def writeObject(out: ObjectOutputStream): Unit = {
    encoder = EncoderFactory.get().binaryEncoder(out, encoder)
    // Use the binary name (getName), not the canonical name: readObject feeds
    // this string to ClassLoader.loadClass, which needs `Outer$Inner` for
    // nested classes. For top-level classes both names are identical.
    encoder.writeString(avroObj.getClass.getName)
    // Write the object
    writer.write(avroObj, encoder)
    // The encoder buffers; push everything into `out` before returning.
    encoder.flush()
  }

  /**
   * Java serialization hook: reads the class name, instantiates that class
   * through its no-argument constructor and fills it from the Avro binary data.
   *
   * @param in stream supplied by the Java serialization machinery
   */
  @throws(classOf[IOException])
  @throws(classOf[ClassNotFoundException])
  private def readObject(in: ObjectInputStream): Unit = {
    decoder = DecoderFactory.get().binaryDecoder(in, decoder)
    // Read the name of the wrapped object's class
    val fullClassName = decoder.readString()
    // Load the class
    val classIn = getClass.getClassLoader.loadClass(fullClassName).asInstanceOf[Class[T]]
    // Create an empty instance; it provides the schema and serves as the reuse target
    val blank: T = classIn.getDeclaredConstructor().newInstance()
    // Create a reader for the class
    val reader: DatumReader[T] = blank match {
      case specificObj: SpecificRecordBase =>
        new SpecificDatumReader[T](specificObj.getSchema)
      case genericObj: IndexedRecord =>
        new GenericDatumReader[T](genericObj.getSchema)
    }
    // Keep whatever the reader returns: it is not obliged to hand back `blank`.
    avroObj = reader.read(blank, decoder)
  }

  /** Schema of the wrapped Avro object. */
  def getSchema: Schema = avroObj.getSchema

  /** Sets field `i` of the wrapped Avro object to `v`. */
  def put(i: Int, v: Any): Unit = avroObj.put(i, v)

  /** Returns field `i` of the wrapped Avro object. */
  def get(i: Int): AnyRef = avroObj.get(i)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment