Skip to content

Instantly share code, notes, and snippets.

@teemka
Last active December 1, 2021 15:17
Show Gist options
  • Save teemka/4c77ed9cce130c9df4b92c436583d693 to your computer and use it in GitHub Desktop.
Save teemka/4c77ed9cce130c9df4b92c436583d693 to your computer and use it in GitHub Desktop.
Nullable Reference Types schema resolution
using Chr.Avro;
using Chr.Avro.Abstract;
using Chr.Avro.Resolution;
using Namotion.Reflection;
using System;
using System.Collections.Concurrent;
namespace Kafka.Producers
{
/// <summary>
/// Builds <see cref="RecordSchema"/>s for <see cref="RecordResolution"/>s, turning every
/// field whose member is annotated as nullable into a union with <see cref="NullSchema"/>.
/// This class is mostly a copy of <see cref="RecordSchemaBuilderCase"/>.
/// </summary>
public class CustomRecordSchemaBuilderCase : SchemaBuilderCase
{
    /// <summary>
    /// Creates the case.
    /// </summary>
    /// <param name="schemaBuilder">Builder used to produce the schema of each record field.</param>
    /// <exception cref="ArgumentNullException">
    /// Thrown when <paramref name="schemaBuilder"/> is <c>null</c>.
    /// </exception>
    public CustomRecordSchemaBuilderCase(ISchemaBuilder schemaBuilder)
    {
        // Fail at construction time instead of with a NullReferenceException on first use.
        this.SchemaBuilder = schemaBuilder ?? throw new ArgumentNullException(nameof(schemaBuilder));
    }

    /// <summary>
    /// Gets the builder used to resolve field schemas.
    /// </summary>
    public ISchemaBuilder SchemaBuilder { get; }

    /// <summary>
    /// Builds a record schema for <paramref name="resolution"/>.
    /// </summary>
    /// <param name="resolution">The type resolution to build a schema for.</param>
    /// <param name="cache">Schemas built so far, keyed by the resolved type.</param>
    /// <returns>
    /// A result holding the cached or newly built record schema when
    /// <paramref name="resolution"/> is a <see cref="RecordResolution"/>; otherwise a result
    /// whose exceptions contain an <see cref="UnsupportedTypeException"/>.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// Thrown when the new record schema cannot be added to <paramref name="cache"/>.
    /// </exception>
    public override ISchemaBuildResult BuildSchema(
        TypeResolution resolution,
        ConcurrentDictionary<Type, Schema> cache)
    {
        var result = new SchemaBuildResult();

        if (!(resolution is RecordResolution record))
        {
            result.Exceptions.Add(new UnsupportedTypeException(resolution.Type));
            return result;
        }

        if (cache.TryGetValue(record.Type, out var schema))
        {
            result.Schema = schema;
            return result;
        }

        var name = record.Namespace == null
            ? record.Name.Value
            : $"{record.Namespace.Value}.{record.Name.Value}";

        var instance = new RecordSchema(name);

        // The schema is cached before its fields are built; presumably so that fields which
        // refer back to this record type resolve to the same instance.
        if (!cache.TryAdd(record.Type, instance))
        {
            throw new InvalidOperationException("Failed to cache record schema prior to building its fields.");
        }

        foreach (var field in record.Fields)
        {
            var fieldSchema = this.SchemaBuilder.BuildSchema(field.Type, cache);

            // There is an API for this in .NET 6 - currently using Namotion.Reflection
            if (field.Member.ToContextualAccessor().Nullability == Nullability.Nullable)
            {
                fieldSchema = MakeNullable(fieldSchema);
            }

            instance.Fields.Add(new RecordField(field.Name.Value, fieldSchema));
        }

        result.Schema = instance;
        return result;
    }

    /// <summary>
    /// Returns a schema that accepts <c>null</c> in addition to <paramref name="schema"/>,
    /// without ever nesting one union directly inside another.
    /// </summary>
    /// <param name="schema">The schema of the field's non-null value.</param>
    /// <returns>A union schema containing a <see cref="NullSchema"/>.</returns>
    private static Schema MakeNullable(Schema schema)
    {
        if (!(schema is UnionSchema union))
        {
            return new UnionSchema(new Schema[]
            {
                new NullSchema(),
                schema,
            });
        }

        // The inner builder already produced a union.
        // NOTE(review): this is expected for Nullable<T> members - confirm against the
        // Chr.Avro version in use. Avro does not allow a union to directly contain a union,
        // so the existing union is reused or flattened instead of being wrapped.
        foreach (var member in union.Schemas)
        {
            if (member is NullSchema)
            {
                return union;
            }
        }

        var members = new System.Collections.Generic.List<Schema> { new NullSchema() };
        members.AddRange(union.Schemas);

        return new UnionSchema(members.ToArray());
    }
}
}
using Chr.Avro.Abstract;
using Chr.Avro.Resolution;
using System;
using System.Collections.Generic;
namespace Kafka.Producers
{
/// <summary>
/// A <see cref="SchemaBuilder"/> whose record case is
/// <see cref="CustomRecordSchemaBuilderCase"/>.
/// </summary>
public class CustomSchemaBuilder : SchemaBuilder
{
    /// <summary>
    /// Creates a schema builder configured with the custom case list.
    /// </summary>
    /// <param name="temporalBehavior">Passed to the timestamp case.</param>
    /// <param name="typeResolver">Forwarded unchanged to the base constructor.</param>
    public CustomSchemaBuilder(
        TemporalBehavior temporalBehavior = TemporalBehavior.Iso8601,
        ITypeResolver? typeResolver = null)
        : base(CustomCreateCaseBuilders(temporalBehavior), typeResolver)
    {
    }

    /// <summary>
    /// Produces the case factories handed to the base constructor, in evaluation order.
    /// </summary>
    /// <param name="temporalBehavior">Passed to the timestamp case.</param>
    /// <returns>One factory per schema builder case.</returns>
    private static IEnumerable<Func<ISchemaBuilder, ISchemaBuilderCase>> CustomCreateCaseBuilders(
        TemporalBehavior temporalBehavior) =>
        new Func<ISchemaBuilder, ISchemaBuilderCase>[]
        {
            // Cases that need no builder ignore the lambda argument.
            parent => new ArraySchemaBuilderCase(parent),
            _ => new BooleanSchemaBuilderCase(),
            _ => new BytesSchemaBuilderCase(),
            _ => new DecimalSchemaBuilderCase(),
            _ => new DoubleSchemaBuilderCase(),
            _ => new DurationSchemaBuilderCase(),
            parent => new EnumSchemaBuilderCase(parent),
            _ => new FloatSchemaBuilderCase(),
            _ => new IntSchemaBuilderCase(),
            _ => new LongSchemaBuilderCase(),
            parent => new MapSchemaBuilderCase(parent),
            _ => new StringSchemaBuilderCase(),
            _ => new TimestampSchemaBuilderCase(temporalBehavior),
            _ => new UriSchemaBuilderCase(),
            _ => new UuidSchemaBuilderCase(),

            // The only difference from the base list: the custom record case.
            parent => new CustomRecordSchemaBuilderCase(parent),
        };
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment