Create a gist now

Instantly share code, notes, and snippets.

What would you like to do?
Here's an example of how to embed Avro objects into a Kryo stream. You only need to register each Avro Specific class in the KryoRegistrator using the AvroSerializer class below and you're ready to go.
/*
 * Copyright (c) 2013. Regents of the University of California
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package edu.berkeley.cs.amplab.adam.serialization
import org.apache.avro.specific.{SpecificDatumWriter, SpecificDatumReader, SpecificRecord}
import com.esotericsoftware.kryo.{Kryo, Serializer}
import com.esotericsoftware.kryo.io.{Input, Output}
import org.apache.avro.io.{BinaryDecoder, DecoderFactory, BinaryEncoder, EncoderFactory}
import spark.KryoRegistrator
import edu.berkeley.cs.amplab.adam.avro.{ADAMGenotype, ADAMPileup, ADAMRecord}
import edu.berkeley.cs.amplab.adam.models.{MatedReferencePositionSerializer, MatedReferencePosition, ReferencePositionSerializer, ReferencePosition}
/**
 * Kryo serializer that embeds an Avro specific record in a Kryo stream using
 * Avro's binary encoding.
 *
 * Register one instance per Avro-generated class, e.g.
 * `kryo.register(classOf[MyRecord], new AvroSerializer[MyRecord]())`.
 *
 * @tparam T the Avro-generated record type handled by this serializer
 */
class AvroSerializer[T <: SpecificRecord : ClassManifest] extends Serializer[T] {

  // Runtime class of T, recovered from the ClassManifest; shared by reader and writer.
  private val recordClass: Class[T] = classManifest[T].erasure.asInstanceOf[Class[T]]

  val reader = new SpecificDatumReader[T](recordClass)
  val writer = new SpecificDatumWriter[T](recordClass)

  // Cached codec instances. They start out null on purpose: the Avro factories
  // accept a "reuse" argument and allocate a fresh instance when it is null.
  var encoder: BinaryEncoder = null
  var decoder: BinaryDecoder = null

  /**
   * Writes `record` to `output` in Avro binary form.
   *
   * Synchronized like `read`, because the cached encoder is shared mutable state.
   *
   * @param kryo   the calling Kryo instance (unused)
   * @param output the Kryo output the record bytes are written to
   * @param record the record to serialize
   */
  override def write(kryo: Kryo, output: Output, record: T): Unit = this.synchronized {
    encoder = EncoderFactory.get().directBinaryEncoder(output, encoder)
    writer.write(record, encoder)
  }

  /**
   * Reads one Avro binary-encoded record from `input`.
   *
   * @param kryo  the calling Kryo instance (unused)
   * @param input the Kryo input positioned at the start of the record
   * @param klazz the class Kryo expects back (unused; the reader already knows T)
   * @return the decoded record
   */
  override def read(kryo: Kryo, input: Input, klazz: Class[T]): T = this.synchronized {
    decoder = DecoderFactory.get().directBinaryDecoder(input, decoder)
    // No record instance is offered for reuse, so the reader allocates a new one.
    reader.read(null.asInstanceOf[T], decoder)
  }
}
/**
 * Registers the ADAM classes with Kryo, pairing each one with its serializer.
 *
 * The three Avro-generated record types use [[AvroSerializer]]; the two
 * reference-position models use their own dedicated serializers.
 */
class AdamKryoRegistrator extends KryoRegistrator {

  /**
   * Registers every ADAM class on the given Kryo instance.
   *
   * The registration order is kept exactly as it was; do not reorder these
   * calls without checking how Kryo assigns class identifiers.
   *
   * @param kryo the Kryo instance to configure
   */
  override def registerClasses(kryo: Kryo): Unit = {
    // Avro-generated records
    kryo.register(classOf[ADAMRecord], new AvroSerializer[ADAMRecord]())
    kryo.register(classOf[ADAMPileup], new AvroSerializer[ADAMPileup]())
    kryo.register(classOf[ADAMGenotype], new AvroSerializer[ADAMGenotype]())

    // Model classes with hand-written serializers
    kryo.register(classOf[ReferencePosition], new ReferencePositionSerializer)
    kryo.register(classOf[MatedReferencePosition], new MatedReferencePositionSerializer)
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment