Skip to content

Instantly share code, notes, and snippets.

Created August 27, 2019 14:36
Show Gist options
  • Save udooz/dfdea419c92be9cdcca1eb88d8a3fb39 to your computer and use it in GitHub Desktop.
Save udooz/dfdea419c92be9cdcca1eb88d8a3fb39 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace BananaEater
class Program
/// <summary>
/// Program entry point: builds a Kafka <c>ConsumerConfig</c> and attaches a
/// <c>ConsoleObserver&lt;string&gt;</c> labelled "Banana Eater".
/// </summary>
/// <param name="args">Command-line arguments; not referenced in the visible lines.</param>
// NOTE(review): this listing has lost its brace-only lines, and the expression that
// `.Subscribe(...)` below is chained onto is not visible — presumably a call to
// Consume(conf); confirm against the original file.
static void Main(string[] args)
var conf = new ConsumerConfig
// Consumer group this client identifies itself with.
GroupId = "test-consumer-group",
// Bootstrap server list; here a single local address on port 9092.
BootstrapServers = "localhost:9092",
// Note: The AutoOffsetReset property determines the start offset in the event
// there are not yet any committed offsets for the consumer group for the
// topic/partitions of interest. By default, offsets are committed
// automatically, so in this example, consumption will only start from the
// earliest message in the topic 'my-topic' the first time you run the program.
AutoOffsetReset = AutoOffsetReset.Earliest
// Attach an observer that prints each notification to the console, prefixed with "Banana Eater".
.Subscribe(new ConsoleObserver<string>("Banana Eater"));
/// <summary>
/// Creates an observable of strings via <c>Observable.Create</c> whose subscription
/// logic builds a Kafka consumer from <paramref name="config"/> and consumes messages in a loop.
/// </summary>
/// <param name="config">Consumer configuration handed to <c>ConsumerBuilder</c>.</param>
/// <returns>The <c>IObservable&lt;string&gt;</c> produced by <c>Observable.Create</c>.</returns>
// NOTE(review): this listing has lost its brace-only lines and apparently some statements:
// no `try` precedes the catch clauses, and no topic subscription, observer notification,
// token cancellation, or consumer close is visible. The comments below describe only the
// lines shown — confirm the rest against the original file.
static IObservable<string> Consume(ConsumerConfig config)
// `o` and `ct` are the lambda parameters handed in by Observable.Create;
// neither is referenced in the visible lines.
return Observable.Create<string>((o, ct) =>
// The consumer work is wrapped in Task.Run and the resulting task is returned to Observable.Create.
return Task.Run(() =>
// Build a consumer that ignores message keys and reads values as strings;
// `using` disposes it when the block exits.
using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
// A locally created token source drives cancellation of the Consume call below
// (this is separate from the `ct` parameter above).
CancellationTokenSource cts = new CancellationTokenSource();
// Hook Ctrl+C. NOTE(review): the remainder of this handler is not visible —
// presumably it also cancels `cts`; confirm.
Console.CancelKeyPress += (_, e) => {
e.Cancel = true; // prevent the process from terminating.
// Consume indefinitely; the visible exits are the exception handlers below.
while (true)
// Fetch the next consume result, passing the local cancellation token.
var cr = c.Consume(cts.Token);
Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
// Consume errors are reported to the console using the error's Reason text.
// NOTE(review): "occured" in the message below is misspelled ("occurred").
catch (ConsumeException e)
Console.WriteLine($"Error occured: {e.Error.Reason}");
// Raised when the token passed to Consume is cancelled.
catch (OperationCanceledException)
// Ensure the consumer leaves the group cleanly and final offsets are committed.
// NOTE(review): the statement this comment describes is not visible — presumably c.Close(); confirm.
/// <summary>
/// An <see cref="IObserver{T}"/> that writes every notification it receives to the
/// console, prefixed with a caller-supplied name.
/// </summary>
/// <typeparam name="T">Type of the observed values.</typeparam>
public class ConsoleObserver<T> : IObserver<T>
{
    // Label printed at the start of every output line.
    private readonly string _name;

    /// <summary>
    /// Creates an observer whose output lines are prefixed with <paramref name="name"/>.
    /// </summary>
    /// <param name="name">Prefix for each printed line; defaults to an empty string.</param>
    public ConsoleObserver(string name = "")
    {
        _name = name;
    }

    /// <summary>Prints the received value as <c>"{name} - OnNext({value})"</c>.</summary>
    /// <param name="value">The value pushed by the observable.</param>
    public void OnNext(T value)
    {
        Console.WriteLine($"{_name} - OnNext({value})");
    }

    /// <summary>
    /// Prints an <c>"{name} - OnError:"</c> header followed by the exception on a
    /// tab-indented line.
    /// </summary>
    /// <param name="error">The exception reported by the observable.</param>
    public void OnError(Exception error)
    {
        Console.WriteLine($"{_name} - OnError:");
        Console.WriteLine($"\t {error}");
    }

    /// <summary>Prints <c>"{name} - OnCompleted()"</c>.</summary>
    public void OnCompleted()
    {
        Console.WriteLine($"{_name} - OnCompleted()");
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment