Skip to content

Instantly share code, notes, and snippets.

@ldcasillas-progreso
Created November 19, 2015 22:21
Show Gist options
  • Save ldcasillas-progreso/3611d40d2833aa62c1b3 to your computer and use it in GitHub Desktop.
Save ldcasillas-progreso/3611d40d2833aa62c1b3 to your computer and use it in GitHub Desktop.
SpecificAvroJsonSerdeFactory, for Samza
package com.oportun.fraud.demo.serde.avro;
import com.google.common.base.Charsets;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.IndexedRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.io.JsonDecoder;
import org.apache.avro.io.JsonEncoder;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.samza.serializers.Serde;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
/**
 * A simple {@link Serde} for translating between Avro objects and UTF-8 encoded JSON.
 *
 * <p>Instances are obtained through the {@link #specific} and {@link #generic} factory methods. Every thread
 * that calls into an instance lazily gets its own encoder/decoder pair, held in a {@link ThreadLocal}.
 *
 * <p>NOTE(review): the {@link DatumReader} and {@link DatumWriter} are shared by all calling threads; confirm
 * they are safe to share before using one instance from several threads.
 *
 * @author Luis Casillas
 */
public class AvroJsonSerde<T> implements Serde<T> {
    ////////////////////////////////////////////////////////////////////////////////////////////
    //
    // Factory methods
    //

    /**
     * Creates a serde for an Avro "specific" type, i.e. one backed by the given class.
     *
     * @param klass  the class of the values to (de)serialize
     * @param schema the Avro schema used to configure the JSON encoder and decoder
     * @return a serde that reads and writes {@code klass} instances as compact (non-pretty) JSON
     */
    public static <T> Serde<T> specific(Class<T> klass, Schema schema) {
        return new AvroJsonSerde<>(klass, schema,
                new SpecificDatumReader<>(klass),
                new SpecificDatumWriter<>(klass),
                false);
    }

    /**
     * Creates a serde for Avro "generic" records described only by a schema.
     *
     * @param schema the Avro schema of the records
     * @return a serde that reads and writes {@link IndexedRecord} values as compact (non-pretty) JSON
     */
    public static Serde<? extends IndexedRecord> generic(Schema schema) {
        return new AvroJsonSerde<>(IndexedRecord.class, schema,
                new GenericDatumReader<IndexedRecord>(schema),
                new GenericDatumWriter<IndexedRecord>(schema),
                false);
    }

    ////////////////////////////////////////////////////////////////////////////////////////////
    //
    // Instance implementation
    //
    private final Schema schema;
    private final Class<T> klass;
    private final DatumReader<T> reader;
    private final DatumWriter<T> writer;
    /** Passed straight to {@code EncoderFactory.jsonEncoder}; both factory methods currently pass {@code false}. */
    private final boolean pretty;

    /**
     * The construction of JsonEncoder objects turns out to be costly. It turned up as one of the top items
     * when profiling the application. So we reuse these objects by storing thread-local wrappers around them.
     */
    private final ThreadLocal<CodecWrapper> wrappers = new ThreadLocal<CodecWrapper>() {
        @Override
        protected CodecWrapper initialValue() {
            try {
                return new CodecWrapper();
            } catch (IOException e) {
                throw new IllegalStateException("Could not initialize Avro/JSON codecs", e);
            }
        }
    };

    private AvroJsonSerde(Class<T> klass, Schema schema, DatumReader<T> reader, DatumWriter<T> writer, boolean pretty) {
        this.klass = klass;
        this.schema = schema;
        this.reader = reader;
        this.writer = writer;
        this.pretty = pretty;
    }

    /**
     * Decodes a UTF-8 JSON document into an Avro object.
     *
     * @param bytes the UTF-8 encoded JSON text
     * @return the decoded value, cast to this serde's class
     * @throws IllegalStateException if the decoder reports an {@link IOException}
     */
    @Override
    public T fromBytes(byte[] bytes) {
        // No wrapper invalidation here: the decoder is re-configured with fresh input on every call.
        // NOTE(review): this relies on JsonDecoder.configure() fully resetting decoder state - confirm.
        try {
            return wrappers.get().decode(bytes);
        } catch (IOException e) {
            throw new IllegalStateException("could not decode value", e);
        }
    }

    /**
     * Encodes an Avro object as UTF-8 JSON.
     *
     * <p>If encoding fails for any reason, the calling thread's cached encoder and buffer are thrown away so
     * that a half-written record cannot leak into the output of the next call on the same thread.
     *
     * @param object the value to encode
     * @return the JSON text as bytes
     * @throws IllegalStateException if the encoder reports an {@link IOException}
     */
    @Override
    public byte[] toBytes(T object) {
        CodecWrapper wrapper = wrappers.get();
        boolean succeeded = false;
        try {
            byte[] encoded = wrapper.encode(object);
            succeeded = true;
            return encoded;
        } catch (IOException e) {
            throw new IllegalStateException("could not encode value", e);
        } finally {
            if (!succeeded) {
                // Covers both IOException and unchecked failures from the writer: the buffer may hold
                // partial output and the encoder may be mid-record, so force a fresh wrapper next time.
                wrappers.remove();
            }
        }
    }

    /**
     * Bundles the reusable encoder, decoder and output buffer for one thread. Not static because it reads the
     * enclosing serde's schema, reader, writer, class and pretty flag.
     */
    private class CodecWrapper {
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        private final JsonEncoder encoder;
        private final JsonDecoder decoder;

        private CodecWrapper() throws IOException {
            this.encoder = EncoderFactory.get().jsonEncoder(schema, buffer, pretty);
            // The empty string is only a placeholder; decode() supplies the real input via configure().
            this.decoder = DecoderFactory.get().jsonDecoder(schema, "");
        }

        /**
         * Writes {@code object} through the shared encoder into the buffer and returns a copy of the bytes,
         * leaving the buffer empty for the next call. If this method throws, the wrapper must be discarded
         * by the caller because the buffer is not reset on that path.
         */
        public byte[] encode(T object) throws IOException {
            writer.write(object, encoder);
            encoder.flush();
            byte[] result = buffer.toByteArray();
            buffer.reset();
            return result;
        }

        /** Interprets {@code in} as UTF-8 JSON text, points the decoder at it and reads one value. */
        public T decode(byte[] in) throws IOException {
            String str = new String(in, Charsets.UTF_8);
            decoder.configure(str);
            return klass.cast(reader.read(null, decoder));
        }
    }
}
package com.oportun.fraud.demo.serde.avro;
import org.apache.avro.Schema;
import org.apache.samza.config.Config;
import org.apache.samza.serializers.Serde;
import org.apache.samza.serializers.SerdeFactory;
/**
 * Base {@link SerdeFactory} for "specific" Avro types (those with an Avro-generated class), producing serdes
 * that convert such values to and from textual JSON objects.
 *
 * <p>To use this, create a subclass that implements {@link #getAvroClass} to return one of your Avro-generated
 * classes and {@link #getSchema} to return its schema, then configure your Samza job to use your subclass as a
 * {@link SerdeFactory} for the relevant source, store or output.
 *
 * @author Luis Casillas
 */
public abstract class SpecificAvroJsonSerdeFactory<T> implements SerdeFactory<T> {

    /**
     * Builds a new JSON serde from the subclass-provided class and schema. Neither {@code name} nor
     * {@code config} is consulted.
     */
    @Override
    public Serde<T> getSerde(String name, Config config) {
        final Class<T> avroClass = getAvroClass();
        final Schema avroSchema = getSchema();
        return AvroJsonSerde.specific(avroClass, avroSchema);
    }

    /** Returns the class of the values the serde will handle. */
    protected abstract Class<T> getAvroClass();

    /** Returns the Avro schema handed to the serde alongside {@link #getAvroClass}. */
    protected abstract Schema getSchema();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment