Last active
December 11, 2015 16:19
-
-
Save gwu/4626429 to your computer and use it in GitHub Desktop.
Reading and writing unions with Avro
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.wibidata.avro; | |
import java.io.ByteArrayOutputStream; | |
import java.io.IOException; | |
import java.util.Arrays; | |
import org.apache.avro.AvroRuntimeException; | |
import org.apache.avro.Schema; | |
import org.apache.avro.generic.GenericData; | |
import org.apache.avro.generic.GenericDatumReader; | |
import org.apache.avro.generic.GenericDatumWriter; | |
import org.apache.avro.generic.GenericRecord; | |
import org.apache.avro.io.DatumReader; | |
import org.apache.avro.io.DatumWriter; | |
import org.apache.avro.io.Decoder; | |
import org.apache.avro.io.DecoderFactory; | |
import org.apache.avro.io.Encoder; | |
import org.apache.avro.io.EncoderFactory; | |
import org.junit.Assert; | |
import org.junit.Test; | |
/** | |
* Tests related to a schema of ["null", <i>datum</i>]. In particular, | |
* we want to check that the schema resolution rules for unions match | |
* the specification at http://avro.apache.org/docs/current/spec.html#Schema+Resolution | |
* Here's a snippet from that page: | |
* | |
* <b>It is an error if the two schemas do not match.</b> | |
* | |
* To match, one of the following must hold: | |
* <ul> | |
* <li>both schemas are arrays whose item types match</li> | |
* <li>both schemas are maps whose value types match</li> | |
* <li>both schemas are enums whose names match</li> | |
* <li>both schemas are fixed whose sizes and names match</li> | |
* <li>both schemas are records with the same name</li> | |
* <li>either schema is a union</li> | |
* <li>both schemas have same primitive type</li> | |
* <li>the writer's schema may be promoted to the reader's as follows: | |
* <ul> | |
* <li>int is promotable to long, float, or double</li> | |
* <li>long is promotable to float or double</li> | |
* <li>float is promotable to double</li> | |
* </ul> | |
* </li> | |
* </ul> | |
* | |
* Additionally: | |
* <ul> | |
* <li><b>if reader's is a union, but writer's is not:</b> | |
* The first schema in the reader's union that matches the writer's | |
* schema is recursively resolved against it. If none match, an error | |
* is signalled.</li> | |
* <li><b>if writer's is a union, but reader's is not:</b> | |
* If the reader's schema matches the selected writer's schema, it | |
* is recursively resolved against it. If they do not match, an error | |
* is signalled.</li> | |
* </ul> | |
* | |
* This suite therefore contains a series of tests that verify schema | |
* resolution works as claimed in the spec (when the reader is a union | |
* but the writer is not, and vice versa). | |
*/ | |
public class AvroNullableDatumTest { | |
/** | |
* Writer schema: "int" | |
* Raeder schema: ["null", "int"] | |
*/ | |
@Test | |
public void testWriteIntReaderUnion() throws Exception { | |
final Schema intSchema = Schema.create(Schema.Type.INT); | |
final Schema nullSchema = Schema.create(Schema.Type.NULL); | |
final Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, intSchema)); | |
final Integer inValue = Integer.valueOf(42); | |
final byte[] serializedData = encodeBinaryAvro(intSchema, inValue); | |
final Integer outValue = decodeBinaryAvro(intSchema, unionSchema, serializedData); | |
Assert.assertEquals(inValue, outValue); | |
} | |
/** | |
* Writer schema: "int" | |
* Raeder schema: ["null", "int"] | |
*/ | |
@Test | |
public void testWriteUnionReadInt() throws Exception { | |
final Schema intSchema = Schema.create(Schema.Type.INT); | |
final Schema nullSchema = Schema.create(Schema.Type.NULL); | |
final Schema unionSchema = Schema.createUnion(Arrays.asList(nullSchema, intSchema)); | |
final Integer inValue = Integer.valueOf(42); | |
final byte[] serializedData = encodeBinaryAvro(unionSchema, inValue); | |
final Integer outValue = decodeBinaryAvro(unionSchema, intSchema, serializedData); | |
Assert.assertEquals(inValue, outValue); | |
} | |
/** | |
* Writer Schema: <i>GenericRecord</i> | |
* Reader Schema: ["null", <i>GenericRecord</i>] | |
*/ | |
@Test | |
public void testWriteRecordReadUnion() throws Exception { | |
final Schema nullSchema = Schema.create(Schema.Type.NULL); | |
final Schema.Field recordField1 = new Schema.Field( | |
"one", Schema.create(Schema.Type.INT), null, null); | |
final Schema.Field recordField2 = new Schema.Field( | |
"two", Schema.create(Schema.Type.STRING), null, null); | |
final Schema genericRecordSchema = Schema.createRecord("record", null, null, false); | |
genericRecordSchema.setFields( | |
Arrays.asList(recordField1, recordField2)); | |
final Schema nullableRecordSchema = Schema.createUnion( | |
Arrays.asList(nullSchema, genericRecordSchema)); | |
final GenericRecord inRecord = new GenericData.Record(genericRecordSchema); | |
inRecord.put("one", 1); | |
inRecord.put("two", "two"); | |
final byte[] serializedData = encodeBinaryAvro(genericRecordSchema, inRecord); | |
final GenericRecord outRecord = decodeBinaryAvro( | |
genericRecordSchema, nullableRecordSchema, serializedData); | |
Assert.assertEquals(inRecord.get("one"), outRecord.get("one")); | |
Assert.assertEquals(inRecord.get("two").toString(), outRecord.get("two").toString()); | |
} | |
/** | |
* Writer Schema: ["null", <i>GenericRecord</i>] | |
* Reader Schema: <i>GenericRecord</i> | |
*/ | |
@Test | |
public void testWriteUnionReadRecord() throws Exception { | |
final Schema nullSchema = Schema.create(Schema.Type.NULL); | |
final Schema.Field recordField1 = new Schema.Field( | |
"one", Schema.create(Schema.Type.INT), null, null); | |
final Schema.Field recordField2 = new Schema.Field( | |
"two", Schema.create(Schema.Type.STRING), null, null); | |
final Schema genericRecordSchema = Schema.createRecord("record", null, null, false); | |
genericRecordSchema.setFields( | |
Arrays.asList(recordField1, recordField2)); | |
final Schema nullableRecordSchema = Schema.createUnion( | |
Arrays.asList(nullSchema, genericRecordSchema)); | |
final GenericRecord inRecord = new GenericData.Record(genericRecordSchema); | |
inRecord.put("one", 1); | |
inRecord.put("two", "two"); | |
final byte[] serializedData = encodeBinaryAvro(nullableRecordSchema, inRecord); | |
final GenericRecord outRecord = decodeBinaryAvro( | |
nullableRecordSchema, genericRecordSchema, serializedData); | |
Assert.assertEquals(inRecord.get("one"), outRecord.get("one")); | |
Assert.assertEquals(inRecord.get("two").toString(), outRecord.get("two").toString()); | |
} | |
/** | |
* Binary-encodes an avro datum into a byte array. | |
*/ | |
private byte[] encodeBinaryAvro(Schema writerSchema, Object avro) throws IOException { | |
final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); | |
try { | |
final EncoderFactory encoderFactory = EncoderFactory.get(); | |
final Encoder encoder = encoderFactory.binaryEncoder(byteArrayOutputStream, null); | |
final DatumWriter<Object> writer = new GenericDatumWriter<Object>(writerSchema); | |
writer.write(avro, encoder); | |
encoder.flush(); | |
return byteArrayOutputStream.toByteArray(); | |
} finally { | |
byteArrayOutputStream.close(); | |
} | |
} | |
/** | |
* Binary-decodes the contents of a byte array into an avro datum. | |
*/ | |
private <T> T decodeBinaryAvro( | |
Schema writerSchema, Schema readerSchema, byte[] binaryEncodedAvro) | |
throws IOException { | |
final DecoderFactory decoderFactory = DecoderFactory.get(); | |
final Decoder decoder = decoderFactory.binaryDecoder(binaryEncodedAvro, null); | |
final DatumReader<T> reader = new GenericDatumReader<T>(writerSchema, readerSchema); | |
return reader.read(null, decoder); | |
} | |
} |
Ah. I forgot to pass the writer schema in the constructor of the decoder. I'll fix this and hopefully the world will be normal again.
Updated. All better.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Here are test results. They all fail!