Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
using Azure.Identity;
using Confluent.Kafka;
using Microsoft.Azure.Kafka.SchemaRegistry.Avro;
using System;
using System.Configuration;
using System.Threading;
using zohan.schemaregistry.events;
namespace Zohan.SchemaRegistry.Consumer
{
class Program
{
static void Main(string[] args)
{
Console.WriteLine("Press any key to begin consuming new events");
Console.ReadKey();
ConsumeEvents();
}
static void ConsumeEvents()
{
// Initialize the producer configuration properties
var config = InitializeConsumerConfig();
// Create an instance of the serializer that will
// use the schema for the messages.
var valueDeserializer = InitializeValueDeserializer();
using (var consumer = new ConsumerBuilder<Null, CustomerLoyalty>(config)
.SetValueDeserializer(valueDeserializer)
.Build())
{
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) => { e.Cancel = true; cts.Cancel(); };
// Retrieve the topic name from the configuration settings
// and set the subscription to the topic.
var topic = ConfigurationManager.AppSettings["EH_NAME"];
consumer.Subscribe(topic);
Console.WriteLine($"Consuming messages from topic: {topic}");
// Start consuming events
try
{
while (true)
{
try
{
var msg = consumer.Consume(cts.Token);
var loyalty = msg.Message.Value;
Console.WriteLine($"Customer {loyalty.CustomerId} received {loyalty.PointsAdded} points");
}
catch (ConsumeException e)
{
Console.WriteLine($"Consume error: {e.Error.Reason}");
}
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
}
private static KafkaAvroDeserializer<CustomerLoyalty> InitializeValueDeserializer()
{
// Retrieve the necessary settings for the schema url and
// credentials needed communicate with the registry in Azure.
var schemaRegistryUrl = ConfigurationManager.AppSettings["SCHEMA_REGISTRY_URL"];
ClientSecretCredential credential = new ClientSecretCredential(
ConfigurationManager.AppSettings["SCHEMA_REGISTRY_TENANT_ID"],
ConfigurationManager.AppSettings["SCHEMA_REGISTRY_CLIENT_ID"],
ConfigurationManager.AppSettings["SCHEMA_REGISTRY_CLIENT_SECRET"]);
return new KafkaAvroDeserializer<CustomerLoyalty>(schemaRegistryUrl, credential);
}
private static ConsumerConfig InitializeConsumerConfig()
{
var brokerList = ConfigurationManager.AppSettings["EH_FQDN"];
var connectionString = ConfigurationManager.AppSettings["EH_CONNECTION_STRING"];
var caCertLocation = ConfigurationManager.AppSettings["CA_CERT_LOCATION"];
var consumerGroup = ConfigurationManager.AppSettings["CONSUMER_GROUP"];
return new ConsumerConfig
{
BootstrapServers = brokerList,
SecurityProtocol = SecurityProtocol.SaslSsl,
SocketTimeoutMs = 60000,
SessionTimeoutMs = 30000,
SaslMechanism = SaslMechanism.Plain,
SaslUsername = "$ConnectionString",
SaslPassword = connectionString,
SslCaLocation = caCertLocation,
GroupId = consumerGroup,
AutoOffsetReset = AutoOffsetReset.Earliest,
BrokerVersionFallback = "1.0.0"
};
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment