Created
November 19, 2015 22:11
-
-
Save ldcasillas-progreso/871af3c1a1790be975fd to your computer and use it in GitHub Desktop.
Samza Avro Serde. Has no schema management support.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.oportun.fraud.demo.serde.avro; | |
import org.apache.avro.Schema; | |
import org.apache.avro.generic.GenericDatumReader; | |
import org.apache.avro.generic.GenericDatumWriter; | |
import org.apache.avro.generic.IndexedRecord; | |
import org.apache.avro.io.BinaryEncoder; | |
import org.apache.avro.io.DatumReader; | |
import org.apache.avro.io.DatumWriter; | |
import org.apache.avro.io.Decoder; | |
import org.apache.avro.io.DecoderFactory; | |
import org.apache.avro.io.EncoderFactory; | |
import org.apache.avro.specific.SpecificDatumReader; | |
import org.apache.avro.specific.SpecificDatumWriter; | |
import org.apache.samza.serializers.Serde; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
/** | |
* A simple Avro {@link Serde}. Does not support schema management or evolution. | |
* | |
* @author Luis Casillas | |
*/ | |
public class AvroSerde<T> implements Serde<T> { | |
//////////////////////////////////////////////////////////////////////////////////////////// | |
// | |
// Factory methods | |
// | |
public static <T> Serde<T> specific(Class<T> klass) { | |
return new AvroSerde<>(klass, new SpecificDatumReader<>(klass), new SpecificDatumWriter<>(klass)); | |
} | |
public static AvroSerde<? extends IndexedRecord> generic(Schema schema) { | |
return new AvroSerde<>(IndexedRecord.class, | |
new GenericDatumReader<IndexedRecord>(schema), | |
new GenericDatumWriter<IndexedRecord>(schema)); | |
} | |
//////////////////////////////////////////////////////////////////////////////////////////// | |
// | |
// Instance implementation | |
// | |
private final Class<T> klass; | |
private final DatumReader<T> reader; | |
private final DatumWriter<T> writer; | |
private AvroSerde(Class<T> klass, DatumReader<T> reader, DatumWriter<T> writer) { | |
this.klass = klass; | |
this.reader = reader; | |
this.writer = writer; | |
} | |
@Override | |
public T fromBytes(byte[] bytes) { | |
Decoder decoder = DecoderFactory.get().binaryDecoder(bytes, null); | |
try { | |
return klass.cast(reader.read(null, decoder)); | |
} catch (IOException e) { | |
throw new IllegalStateException("could not decode value", e); | |
} | |
} | |
@Override | |
public byte[] toBytes(T object) { | |
try (ByteArrayOutputStream out = new ByteArrayOutputStream();) { | |
BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); | |
writer.write(object, encoder); | |
encoder.flush(); | |
return out.toByteArray(); | |
} catch (IOException e) { | |
throw new IllegalStateException("could not encode value", e); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.oportun.fraud.demo.serde.avro; | |
import org.apache.samza.config.Config; | |
import org.apache.samza.serializers.Serde; | |
import org.apache.samza.serializers.SerdeFactory; | |
/** | |
* A {@link SerdeFactory} class for "specific" Avro types—i.e., ones that have a custom Avro-generated class. | |
* To use this, create a subclass that implements the {@link #getAvroClass} method to one of your Avro-generated | |
* classes, and configure your Samza job to use your subclass as a {@link SerdeFactory} for the relevant source, | |
* store or output. | |
* | |
* <p>Note that this does no schema management, so if your schema evolves you're SOL. | |
* | |
* @author Luis Casillas | |
*/ | |
public abstract class SpecificAvroSerdeFactory<T> implements SerdeFactory<T> { | |
@Override | |
public Serde<T> getSerde(String name, Config config) { | |
return AvroSerde.specific(getAvroClass()); | |
} | |
protected abstract Class<T> getAvroClass(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment