Skip to content

Instantly share code, notes, and snippets.

@lomholdt
Created August 18, 2021 04:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lomholdt/b3f3b448225543ddac7826dc1da71f69 to your computer and use it in GitHub Desktop.
Save lomholdt/b3f3b448225543ddac7826dc1da71f69 to your computer and use it in GitHub Desktop.
namespace DaprKafkaDemo.Shared;
/// <summary>
/// Strongly typed options for the Confluent Kafka configuration section.
/// </summary>
public class ConfluentKafkaOptions
{
    /// <summary>
    /// Name of the configuration section these options are bound from.
    /// </summary>
    public const string ConfluentKafka = "ConfluentKafka";

    /// <summary>
    /// Schema registry connection settings; initialised to an empty instance
    /// so the property is never null after construction.
    /// </summary>
    public SchemaRegistry SchemaRegistry { get; set; } = new();
}
/// <summary>
/// Connection settings for a schema registry: endpoint URL plus the
/// username/password pair used to build the basic-auth user info string.
/// </summary>
public class SchemaRegistry
{
    /// <summary>Schema registry URL. Defaults to the empty string.</summary>
    public string Url { get; set; } = "";

    /// <summary>User name part of the credentials. Defaults to the empty string.</summary>
    public string Username { get; set; } = "";

    /// <summary>Password part of the credentials. Defaults to the empty string.</summary>
    public string Password { get; set; } = "";

    /// <summary>
    /// The credentials joined as <c>username:password</c>. The separator is always
    /// present, so with default values this evaluates to <c>":"</c>.
    /// </summary>
    public string BasicAuthUserInfo
    {
        get { return string.Concat(Username, ":", Password); }
    }
}
using Avro.Specific;
namespace DaprKafkaDemo.Shared;
/// <summary>
/// Wrapper record carrying a single Avro-generated event. Used as the model
/// type that <c>ConfluentEventBinderProvider</c> matches on.
/// </summary>
/// <typeparam name="TEvent">The Avro specific record type being wrapped.</typeparam>
/// <param name="Value">The wrapped event instance.</param>
public record ConfluentAvroEvent<TEvent>(TEvent Value) where TEvent : ISpecificRecord;
using Avro.Specific;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;
using Microsoft.AspNetCore.Mvc.ModelBinding;
using Microsoft.AspNetCore.Mvc.ModelBinding.Binders;
namespace DaprKafkaDemo.Shared;
/// <summary>
/// Model binder that buffers the raw HTTP request body, deserializes it with
/// <see cref="AvroDeserializer{T}"/> using the injected schema registry client,
/// and binds the result as a <see cref="ConfluentAvroEvent{TEvent}"/>.
/// </summary>
/// <typeparam name="TEvent">The Avro specific record type to deserialize.</typeparam>
public class ConfluentEventBinder<TEvent> : IModelBinder where TEvent : ISpecificRecord
{
    private readonly ISchemaRegistryClient _registryClient;

    /// <summary>
    /// Creates the binder.
    /// </summary>
    /// <param name="registryClient">Schema registry client handed to the Avro deserializer.</param>
    /// <exception cref="ArgumentNullException"><paramref name="registryClient"/> is null.</exception>
    public ConfluentEventBinder(ISchemaRegistryClient registryClient)
    {
        _registryClient = registryClient ?? throw new ArgumentNullException(nameof(registryClient));
    }

    /// <summary>
    /// Reads the whole request body, deserializes it to <typeparamref name="TEvent"/> and
    /// sets a successful binding result wrapping the event.
    /// </summary>
    /// <param name="bindingContext">The binding context of the current request.</param>
    /// <exception cref="ArgumentNullException"><paramref name="bindingContext"/> is null.</exception>
    /// <remarks>
    /// Exceptions thrown while copying the body or deserializing are not caught here
    /// and propagate to the caller.
    /// </remarks>
    public async Task BindModelAsync(ModelBindingContext bindingContext)
    {
        if (bindingContext is null)
        {
            throw new ArgumentNullException(nameof(bindingContext));
        }

        var httpContext = bindingContext.ActionContext.HttpContext;

        // The deserializer takes the payload as one contiguous buffer, so the body
        // is copied into memory first. The buffer is disposed when the method exits,
        // and the copy stops early if the client aborts the request.
        using var buffer = new MemoryStream();
        await httpContext.Request.Body.CopyToAsync(buffer, httpContext.RequestAborted);
        var payload = buffer.ToArray();

        var deserializer = new AvroDeserializer<TEvent>(_registryClient);

        // isNull is passed as false (default) and the serialization context is left at its default value.
        var @event = await deserializer.DeserializeAsync(payload, default, new());

        bindingContext.Result = ModelBindingResult.Success(new ConfluentAvroEvent<TEvent>(@event));
    }
}
/// <summary>
/// Binder provider that supplies a <see cref="ConfluentEventBinder{TEvent}"/> for
/// models of type <see cref="ConfluentAvroEvent{TEvent}"/> and declines every other model type.
/// </summary>
public class ConfluentEventBinderProvider : IModelBinderProvider
{
    /// <summary>
    /// Returns a binder for closed <c>ConfluentAvroEvent&lt;TEvent&gt;</c> model types.
    /// </summary>
    /// <param name="context">Provider context describing the model being bound.</param>
    /// <returns>
    /// A <see cref="BinderTypeModelBinder"/> for the matching closed binder type,
    /// or <c>null</c> when the model type is not a <c>ConfluentAvroEvent&lt;TEvent&gt;</c>.
    /// </returns>
    /// <exception cref="ArgumentNullException"><paramref name="context"/> is null.</exception>
    public IModelBinder? GetBinder(ModelBinderProviderContext context)
    {
        if (context is null)
        {
            throw new ArgumentNullException(nameof(context));
        }

        var modelType = context.Metadata.ModelType;

        // GetGenericTypeDefinition() throws InvalidOperationException on non-generic
        // types, so non-generic models (string, int, plain DTOs, ...) must be
        // filtered out before asking for the generic definition.
        if (!modelType.IsGenericType
            || modelType.GetGenericTypeDefinition() != typeof(ConfluentAvroEvent<>))
        {
            return null;
        }

        // Close ConfluentEventBinder<> over the same TEvent as the model.
        Type binderType = typeof(ConfluentEventBinder<>).MakeGenericType(modelType.GetGenericArguments());
        return new BinderTypeModelBinder(binderType);
    }
}
<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net6.0</TargetFramework>
    <Nullable>enable</Nullable>
    <!-- The sources use Task, MemoryStream, Type and ArgumentNullException without using directives. -->
    <ImplicitUsings>enable</ImplicitUsings>
  </PropertyGroup>
  <ItemGroup>
    <!-- On net6.0 the ASP.NET Core MVC types come from the shared framework;
         the Microsoft.AspNetCore.Mvc 2.0.x NuGet packages target ASP.NET Core 2.0. -->
    <FrameworkReference Include="Microsoft.AspNetCore.App" />
  </ItemGroup>
  <ItemGroup>
    <PackageReference Include="Confluent.Kafka" Version="1.7.0" />
    <PackageReference Include="Confluent.SchemaRegistry" Version="1.7.0" />
    <PackageReference Include="Confluent.SchemaRegistry.Serdes.Avro" Version="1.7.0" />
  </ItemGroup>
</Project>
@lomholdt
Copy link
Author

lomholdt commented Aug 18, 2021

// Startup.ConfigureServices
// Put the Avro event binder provider at the front of the provider list.
services.AddControllers(options =>
{
    options.ModelBinderProviders.Insert(0, new ConfluentEventBinderProvider());
});

// Bind the "ConfluentKafka" configuration section to ConfluentKafkaOptions.
var confluentKafkaOptions = Configuration.GetSection(ConfluentKafkaOptions.ConfluentKafka);
services.Configure<ConfluentKafkaOptions>(confluentKafkaOptions);

services.AddSingleton<ISchemaRegistryClient, CachedSchemaRegistryClient>(cl =>
{
    // Get<T>() returns null when the section is missing or empty; fail with a
    // descriptive error instead of a NullReferenceException on the next line.
    var options = confluentKafkaOptions.Get<ConfluentKafkaOptions>()
        ?? throw new InvalidOperationException(
            $"Configuration section '{ConfluentKafkaOptions.ConfluentKafka}' is missing or empty.");

    var schemaRegistryConfig = new SchemaRegistryConfig
    {
        Url = options.SchemaRegistry.Url,
        BasicAuthUserInfo = options.SchemaRegistry.BasicAuthUserInfo
    };

    return new CachedSchemaRegistryClient(schemaRegistryConfig);
});

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment