Skip to content

Instantly share code, notes, and snippets.

@gwu
Last active December 11, 2015 16:19
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gwu/4626429 to your computer and use it in GitHub Desktop.
Save gwu/4626429 to your computer and use it in GitHub Desktop.
Reading and writing unions with Avro
package com.wibidata.avro;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Arrays;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumReader;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.junit.Assert;
import org.junit.Test;
/**
* Tests related to a schema of ["null", <i>datum</i>]. In particular,
* we want to check that the schema resolution rules for unions match
* the specification at http://avro.apache.org/docs/current/spec.html#Schema+Resolution
* Here's a snippet from that page:
*
* <b>It is an error if the two schemas do not match.</b>
*
* To match, one of the following must hold:
* <ul>
* <li>both schemas are arrays whose item types match</li>
* <li>both schemas are maps whose value types match</li>
* <li>both schemas are enums whose names match</li>
* <li>both schemas are fixed whose sizes and names match</li>
* <li>both schemas are records with the same name</li>
* <li>either schema is a union</li>
* <li>both schemas have same primitive type</li>
* <li>the writer's schema may be promoted to the reader's as follows:
* <ul>
* <li>int is promotable to long, float, or double</li>
* <li>long is promotable to float or double</li>
* <li>float is promotable to double</li>
* </ul>
* </li>
* </ul>
*
* Additionally:
* <ul>
* <li><b>if reader's is a union, but writer's is not:</b>
* The first schema in the reader's union that matches the writer's
* schema is recursively resolved against it. If none match, an error
* is signalled.</li>
* <li><b>if writer's is a union, but reader's is not:</b>
* If the reader's schema matches the selected writer's schema, it
* is recursively resolved against it. If they do not match, an error
* is signalled.</li>
* </ul>
*
* This suite therefore contains a series of tests that verify schema
* resolution works as claimed in the spec (when the reader is a union
* but the writer is not, and vice versa).
*/
public class AvroNullableDatumTest {
  /**
   * Writer schema: "int"
   * Reader schema: ["null", "int"]
   */
  @Test
  public void testWriteIntReaderUnion() throws Exception {
    final Schema intSchema = Schema.create(Schema.Type.INT);
    final Schema unionSchema = createNullableSchema(intSchema);
    final Integer inValue = Integer.valueOf(42);
    final byte[] serializedData = encodeBinaryAvro(intSchema, inValue);
    final Integer outValue = decodeBinaryAvro(intSchema, unionSchema, serializedData);
    Assert.assertEquals(inValue, outValue);
  }

  /**
   * Writer schema: ["null", "int"]
   * Reader schema: "int"
   */
  @Test
  public void testWriteUnionReadInt() throws Exception {
    final Schema intSchema = Schema.create(Schema.Type.INT);
    final Schema unionSchema = createNullableSchema(intSchema);
    final Integer inValue = Integer.valueOf(42);
    final byte[] serializedData = encodeBinaryAvro(unionSchema, inValue);
    final Integer outValue = decodeBinaryAvro(unionSchema, intSchema, serializedData);
    Assert.assertEquals(inValue, outValue);
  }

  /**
   * Writer schema: ["null", "int"] (the null branch is written)
   * Reader schema: ["null", "int"]
   *
   * <p>Checks that the null branch of a nullable union survives a round trip.</p>
   */
  @Test
  public void testWriteNullUnionReadUnion() throws Exception {
    final Schema unionSchema = createNullableSchema(Schema.create(Schema.Type.INT));
    final byte[] serializedData = encodeBinaryAvro(unionSchema, null);
    final Integer outValue = decodeBinaryAvro(unionSchema, unionSchema, serializedData);
    Assert.assertNull(outValue);
  }

  /**
   * Writer Schema: <i>GenericRecord</i>
   * Reader Schema: ["null", <i>GenericRecord</i>]
   */
  @Test
  public void testWriteRecordReadUnion() throws Exception {
    final Schema recordSchema = createTestRecordSchema();
    final Schema nullableRecordSchema = createNullableSchema(recordSchema);
    final GenericRecord inRecord = createTestRecord(recordSchema);
    final byte[] serializedData = encodeBinaryAvro(recordSchema, inRecord);
    final GenericRecord outRecord = decodeBinaryAvro(
        recordSchema, nullableRecordSchema, serializedData);
    assertTestRecordsEqual(inRecord, outRecord);
  }

  /**
   * Writer Schema: ["null", <i>GenericRecord</i>]
   * Reader Schema: <i>GenericRecord</i>
   */
  @Test
  public void testWriteUnionReadRecord() throws Exception {
    final Schema recordSchema = createTestRecordSchema();
    final Schema nullableRecordSchema = createNullableSchema(recordSchema);
    final GenericRecord inRecord = createTestRecord(recordSchema);
    final byte[] serializedData = encodeBinaryAvro(nullableRecordSchema, inRecord);
    final GenericRecord outRecord = decodeBinaryAvro(
        nullableRecordSchema, recordSchema, serializedData);
    assertTestRecordsEqual(inRecord, outRecord);
  }

  /**
   * Builds the union schema ["null", <i>schema</i>].
   *
   * @param schema the non-null branch of the union.
   * @return a two-branch union whose first branch is "null".
   */
  private static Schema createNullableSchema(Schema schema) {
    return Schema.createUnion(Arrays.asList(Schema.create(Schema.Type.NULL), schema));
  }

  /**
   * Builds the record schema used by the record tests: a record named "record"
   * with an int field "one" and a string field "two".
   *
   * @return a new record schema.
   */
  private static Schema createTestRecordSchema() {
    final Schema.Field recordField1 = new Schema.Field(
        "one", Schema.create(Schema.Type.INT), null, null);
    final Schema.Field recordField2 = new Schema.Field(
        "two", Schema.create(Schema.Type.STRING), null, null);
    final Schema recordSchema = Schema.createRecord("record", null, null, false);
    recordSchema.setFields(Arrays.asList(recordField1, recordField2));
    return recordSchema;
  }

  /**
   * Creates a record of the test record schema with one=1 and two="two".
   *
   * @param recordSchema a schema produced by {@link #createTestRecordSchema()}.
   * @return the populated record.
   */
  private static GenericRecord createTestRecord(Schema recordSchema) {
    final GenericRecord record = new GenericData.Record(recordSchema);
    record.put("one", 1);
    record.put("two", "two");
    return record;
  }

  /**
   * Asserts that a decoded record carries the same field values as the original.
   *
   * <p>Field "two" is compared via toString(): the decoded value may be a
   * CharSequence other than java.lang.String (e.g. Avro's Utf8), so direct
   * equals() against the original String would not be meaningful.</p>
   *
   * @param expected the record that was encoded.
   * @param actual the record that was decoded.
   */
  private static void assertTestRecordsEqual(GenericRecord expected, GenericRecord actual) {
    // Fail with a clear message rather than a NullPointerException below.
    Assert.assertNotNull("Decoded record is null", actual);
    Assert.assertEquals(expected.get("one"), actual.get("one"));
    Assert.assertEquals(expected.get("two").toString(), actual.get("two").toString());
  }

  /**
   * Binary-encodes an avro datum into a byte array.
   *
   * @param writerSchema the schema the datum is written with.
   * @param avro the datum to encode (may be null when the schema allows it).
   * @return the encoded bytes.
   * @throws IOException if encoding fails.
   */
  private byte[] encodeBinaryAvro(Schema writerSchema, Object avro) throws IOException {
    final ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
    try {
      final EncoderFactory encoderFactory = EncoderFactory.get();
      final Encoder encoder = encoderFactory.binaryEncoder(byteArrayOutputStream, null);
      final DatumWriter<Object> writer = new GenericDatumWriter<Object>(writerSchema);
      writer.write(avro, encoder);
      // The binary encoder may buffer; flush before reading the stream's bytes.
      encoder.flush();
      return byteArrayOutputStream.toByteArray();
    } finally {
      byteArrayOutputStream.close();
    }
  }

  /**
   * Binary-decodes the contents of a byte array into an avro datum, resolving
   * the writer's schema against the reader's schema.
   *
   * @param writerSchema the schema the bytes were written with.
   * @param readerSchema the schema to read the datum as.
   * @param binaryEncodedAvro the encoded bytes.
   * @param <T> the expected Java type of the decoded datum.
   * @return the decoded datum.
   * @throws IOException if decoding fails.
   */
  private <T> T decodeBinaryAvro(
      Schema writerSchema, Schema readerSchema, byte[] binaryEncodedAvro)
      throws IOException {
    final DecoderFactory decoderFactory = DecoderFactory.get();
    final Decoder decoder = decoderFactory.binaryDecoder(binaryEncodedAvro, null);
    // Both schemas are required: the writer's schema tells the reader how the
    // bytes are laid out (e.g. whether a union index precedes the value).
    final DatumReader<T> reader = new GenericDatumReader<T>(writerSchema, readerSchema);
    return reader.read(null, decoder);
  }
}
@gwu
Copy link
Author

gwu commented Jan 24, 2013

Here are the test results. All four tests fail (two assertion failures, two errors)!

-------------------------------------------------------------------------------
Test set: com.wibidata.avro.AvroNullableDatumTest
-------------------------------------------------------------------------------
Tests run: 4, Failures: 2, Errors: 2, Skipped: 0, Time elapsed: 0.208 sec <<< FAILURE!
testWriteUnionReadInt(com.wibidata.avro.AvroNullableDatumTest)  Time elapsed: 0.159 sec  <<< FAILURE!
java.lang.AssertionError: expected:<42> but was:<1>
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.failNotEquals(Assert.java:743)
    at org.junit.Assert.assertEquals(Assert.java:118)
    at org.junit.Assert.assertEquals(Assert.java:144)
    at com.wibidata.avro.AvroNullableDatumTest.testWriteUnionReadInt(AvroNullableDatumTest.java:94)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4TestSet.execute(JUnit4TestSet.java:53)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:123)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:164)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:110)
    at org.apache.maven.surefire.booter.SurefireStarter.invokeProvider(SurefireStarter.java:175)
    at org.apache.maven.surefire.booter.SurefireStarter.runSuitesInProcessWhenForked(SurefireStarter.java:107)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:68)
testWriteIntReaderUnion(com.wibidata.avro.AvroNullableDatumTest)  Time elapsed: 0.001 sec  <<< ERROR!
java.lang.ArrayIndexOutOfBoundsException: 42
    at org.apache.avro.io.parsing.Symbol$Alternative.getSymbol(Symbol.java:364)
    at org.apache.avro.io.ResolvingDecoder.doAction(ResolvingDecoder.java:229)
    at org.apache.avro.io.parsing.Parser.advance(Parser.java:88)
    at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:206)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:135)
    at com.wibidata.avro.AvroNullableDatumTest.decodeBinaryAvro(AvroNullableDatumTest.java:176)
    at com.wibidata.avro.AvroNullableDatumTest.testWriteIntReaderUnion(AvroNullableDatumTest.java:77)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4TestSet.execute(JUnit4TestSet.java:53)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:123)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:164)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:110)
    at org.apache.maven.surefire.booter.SurefireStarter.invokeProvider(SurefireStarter.java:175)
    at org.apache.maven.surefire.booter.SurefireStarter.runSuitesInProcessWhenForked(SurefireStarter.java:107)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:68)
testWriteUnionReadRecord(com.wibidata.avro.AvroNullableDatumTest)  Time elapsed: 0.003 sec  <<< FAILURE!
org.junit.ComparisonFailure: expected:<[two]> but was:<[�]>
    at org.junit.Assert.assertEquals(Assert.java:115)
    at org.junit.Assert.assertEquals(Assert.java:144)
    at com.wibidata.avro.AvroNullableDatumTest.testWriteUnionReadRecord(AvroNullableDatumTest.java:148)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4TestSet.execute(JUnit4TestSet.java:53)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:123)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:164)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:110)
    at org.apache.maven.surefire.booter.SurefireStarter.invokeProvider(SurefireStarter.java:175)
    at org.apache.maven.surefire.booter.SurefireStarter.runSuitesInProcessWhenForked(SurefireStarter.java:107)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:68)
testWriteRecordReadUnion(com.wibidata.avro.AvroNullableDatumTest)  Time elapsed: 0.002 sec  <<< ERROR!
java.io.EOFException
    at org.apache.avro.io.BinaryDecoder$ByteArrayByteSource.readRaw(BinaryDecoder.java:940)
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:345)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:262)
    at org.apache.avro.io.ValidatingDecoder.readString(ValidatingDecoder.java:107)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:344)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:337)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:150)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:173)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:148)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:135)
    at com.wibidata.avro.AvroNullableDatumTest.decodeBinaryAvro(AvroNullableDatumTest.java:176)
    at com.wibidata.avro.AvroNullableDatumTest.testWriteRecordReadUnion(AvroNullableDatumTest.java:119)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:47)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:44)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:271)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:70)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:50)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:238)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:63)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:236)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:53)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:309)
    at org.apache.maven.surefire.junit4.JUnit4TestSet.execute(JUnit4TestSet.java:53)
    at org.apache.maven.surefire.junit4.JUnit4Provider.executeTestSet(JUnit4Provider.java:123)
    at org.apache.maven.surefire.junit4.JUnit4Provider.invoke(JUnit4Provider.java:104)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
    at java.lang.reflect.Method.invoke(Method.java:597)
    at org.apache.maven.surefire.util.ReflectionUtils.invokeMethodWithArray(ReflectionUtils.java:164)
    at org.apache.maven.surefire.booter.ProviderFactory$ProviderProxy.invoke(ProviderFactory.java:110)
    at org.apache.maven.surefire.booter.SurefireStarter.invokeProvider(SurefireStarter.java:175)
    at org.apache.maven.surefire.booter.SurefireStarter.runSuitesInProcessWhenForked(SurefireStarter.java:107)
    at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:68)

@gwu
Copy link
Author

gwu commented Jan 24, 2013

Ah. I forgot to pass the writer schema to the constructor of the datum reader (GenericDatumReader). I'll fix this and hopefully the world will be normal again.

@gwu
Copy link
Author

gwu commented Jan 24, 2013

Updated. All better.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment