Skip to content

Instantly share code, notes, and snippets.

@clairemcginty
Last active July 16, 2021 14:55
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save clairemcginty/97ee6b33c0b5633d5d42d29b1d057d85 to your computer and use it in GitHub Desktop.
Save clairemcginty/97ee6b33c0b5633d5d42d29b1d057d85 to your computer and use it in GitHub Desktop.
Avro serialization changes in Beam 2.30.0
package avrotest
import org.apache.avro.Schema
import org.apache.avro.io.{DecoderFactory, EncoderFactory}
import org.apache.avro.reflect.{ReflectDatumReader, ReflectDatumWriter}
import org.apache.avro.specific.{SpecificData, SpecificDatumReader, SpecificDatumWriter}
import org.apache.beam.sdk.util.CoderUtils
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import java.io.ByteArrayOutputStream
/**
 * Records which Avro read/write paths decode the `string` field `name` of the generated
 * SpecificRecord `UserSpecificRecord` as `java.lang.String` (vs. `org.apache.avro.util.Utf8`),
 * including Beam's `AvroCoder` under Beam 2.29.0 and 2.30.0.
 *
 * Some of these tests are EXPECTED to fail: the `// Result` comment above each test records the
 * outcome that was observed. The suite documents behaviour; it does not guard an invariant.
 */
class AvroSerializationTests extends AnyFlatSpec with Matchers {
  import org.apache.avro.io.{DatumReader, DatumWriter}

  /**
   * Builds the record under test.
   *
   * Also asserts the precondition shared by every test: before any round trip, `name` is a
   * `java.lang.String`.
   */
  private def newRecord(): UserSpecificRecord = {
    val sr = UserSpecificRecord.newBuilder().setName("foo").build()
    sr.getName.getClass shouldEqual classOf[String]
    sr
  }

  /**
   * Serializes `value` to Avro binary with `writer`, then deserializes those bytes with `reader`.
   *
   * @param writer datum writer used to encode `value`
   * @param reader datum reader used to decode the encoded bytes
   * @param value  record to round-trip
   * @return the record produced by `reader` (no reuse object is passed to `read`)
   */
  private def roundTrip[T >: Null](writer: DatumWriter[T], reader: DatumReader[T], value: T): T = {
    val output = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get.binaryEncoder(output, null)
    writer.write(value, encoder)
    // Flush so every encoded byte is in `output` before it is read back.
    encoder.flush()
    val decoder = DecoderFactory.get.binaryDecoder(output.toByteArray, null)
    reader.read(null, decoder)
  }

  // Result: SUCCESS
  "Avro SpecificRecords" should "decode String fields as java.lang.String using ReflectDatumReader" in {
    val srRoundtripped = roundTrip(
      new ReflectDatumWriter[UserSpecificRecord](UserSpecificRecord.SCHEMA$),
      new ReflectDatumReader[UserSpecificRecord](UserSpecificRecord.SCHEMA$),
      newRecord()
    )
    srRoundtripped.getName.getClass shouldEqual classOf[String]
  }

  // Result: FAILURE, return type is org.apache.avro.util.Utf8
  // The reader's schema has no java-class hint on `name`. Compare with the next test, where adding
  // one makes the same reader return java.lang.String.
  it should "decode String fields as java.lang.String using SpecificDatumReader" in {
    val srRoundtripped = roundTrip(
      new SpecificDatumWriter[UserSpecificRecord](classOf[UserSpecificRecord]),
      new SpecificDatumReader[UserSpecificRecord](classOf[UserSpecificRecord]),
      newRecord()
    )
    srRoundtripped.getName.getClass shouldEqual classOf[String]
  }

  // Result: SUCCESS
  it should "decode String fields as java.lang.String using SpecificDatumReader with java-class property set" in {
    val sr = newRecord()
    // Work on a re-parsed copy of the schema rather than sr.getSchema itself.
    // Presumably this avoids mutating the schema instance the generated class hands out (TODO confirm).
    // Schema.Parser replaces the deprecated Schema.parse(String).
    val schemaWithProp = new Schema.Parser().parse(sr.getSchema.toString)
    schemaWithProp
      .getField("name")
      .schema()
      .addProp(SpecificData.CLASS_PROP, "java.lang.String".asInstanceOf[Object])
    val srRoundtripped = roundTrip(
      new SpecificDatumWriter[UserSpecificRecord](schemaWithProp),
      new SpecificDatumReader[UserSpecificRecord](schemaWithProp),
      sr
    )
    srRoundtripped.getName.getClass shouldEqual classOf[String]
  }

  // Results:
  // - Beam 2.29.0: SUCCESS
  // - Beam 2.30.0: FAILURE, return type is org.apache.avro.util.Utf8
  // NOTE(review): the 2.30.0 outcome matches the plain SpecificDatumReader test above. Presumably
  // AvroCoder started decoding SpecificRecord classes with a SpecificDatumReader in 2.30.0.
  // Confirm against the Beam 2.30.0 release notes.
  it should "decode String fields as java.lang.String using Beam Coder API" in {
    val sr = newRecord()
    val coder = org.apache.beam.sdk.coders.AvroCoder.of(classOf[UserSpecificRecord])
    val encodedBytes = CoderUtils.encodeToByteArray(coder, sr)
    val srRoundtripped = CoderUtils.decodeFromByteArray(coder, encodedBytes)
    srRoundtripped.getName.getClass shouldEqual classOf[String]
  }
}
[
{
"name": "UserSpecificRecord",
"namespace": "avrotest",
"type": "record",
"fields": [
{
"name": "name",
"type": "string"
}
]
}
]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment