Skip to content

Instantly share code, notes, and snippets.

@amitgupta1202
Created August 29, 2021 06:42
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save amitgupta1202/93ef9ffd23eb16364c644a11b9ae732d to your computer and use it in GitHub Desktop.
Save amitgupta1202/93ef9ffd23eb16364c644a11b9ae732d to your computer and use it in GitHub Desktop.
Implementation of an Apache Beam coder for Avro `SpecificRecord` types. Beam appears to ignore the serialization methods on `SpecificRecord` when a `SerializableCoder` is registered for `SpecificRecord`, so this coder encodes records with Avro's binary format instead.
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Objects;
import org.apache.avro.Schema;
import org.apache.avro.io.*;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.CustomCoder;
import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
/**
 * A Beam {@link CustomCoder} that encodes values using Avro binary encoding for specific records.
 *
 * <p>Values are written with a {@link SpecificDatumWriter} and read with a {@link
 * SpecificDatumReader}, both built from the {@link Schema} given to the constructor. The reusable
 * Avro encoder, decoder, reader and writer are held per thread in {@link
 * EmptyOnDeserializationThreadLocal}s so that concurrent encode/decode calls on one coder instance
 * never share Avro state.
 *
 * <p>Direct (unbuffered) encoders/decoders are obtained from the Avro factories, so decoding does
 * not read ahead past the current element on the shared input stream.
 *
 * <p>Two coders are equal when their schemas are equal.
 *
 * <p>NOTE(review): the schema is serialized along with the coder (both as a field and via the
 * reader/writer thread-local initializers), which assumes the Avro version in use has a {@code
 * Serializable} {@link Schema} — confirm against the project's Avro dependency.
 *
 * @param <T> the decoded value type; presumably a generated specific-record class matching the
 *     schema — TODO confirm at call sites
 */
public final class SpecificRecordAvroCoder<T> extends CustomCoder<T> {
  private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get();
  private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get();

  /** Schema used for reading and writing; also the basis of {@link #equals}/{@link #hashCode}. */
  private final Schema schema;

  // Per-thread reusable Avro state; each thread re-points its own instance at the current stream.
  private final EmptyOnDeserializationThreadLocal<BinaryDecoder> decoder;
  private final EmptyOnDeserializationThreadLocal<BinaryEncoder> encoder;
  private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer;
  private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader;

  /**
   * Creates a coder for values described by {@code schema}.
   *
   * @param schema the Avro schema used for both writing and reading; must not be null
   * @throws NullPointerException if {@code schema} is null (fails here rather than on first use)
   */
  public SpecificRecordAvroCoder(Schema schema) {
    this.schema = Objects.requireNonNull(schema, "schema");
    this.decoder = new EmptyOnDeserializationThreadLocal<>();
    this.encoder = new EmptyOnDeserializationThreadLocal<>();
    this.reader =
        new EmptyOnDeserializationThreadLocal<>() {
          @Override
          public DatumReader<T> initialValue() {
            return new SpecificDatumReader<>(schema);
          }
        };
    this.writer =
        new EmptyOnDeserializationThreadLocal<>() {
          @Override
          public DatumWriter<T> initialValue() {
            return new SpecificDatumWriter<>(schema);
          }
        };
  }

  /**
   * Writes {@code value} to {@code outStream} in Avro binary form.
   *
   * @param value the value to encode
   * @param outStream the stream to write to
   * @throws IOException if the Avro writer or the underlying stream fails
   */
  @Override
  public void encode(T value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream)
      throws @UnknownKeyFor @NonNull @Initialized CoderException,
          @UnknownKeyFor @NonNull @Initialized IOException {
    // Reuse this thread's encoder (null on first call) by re-pointing it at the new stream.
    BinaryEncoder encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get());
    encoder.set(encoderInstance);
    writer.get().write(value, encoderInstance);
  }

  /**
   * Reads one Avro-binary value from {@code inStream}.
   *
   * @param inStream the stream to read from
   * @return the decoded value (a fresh instance; no object reuse)
   * @throws IOException if the Avro reader or the underlying stream fails
   */
  @Override
  public T decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream)
      throws @UnknownKeyFor @NonNull @Initialized CoderException,
          @UnknownKeyFor @NonNull @Initialized IOException {
    // A direct decoder does not read ahead, so bytes after this element stay on the stream.
    BinaryDecoder decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get());
    decoder.set(decoderInstance);
    return reader.get().read(null, decoderInstance);
  }

  /** Coders are interchangeable when they use equal schemas. */
  @Override
  public boolean equals(Object other) {
    if (this == other) {
      return true;
    }
    if (!(other instanceof SpecificRecordAvroCoder)) {
      return false;
    }
    return schema.equals(((SpecificRecordAvroCoder<?>) other).schema);
  }

  @Override
  public int hashCode() {
    return schema.hashCode();
  }

  @Override
  public String toString() {
    return "SpecificRecordAvroCoder{schema=" + schema.getFullName() + "}";
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment