Skip to content

Instantly share code, notes, and snippets.

@ldcasillas-progreso
Created November 19, 2015 22:11
Show Gist options
  • Save ldcasillas-progreso/871af3c1a1790be975fd to your computer and use it in GitHub Desktop.
Save ldcasillas-progreso/871af3c1a1790be975fd to your computer and use it in GitHub Desktop.
Samza Avro Serde. Has no schema management support.
package com.oportun.fraud.demo.serde.avro;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.samza.serializers.Serde;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
 * A simple Avro {@link Serde} backed by a fixed {@link DatumReader}/{@link DatumWriter} pair.
 * Does not support schema management or evolution: this class writes only what the
 * {@link DatumWriter} emits through a binary encoder and reads it back with the
 * {@link DatumReader} it was built with, so both sides must already agree on the schema.
 *
 * <p>{@code null} passes through in both directions: {@link #fromBytes} returns {@code null}
 * for a {@code null} array and {@link #toBytes} returns {@code null} for a {@code null} value.
 *
 * <p>Instances are obtained through {@link #specific(Class)} or {@link #generic(Schema)}.
 *
 * @param <T> the type of value this serde reads and writes
 * @author Luis Casillas
 */
public class AvroSerde<T> implements Serde<T> {

    ////////////////////////////////////////////////////////////////////////////////////////////
    //
    // Factory methods
    //

    /**
     * Creates a serde for a "specific" Avro type, using a {@link SpecificDatumReader} and
     * {@link SpecificDatumWriter} built from the given class.
     *
     * @param klass the class of the values to (de)serialize
     * @param <T>   the type of the values to (de)serialize
     * @return a serde for {@code klass}
     */
    public static <T> Serde<T> specific(Class<T> klass) {
        return new AvroSerde<>(klass, new SpecificDatumReader<>(klass), new SpecificDatumWriter<>(klass));
    }

    /**
     * Creates a serde for "generic" Avro records, using a {@link GenericDatumReader} and
     * {@link GenericDatumWriter} built from the given schema.
     *
     * <p>The return type is {@code AvroSerde<IndexedRecord>} rather than a wildcard type:
     * with {@code AvroSerde<? extends IndexedRecord>} the compiler rejects every non-null
     * argument to {@link #toBytes}, so the result could only be used for reading. The
     * narrower type remains assignable to the wildcard form.
     *
     * @param schema the schema used for both reading and writing
     * @return a serde for records of {@code schema}
     */
    public static AvroSerde<IndexedRecord> generic(Schema schema) {
        return new AvroSerde<>(IndexedRecord.class,
                new GenericDatumReader<IndexedRecord>(schema),
                new GenericDatumWriter<IndexedRecord>(schema));
    }

    ////////////////////////////////////////////////////////////////////////////////////////////
    //
    // Instance implementation
    //

    /** Runtime type token used to cast whatever the reader produces to {@code T}. */
    private final Class<T> klass;
    private final DatumReader<T> reader;
    private final DatumWriter<T> writer;

    private AvroSerde(Class<T> klass, DatumReader<T> reader, DatumWriter<T> writer) {
        this.klass = klass;
        this.reader = reader;
        this.writer = writer;
    }

    /**
     * Decodes one value from the given bytes.
     *
     * @param bytes the encoded value, or {@code null}
     * @return the decoded value, or {@code null} if {@code bytes} is {@code null}
     * @throws IllegalStateException if the reader fails with an {@link IOException}
     * @throws ClassCastException    if the reader produces an object that is not a {@code T}
     */
    @Override
    public T fromBytes(byte[] bytes) {
        // An absent payload has nothing to decode; hand it back as an absent value
        // instead of letting the decoder fail on a null buffer.
        if (bytes == null) {
            return null;
        }
        Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null);
        try {
            return klass.cast(reader.read(null, decoder));
        } catch (IOException e) {
            throw new IllegalStateException("could not decode value", e);
        }
    }

    /**
     * Encodes one value to bytes.
     *
     * @param object the value to encode, or {@code null}
     * @return the encoded value, or {@code null} if {@code object} is {@code null}
     * @throws IllegalStateException if the writer or encoder fails with an {@link IOException}
     */
    @Override
    public byte[] toBytes(T object) {
        // Mirror fromBytes: an absent value is represented by an absent payload.
        if (object == null) {
            return null;
        }
        try (ByteArrayOutputStream out = new ByteArrayOutputStream()) {
            BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
            writer.write(object, encoder);
            // The encoder is flushed explicitly before the stream's contents are copied out.
            encoder.flush();
            return out.toByteArray();
        } catch (IOException e) {
            throw new IllegalStateException("could not encode value", e);
        }
    }
}
package com.oportun.fraud.demo.serde.avro;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
/**
 * Base {@link SerdeFactory} for "specific" Avro types, that is, types that have their own
 * Avro-generated class.
 *
 * <p>To use it, write a subclass whose {@link #getAvroClass} returns one of your Avro-generated
 * classes, then configure your Samza job to use that subclass as the {@link SerdeFactory} for
 * the relevant source, store or output.
 *
 * <p>Note that no schema management is performed, so schema evolution is not supported.
 *
 * @param <T> the Avro-generated type handled by the produced serde
 * @author Luis Casillas
 */
public abstract class SpecificAvroSerdeFactory<T> implements SerdeFactory<T> {

    /**
     * Names the class the produced serde should read and write.
     *
     * @return the class handed to {@code AvroSerde.specific}
     */
    protected abstract Class<T> getAvroClass();

    /**
     * Builds the serde for the class reported by {@link #getAvroClass}. Neither argument is
     * consulted by this implementation.
     *
     * @param name   the serde name supplied by the caller (unused here)
     * @param config the job configuration supplied by the caller (unused here)
     * @return the serde returned by {@code AvroSerde.specific} for that class
     */
    @Override
    public Serde<T> getSerde(String name, Config config) {
        final Class<T> avroClass = getAvroClass();
        return AvroSerde.specific(avroClass);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment