-
-
Save clairemcginty/97ee6b33c0b5633d5d42d29b1d057d85 to your computer and use it in GitHub Desktop.
Avro serialization changes in Beam 2.30.0
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package avrotest | |
import java.io.ByteArrayOutputStream

import org.apache.avro.Schema
import org.apache.avro.io.{DatumReader, DatumWriter, DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.{ReflectDatumReader, ReflectDatumWriter}
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificDatumWriter}
import org.apache.beam.sdk.util.CoderUtils
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
/**
 * Round-trips a generated Avro `SpecificRecord` (`UserSpecificRecord`, whose `name` field is an
 * Avro `string`) through several serialization paths. Each test checks whether the decoded
 * `name` comes back as `java.lang.String`.
 *
 * Some of these tests are expected to fail. The `// Result` comment above each case records
 * the outcome observed for it, including differences between Beam 2.29.0 and 2.30.0.
 */
class AvroSerializationTests extends AnyFlatSpec with Matchers {

  /**
   * Binary-encodes `record` with `writer`, then decodes the resulting bytes with `reader`.
   *
   * @param record the value to serialize
   * @param writer datum writer used to encode `record`
   * @param reader datum reader used to decode the encoded bytes
   * @return the freshly decoded value
   */
  private def roundTrip[T >: Null](record: T, writer: DatumWriter[T], reader: DatumReader[T]): T = {
    val output = new ByteArrayOutputStream()
    // `null`: there is no existing encoder to reuse.
    val encoder = EncoderFactory.get.binaryEncoder(output, null)
    writer.write(record, encoder)
    // Flush so every encoded byte has reached `output` before it is read back.
    encoder.flush()
    val decoder = DecoderFactory.get.binaryDecoder(output.toByteArray, null)
    // `null`: ask the reader for a new instance instead of reusing an existing one.
    reader.read(null, decoder)
  }

  // Result: SUCCESS
  "Avro SpecificRecords" should "decode String fields as java.lang.String using ReflectDatumReader" in {
    val sr = UserSpecificRecord.newBuilder().setName("foo").build()
    sr.getName.getClass shouldEqual classOf[String]

    val srRoundtripped = roundTrip(
      sr,
      new ReflectDatumWriter[UserSpecificRecord](UserSpecificRecord.SCHEMA$),
      new ReflectDatumReader[UserSpecificRecord](UserSpecificRecord.SCHEMA$)
    )
    srRoundtripped.getName.getClass shouldEqual classOf[String]
  }

  // Result: FAILURE, return type is org.apache.avro.util.Utf8
  it should "decode String fields as java.lang.String using SpecificDatumReader" in {
    val sr = UserSpecificRecord.newBuilder().setName("foo").build()
    sr.getName.getClass shouldEqual classOf[String]

    val srRoundtripped = roundTrip(
      sr,
      new SpecificDatumWriter[UserSpecificRecord](classOf[UserSpecificRecord]),
      new SpecificDatumReader[UserSpecificRecord](classOf[UserSpecificRecord])
    )
    srRoundtripped.getName.getClass shouldEqual classOf[String]
  }

  // Result: SUCCESS
  it should "decode String fields as java.lang.String using SpecificDatumReader with java-class property set" in {
    val sr = UserSpecificRecord.newBuilder().setName("foo").build()
    sr.getName.getClass shouldEqual classOf[String]

    // Parse a fresh copy of the schema so that adding a property below does not mutate the
    // schema instance returned by the record itself.
    val schemaWithProp = new Schema.Parser().parse(sr.getSchema.toString)
    // Tag the `name` field's string schema with java-class = java.lang.String.
    schemaWithProp
      .getField("name")
      .schema()
      .addProp(SpecificData.CLASS_PROP, classOf[String].getName)

    val srRoundtripped = roundTrip(
      sr,
      new SpecificDatumWriter[UserSpecificRecord](schemaWithProp),
      new SpecificDatumReader[UserSpecificRecord](schemaWithProp)
    )
    srRoundtripped.getName.getClass shouldEqual classOf[String]
  }

  // Results:
  // - Beam 2.29.0: SUCCESS
  // - Beam 2.30.0: FAILURE, return type is org.apache.avro.util.Utf8
  it should "decode String fields as java.lang.String using Beam Coder API" in {
    val sr = UserSpecificRecord.newBuilder().setName("foo").build()
    sr.getName.getClass shouldEqual classOf[String]

    val coder = org.apache.beam.sdk.coders.AvroCoder.of(classOf[UserSpecificRecord])
    val encodedBytes = CoderUtils.encodeToByteArray(coder, sr)
    val srRoundtripped = CoderUtils.decodeFromByteArray(coder, encodedBytes)
    srRoundtripped.getName.getClass shouldEqual classOf[String]
  }
}
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[
  {
    "name": "UserSpecificRecord",
    "namespace": "avrotest",
    "type": "record",
    "fields": [
      {
        "name": "name",
        "type": "string"
      }
    ]
  }
]
Sign up for free to join this conversation on GitHub.
Already have an account? Sign in to comment.