Skip to content

Instantly share code, notes, and snippets.

@eliaslevy
Created October 5, 2016 21:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save eliaslevy/155c72b089b14d54a36038d18272f04c to your computer and use it in GitHub Desktop.
Save eliaslevy/155c72b089b14d54a36038d18272f04c to your computer and use it in GitHub Desktop.
Avro SpecificRecord Serde for Kafka
package example.kstreams
import org.apache.kafka.common.serialization.Serde
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.common.serialization.Deserializer
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import org.apache.avro.io.EncoderFactory
import org.apache.avro.io.DecoderFactory
import org.apache.avro.specific.SpecificDatumWriter
import org.apache.avro.specific.SpecificDatumReader
import org.apache.avro.specific.SpecificRecord
import org.apache.avro.Schema
import java.util.{Map => JMap}
/**
 * Kafka [[Serde]] for Avro `SpecificRecord` types using Avro's binary encoding.
 *
 * A single instance plays all three roles: `serializer()` and `deserializer()`
 * both return `this`. The bytes produced by `serialize` contain only what the
 * datum writer emits for the record; this class adds no header or schema
 * identifier of its own, so `deserialize` must be given the same schema.
 *
 * Instances are NOT safe for concurrent use: `serialize` resets and writes into
 * the shared `out` buffer, and `deserialize` re-targets the shared `decoder`.
 *
 * @param schema Avro schema used both to write and to read records
 * @tparam T     Avro record type handled by this serde
 */
class SpecificAvroSerde[T <: SpecificRecord](schema: Schema)
  extends Serde[T] with Serializer[T] with Deserializer[T]
{
  // Scratch buffer shared by every serialize() call; reset before each use.
  val out = new ByteArrayOutputStream()
  val encoder = EncoderFactory.get().directBinaryEncoder(out, null)
  // Placeholder decoder with no input; it is handed back to the factory as the
  // "reuse" argument on every deserialize() call.
  val decoder = DecoderFactory.get().directBinaryDecoder(null, null)
  // Left as `var` so existing callers that reassign them keep compiling.
  var writer = new SpecificDatumWriter[T](schema)
  var reader = new SpecificDatumReader[T](schema)

  /** No configuration is required; present to satisfy the Kafka interfaces. */
  def configure(configs: JMap[String, _], isKey: Boolean): Unit = {}

  /** @return this instance, which also implements [[Serializer]] */
  def serializer(): Serializer[T] = this

  /** @return this instance, which also implements [[Deserializer]] */
  def deserializer(): Deserializer[T] = this

  /**
   * Encodes `obj` with the configured schema.
   *
   * @param topic ignored
   * @param obj   record to encode; may be `null`
   * @return the encoded bytes, or `null` when `obj` is `null`
   */
  def serialize(topic: String, obj: T): Array[Byte] = {
    if (obj == null) {
      // Kafka passes null for tombstones / absent keys; propagate it instead
      // of failing inside the datum writer.
      null
    } else {
      out.reset()
      writer.write(obj, encoder)
      // Make sure everything the encoder holds has reached `out` before the
      // buffer is copied.
      encoder.flush()
      out.toByteArray()
    }
  }

  /**
   * Decodes a record previously produced by `serialize`.
   *
   * @param topic ignored
   * @param bytes encoded record; may be `null`
   * @return the decoded record, or `null` when `bytes` is `null`
   */
  def deserialize(topic: String, bytes: Array[Byte]): T = {
    if (bytes == null) {
      null.asInstanceOf[T]
    } else {
      // Use the decoder the factory RETURNS: it is only guaranteed to be the
      // reused instance when the factory decides reuse is possible.
      val input = DecoderFactory.get()
        .directBinaryDecoder(new ByteArrayInputStream(bytes), decoder)
      // A null "reuse" datum asks the reader to allocate a fresh record.
      reader.read(null.asInstanceOf[T], input)
    }
  }

  /** Nothing to release; present to satisfy the Kafka interfaces. */
  def close(): Unit = {}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment