Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
using System;
using System.Threading;
using Confluent.Kafka;
namespace KafkaConsumer
{
class Program
{
static void Main(string[] args)
{
var config = new ConsumerConfig
{
GroupId = "test-consumer-group",
BootstrapServers = "kafka-release.kube-kafka.svc.cluster.local:9092",
AutoOffsetReset = AutoOffsetReset.Earliest
};
using (var consumer = new ConsumerBuilder<Ignore, string>(config).Build())
{
consumer.Subscribe("increment-topic");
CancellationTokenSource cts = new CancellationTokenSource();
Console.CancelKeyPress += (_, e) =>
{
e.Cancel = true; // prevent the process from terminating.
cts.Cancel();
};
try
{
while (true)
{
try
{
var consumeResult = consumer.Consume(cts.Token);
Console.WriteLine($"Message: {consumeResult.Message.Value}");
}
catch (Exception ex)
{
Console.WriteLine($"Error: {ex.Message}");
}
Thread.Sleep(100);
}
}
catch (OperationCanceledException)
{
consumer.Close();
}
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment