Created
November 19, 2015 22:21
-
-
Save ldcasillas-progreso/3611d40d2833aa62c1b3 to your computer and use it in GitHub Desktop.
SpecificAvroJsonSerdeFactory, for Samza
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.oportun.fraud.demo.serde.avro; | |
import com.google.common.base.Charsets; | |
import org.apache.avro.Schema; | |
import org.apache.avro.generic.GenericDatumReader; | |
import org.apache.avro.generic.GenericDatumWriter; | |
import org.apache.avro.generic.IndexedRecord; | |
import org.apache.avro.io.DatumReader; | |
import org.apache.avro.io.DatumWriter; | |
import org.apache.avro.io.DecoderFactory; | |
import org.apache.avro.io.EncoderFactory; | |
import org.apache.avro.io.JsonDecoder; | |
import org.apache.avro.io.JsonEncoder; | |
import org.apache.avro.specific.SpecificDatumReader; | |
import org.apache.avro.specific.SpecificDatumWriter; | |
import org.apache.samza.serializers.Serde; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
/** | |
* A simple Serde for translating between Avro and UTF-8 encoded JSON. | |
* | |
* @author Luis Casillas | |
*/ | |
public class AvroJsonSerde<T> implements Serde<T> { | |
//////////////////////////////////////////////////////////////////////////////////////////// | |
// | |
// Factory methods | |
// | |
public static <T> Serde<T> specific(Class<T> klass, Schema schema) { | |
return new AvroJsonSerde<>(klass, schema, | |
new SpecificDatumReader<>(klass), | |
new SpecificDatumWriter<>(klass), | |
false); | |
} | |
public static Serde<? extends IndexedRecord> generic(Schema schema) { | |
return new AvroJsonSerde<>(IndexedRecord.class, schema, | |
new GenericDatumReader<IndexedRecord>(schema), | |
new GenericDatumWriter<IndexedRecord>(schema), | |
false); | |
} | |
//////////////////////////////////////////////////////////////////////////////////////////// | |
// | |
// Instance implementation | |
// | |
private final Schema schema; | |
private final Class<T> klass; | |
private final DatumReader<T> reader; | |
private final DatumWriter<T> writer; | |
private final boolean pretty; | |
/** | |
* The construction of JSONEncoder objects turns out to be costly. It turned up as one of the top items | |
* when profiling the application. So we reuse these objects by storing thread-local wrappers around them. | |
*/ | |
private final ThreadLocal<CodecWrapper> wrappers = new ThreadLocal<CodecWrapper>() { | |
@Override | |
protected CodecWrapper initialValue() { | |
try { | |
return new CodecWrapper(); | |
} catch (IOException e) { | |
throw new IllegalStateException("Could not initialize Avro/JSON codecs", e); | |
} | |
} | |
}; | |
private AvroJsonSerde(Class<T> klass, Schema schema, DatumReader<T> reader, DatumWriter<T> writer, boolean pretty) { | |
this.klass = klass; | |
this.schema = schema; | |
this.reader = reader; | |
this.writer = writer; | |
this.pretty = pretty; | |
} | |
@Override | |
public T fromBytes(byte[] bytes) { | |
try { | |
return wrappers.get().decode(bytes); | |
} catch (IOException e) { | |
throw new IllegalStateException("could not decode value", e); | |
} | |
} | |
@Override | |
public byte[] toBytes(T object) { | |
try { | |
return wrappers.get().encode(object); | |
} catch (IOException e) { | |
throw new IllegalStateException("could not encode value", e); | |
} | |
} | |
private class CodecWrapper { | |
private final ByteArrayOutputStream buffer = new ByteArrayOutputStream(); | |
private final JsonEncoder encoder; | |
private final JsonDecoder decoder; | |
private CodecWrapper() throws IOException { | |
this.encoder = EncoderFactory.get().jsonEncoder(schema, buffer, pretty); | |
this.decoder = DecoderFactory.get().jsonDecoder(schema, ""); | |
} | |
public byte[] encode(T object) throws IOException { | |
writer.write(object, encoder); | |
encoder.flush(); | |
byte[] result = buffer.toByteArray(); | |
buffer.reset(); | |
return result; | |
} | |
public T decode(byte[] in) throws IOException { | |
String str = new String(in, Charsets.UTF_8); | |
decoder.configure(str); | |
return klass.cast(reader.read(null, decoder)); | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.oportun.fraud.demo.serde.avro; | |
import org.apache.avro.Schema; | |
import org.apache.samza.config.Config; | |
import org.apache.samza.serializers.Serde; | |
import org.apache.samza.serializers.SerdeFactory; | |
/** | |
* A {@link SerdeFactory} class that (de)serializes "specific" Avro types—i.e., ones that have a custom Avro-generated | |
* class—to and from textual JSON objects. | |
* | |
* <p>To use this, create a subclass that implements the {@link #getAvroClass} method to one of your Avro-generated | |
* classes, and configure your Samza job to use your subclass as a {@link SerdeFactory} for the relevant source, | |
* store or output. | |
* | |
* @author Luis Casillas | |
*/ | |
public abstract class SpecificAvroJsonSerdeFactory<T> implements SerdeFactory<T> { | |
@Override | |
public Serde<T> getSerde(String name, Config config) { | |
return AvroJsonSerde.specific(getAvroClass(), getSchema()); | |
} | |
protected abstract Class<T> getAvroClass(); | |
protected abstract Schema getSchema(); | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment