Skip to content

Instantly share code, notes, and snippets.

Created February 22, 2021 07:15
Show Gist options
  • Save dbarkol/5e114375a29c3d9774ffdd637eb2e593 to your computer and use it in GitHub Desktop.
Save dbarkol/5e114375a29c3d9774ffdd637eb2e593 to your computer and use it in GitHub Desktop.
using Azure.Identity;
using Confluent.Kafka;
using Microsoft.Azure.Kafka.SchemaRegistry.Avro;
using System;
using System.Configuration;
using System.Threading;
namespace Zohan.SchemaRegistry.Consumer
class Program
static void Main(string[] args)
Console.WriteLine("Press any key to begin consuming new events");
static void ConsumeEvents()
// Initialize the producer configuration properties
var config = InitializeConsumerConfig();
// Create an instance of the serializer that will
// use the schema for the messages.
var valueDeserializer = InitializeValueDeserializer();
using (var consumer = new ConsumerBuilder<Null, CustomerLoyalty>(config)
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
// Retrieve the topic name from the configuration settings
// and set the subscription to the topic.
var topic = ConfigurationManager.AppSettings["EH_NAME"];
Console.WriteLine($"Consuming messages from topic: {topic}");
// Start consuming events
while (true)
var msg = consumer.Consume(cts.Token);
var loyalty = msg.Message.Value;
Console.WriteLine($"Customer {loyalty.CustomerId} received {loyalty.PointsAdded} points");
catch (ConsumeException e)
Console.WriteLine($"Consume error: {e.Error.Reason}");
catch (OperationCanceledException)
private static KafkaAvroDeserializer<CustomerLoyalty> InitializeValueDeserializer()
// Retrieve the necessary settings for the schema url and
// credentials needed communicate with the registry in Azure.
var schemaRegistryUrl = ConfigurationManager.AppSettings["SCHEMA_REGISTRY_URL"];
ClientSecretCredential credential = new ClientSecretCredential(
return new KafkaAvroDeserializer<CustomerLoyalty>(schemaRegistryUrl, credential);
private static ConsumerConfig InitializeConsumerConfig()
var brokerList = ConfigurationManager.AppSettings["EH_FQDN"];
var connectionString = ConfigurationManager.AppSettings["EH_CONNECTION_STRING"];
var caCertLocation = ConfigurationManager.AppSettings["CA_CERT_LOCATION"];
var consumerGroup = ConfigurationManager.AppSettings["CONSUMER_GROUP"];
return new ConsumerConfig
BootstrapServers = brokerList,
SecurityProtocol = SecurityProtocol.SaslSsl,
SocketTimeoutMs = 60000,
SessionTimeoutMs = 30000,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = connectionString,
SslCaLocation = caCertLocation,
GroupId = consumerGroup,
AutoOffsetReset = AutoOffsetReset.Earliest,
BrokerVersionFallback = "1.0.0"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment