Skip to content

Instantly share code, notes, and snippets.

@promontis
Created October 24, 2019 13:35
Show Gist options
  • Save promontis/648a9f337706819767c5df7861fac0bc to your computer and use it in GitHub Desktop.
Save promontis/648a9f337706819767c5df7861fac0bc to your computer and use it in GitHub Desktop.
using Chr.Avro.Abstract;
using Chr.Avro.Confluent;
using Chr.Avro.Resolution;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using System;
using System.Diagnostics;
using System.Threading.Tasks;
namespace ConsoleApp1
{
/// <summary>
/// Sample payload type; <c>Program.Main</c> produces one instance of it as a Kafka message value.
/// </summary>
/// <remarks>
/// NOTE(review): the Avro schema for this type appears to be derived from its public members
/// by the resolver configured in <c>Program</c> — confirm before renaming or reordering members.
/// </remarks>
public class Foo
{
/// <summary>Free-form string field of the payload.</summary>
public string Bar { get; set; }
/// <summary>Timestamp field of the payload; <c>Program.Main</c> fills it with <c>DateTime.Now</c>.</summary>
public DateTime Date { get; set; }
}
/// <summary>
/// Console demo that builds two Kafka producers sharing one client handle — one producing
/// <see cref="Foo"/> values and one producing <see cref="Null"/> values — times how long each
/// takes to construct, and then produces a single message with each of them.
/// </summary>
class Program
{
    // Topic both demo messages are produced to.
    private const string Topic = "datetest4";

    // Schema Registry subjects the key/value schemas are registered under.
    private const string KeySubject = "datetest4-avro-key";
    private const string ValueSubject = "datetest4-avro-value";

    /// <summary>
    /// Entry point: creates both producers, prints the time each creation took, produces one
    /// message per producer and then waits for a line on standard input before exiting.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static async Task Main(string[] args)
    {
        var schemaRegistryConfig = new SchemaRegistryConfig
        {
            SchemaRegistryUrl = "http://localhost:8081/"
        };
        var producerConfig = new ProducerConfig
        {
            BootstrapServers = "localhost"
        };

        var stopwatch = new Stopwatch();
        stopwatch.Start();
        Console.Out.WriteLine("Start CreateProducerForType");

        // Producers are IDisposable; the nested using statements guarantee that the dependent
        // (null-value) producer is disposed before the producer that owns the shared handle.
        using (var fooProducer = await CreateProducerForType<string, Foo>(producerConfig, schemaRegistryConfig))
        {
            Console.Out.WriteLine("Stop CreateProducerForType");
            Console.Out.WriteLine($"CreateProducerForType took: {stopwatch.Elapsed}");

            stopwatch.Restart();
            Console.Out.WriteLine("Start CreateNullProducer");

            using (var nullProducer = await CreateNullProducer<string>(fooProducer.Handle, schemaRegistryConfig))
            {
                Console.Out.WriteLine("Stop CreateNullProducer");
                Console.Out.WriteLine($"CreateNullProducer took: {stopwatch.Elapsed}");
                stopwatch.Stop();

                // NOTE(review): DateTime.Now is local time — confirm how the configured
                // serializer treats DateTimeKind before relying on the stored value.
                await fooProducer.ProduceAsync(
                    Topic,
                    new Message<string, Foo>
                    {
                        Key = "123",
                        Value = new Foo { Bar = "123", Date = DateTime.Now }
                    });

                await nullProducer.ProduceAsync(
                    Topic,
                    new Message<string, Null> { Key = "123", Value = null });

                // Keep the console window (and both producers) alive until Enter is pressed.
                Console.ReadLine();
            }
        }
    }

    /// <summary>
    /// Creates the serializer builder used for every key/value serializer in this demo:
    /// a data-contract resolver that treats reference types as nullable, with temporal values
    /// represented as epoch milliseconds.
    /// </summary>
    /// <param name="registry">Schema Registry client the serializer builder talks to.</param>
    /// <returns>A new serializer builder bound to <paramref name="registry"/>.</returns>
    private static SchemaRegistrySerializerBuilder CreateSerializerBuilder(CachedSchemaRegistryClient registry)
    {
        return new SchemaRegistrySerializerBuilder(
            registry,
            new SchemaBuilder(
                null,
                new DataContractResolver(resolveReferenceTypesAsNullable: true),
                TemporalBehavior.EpochMilliseconds));
    }

    /// <summary>
    /// Builds a producer whose key and value are both Avro-serialized, registering the schemas
    /// automatically under the demo's key and value subjects.
    /// </summary>
    /// <typeparam name="TKey">Message key type.</typeparam>
    /// <typeparam name="TValue">Message value type.</typeparam>
    /// <param name="producerConfig">Kafka producer configuration.</param>
    /// <param name="schemaRegistryConfig">Schema Registry client configuration.</param>
    /// <returns>The built producer; the caller is responsible for disposing it.</returns>
    private static async Task<IProducer<TKey, TValue>> CreateProducerForType<TKey, TValue>(ProducerConfig producerConfig, SchemaRegistryConfig schemaRegistryConfig)
    {
        var builder = new ProducerBuilder<TKey, TValue>(producerConfig);

        // The registry client is only needed while the serializers are being set up,
        // so it is released before the producer itself is built.
        using (var registry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        {
            // Key and value serializers are independent, so set them up concurrently.
            await Task.WhenAll(
                builder.SetAvroKeySerializer(
                    CreateSerializerBuilder(registry),
                    KeySubject,
                    registerAutomatically: true),
                builder.SetAvroValueSerializer(
                    CreateSerializerBuilder(registry),
                    ValueSubject,
                    registerAutomatically: true));
        }

        return builder.Build();
    }

    /// <summary>
    /// Builds a producer for <see cref="Null"/> values on top of an existing producer's handle.
    /// Only the key gets an Avro serializer here; no value serializer is configured.
    /// </summary>
    /// <typeparam name="TKey">Message key type.</typeparam>
    /// <param name="handle">Handle of the producer whose underlying client is reused.</param>
    /// <param name="schemaRegistryConfig">Schema Registry client configuration.</param>
    /// <returns>The built dependent producer; dispose it before the producer that owns <paramref name="handle"/>.</returns>
    private static async Task<IProducer<TKey, Null>> CreateNullProducer<TKey>(Handle handle, SchemaRegistryConfig schemaRegistryConfig)
    {
        var builder = new DependentProducerBuilder<TKey, Null>(handle);

        using (var registry = new CachedSchemaRegistryClient(schemaRegistryConfig))
        {
            // Single task: await it directly instead of wrapping it in Task.WhenAll.
            await builder.SetAvroKeySerializer(
                CreateSerializerBuilder(registry),
                KeySubject,
                registerAutomatically: true);
        }

        return builder.Build();
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment