Created
August 29, 2021 06:42
-
-
Save amitgupta1202/93ef9ffd23eb16364c644a11b9ae732d to your computer and use it in GitHub Desktop.
Implementation of an Apache Beam Avro coder for Avro specific records. The serialization methods on SpecificRecord seem to be ignored by Beam if a Serializable coder is registered for SpecificRecord.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.avro.Schema; | |
import org.apache.avro.io.*; | |
import org.apache.avro.specific.SpecificDatumReader; | |
import org.apache.avro.specific.SpecificDatumWriter; | |
import org.apache.beam.sdk.coders.CoderException; | |
import org.apache.beam.sdk.coders.CustomCoder; | |
import org.apache.beam.sdk.util.EmptyOnDeserializationThreadLocal; | |
import org.checkerframework.checker.initialization.qual.Initialized; | |
import org.checkerframework.checker.nullness.qual.NonNull; | |
import org.checkerframework.checker.nullness.qual.UnknownKeyFor; | |
import java.io.IOException; | |
import java.io.InputStream; | |
import java.io.OutputStream; | |
public final class SpecificRecordAvroCoder<T> extends CustomCoder<T> { | |
private static final EncoderFactory ENCODER_FACTORY = EncoderFactory.get(); | |
private static final DecoderFactory DECODER_FACTORY = DecoderFactory.get(); | |
private final EmptyOnDeserializationThreadLocal<BinaryDecoder> decoder; | |
private final EmptyOnDeserializationThreadLocal<BinaryEncoder> encoder; | |
private final EmptyOnDeserializationThreadLocal<DatumWriter<T>> writer; | |
private final EmptyOnDeserializationThreadLocal<DatumReader<T>> reader; | |
public SpecificRecordAvroCoder(Schema schema) { | |
this.decoder = new EmptyOnDeserializationThreadLocal<>(); | |
this.encoder = new EmptyOnDeserializationThreadLocal<>(); | |
this.reader = new EmptyOnDeserializationThreadLocal<>() { | |
@Override | |
public DatumReader<T> initialValue() { | |
return new SpecificDatumReader<>(schema); | |
} | |
}; | |
this.writer = new EmptyOnDeserializationThreadLocal<>() { | |
@Override | |
public DatumWriter<T> initialValue() { | |
return new SpecificDatumWriter<>(schema); | |
} | |
}; | |
} | |
@Override | |
public void encode(T value, @UnknownKeyFor @NonNull @Initialized OutputStream outStream) throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull @Initialized IOException { | |
var encoderInstance = ENCODER_FACTORY.directBinaryEncoder(outStream, encoder.get()); | |
encoder.set(encoderInstance); | |
writer.get().write(value, encoderInstance); | |
} | |
@Override | |
public T decode(@UnknownKeyFor @NonNull @Initialized InputStream inStream) throws @UnknownKeyFor @NonNull @Initialized CoderException, @UnknownKeyFor @NonNull @Initialized IOException { | |
var decoderInstance = DECODER_FACTORY.directBinaryDecoder(inStream, decoder.get()); | |
decoder.set(decoderInstance); | |
return reader.get().read(null, decoderInstance); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment