Skip to content

Instantly share code, notes, and snippets.

@j2jensen
Last active March 8, 2023 12:13
Show Gist options
  • Save j2jensen/becb85c4f1167079eef9427a5ad239e1 to your computer and use it in GitHub Desktop.
Save j2jensen/becb85c4f1167079eef9427a5ad239e1 to your computer and use it in GitHub Desktop.
Write avro file to a memory stream, then read it out again, using reflection.
// Write avro file to a memory stream, then read it out again, using reflection.
// Based on the very sparse C# docs from https://avro.apache.org/docs/1.11.1/api/csharp/html/md_src_apache_main_Reflect_README.html
// combined with the Java docs from https://avro.apache.org/docs/1.11.1/getting-started-java/
// Avro schema for the BaseClass hierarchy. A JSON array denotes an Avro union, here with one
// record branch per concrete derived class. The array has no trailing comma after the last
// branch, so the text is strictly valid JSON.
var baseClassSchema = @"
[
{ ""type"" : ""record"", ""name"" : ""Derived1"", ""fields"" :
[
{ ""name"" : ""A"", ""type"" : ""string""},
{ ""name"" : ""B"", ""type"" : ""int""}
]
},
{ ""type"" : ""record"", ""name"" : ""Derived2"", ""fields"" :
[
{ ""name"" : ""A"", ""type"" : ""string""},
{ ""name"" : ""C"", ""type"" : ""double""}
]
}
]
";
var schema = Schema.Parse(baseClassSchema);
var derived1write = new Derived1() { A = "derived1", B = 7 };
var derived2write = new Derived2() { A = "derived2", C = 3.14 };

// Union types (except for [null, type]) need to be manually registered.
// The schema above is a union by construction, so use a direct cast: if that assumption is
// ever broken it fails immediately with InvalidCastException instead of a later
// NullReferenceException from the indexer below.
var unionSchema = (UnionSchema)schema;
var cache = new ClassCache();
// Map each union branch (by position in the schema array) to its C# type.
cache.LoadClassCache(typeof(Derived1), unionSchema[0]);
cache.LoadClassCache(typeof(Derived2), unionSchema[1]);

var writer = new ReflectWriter<BaseClass>(schema, cache);
using (var stream = new MemoryStream(256))
{
    // leaveOpen: true so the MemoryStream is still usable for reading after the writer is disposed.
    using (var fileWriter = DataFileWriter<BaseClass>.OpenWriter(writer, stream, leaveOpen: true))
    {
        fileWriter.Append(derived1write);
        fileWriter.Append(derived2write);
        fileWriter.Flush();
    }

    // Rewind so the reader starts from the beginning of what was just written.
    stream.Position = 0;
    foreach (var item in ReadAvroFileStreamUsingReflection<BaseClass>(stream))
    {
        Console.WriteLine(item);
    }
}
// Lazily reads every record from an Avro container held in `stream`, decoding each one with a
// ReflectReader<T>. The captured `cache` carries the manual union-branch registrations.
// No reader schema is supplied (null), so decoding is driven by the schema stored in the file.
// The DataFileReader is disposed when enumeration finishes or the enumerator is disposed.
IEnumerable<T> ReadAvroFileStreamUsingReflection<T>(Stream stream)
{
    using var avroReader = DataFileReader<T>.OpenReader(
        stream,
        null,
        (writerSchema, readerSchema) => new ReflectReader<T>(writerSchema, readerSchema, cache));

    while (avroReader.HasNext())
    {
        yield return avroReader.Next();
    }
}
/// <summary>
/// Common base type for the records stored in the Avro union. Its <see cref="A"/> property
/// corresponds to the "A" string field that both union branches declare.
/// </summary>
public class BaseClass
{
    /// <summary>Value of the "A" string field.</summary>
    public string A { get; set; }

    /// <summary>
    /// Describes the runtime type and its field value, so printing a deserialized record shows
    /// the round-tripped data rather than only the type name.
    /// </summary>
    public override string ToString() => $"{GetType().Name} {{ A = {A} }}";
}
/// <summary>
/// Concrete type for the "Derived1" union branch; adds the "B" int field.
/// </summary>
public class Derived1 : BaseClass
{
    /// <summary>Value of the "B" int field.</summary>
    public int B { get; set; }

    /// <summary>Describes the runtime type together with both field values.</summary>
    public override string ToString() => $"{GetType().Name} {{ A = {A}, B = {B} }}";
}
/// <summary>
/// Concrete type for the "Derived2" union branch; adds the "C" double field.
/// </summary>
public class Derived2 : BaseClass
{
    /// <summary>Value of the "C" double field.</summary>
    public double C { get; set; }

    /// <summary>Describes the runtime type together with both field values.</summary>
    public override string ToString() => $"{GetType().Name} {{ A = {A}, C = {C} }}";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment