Skip to content

Instantly share code, notes, and snippets.

@udooz
Created August 27, 2019 14:36
Show Gist options
  • Save udooz/dfdea419c92be9cdcca1eb88d8a3fb39 to your computer and use it in GitHub Desktop.
Save udooz/dfdea419c92be9cdcca1eb88d8a3fb39 to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Confluent.Kafka;
namespace BananaEater
{
/// <summary>
/// Console entry point that consumes string messages from the Kafka topic "test"
/// and prints every notification through a <see cref="ConsoleObserver{T}"/>.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the consumer configuration, subscribes a console observer to the
    /// message stream and keeps the process alive until a key is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            // Note: The AutoOffsetReset property determines the start offset in the event
            // there are not yet any committed offsets for the consumer group for the
            // topic/partitions of interest. By default, offsets are committed
            // automatically, so in this example, consumption will only start from the
            // earliest message in the topic 'test' the first time you run the program.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        // Hold on to the subscription so that it is disposed when a key is pressed;
        // disposing it cancels the token handed to the consume loop. The loop runs on
        // a thread-pool (background) thread, so process exit may still race with the
        // consumer's shutdown.
        using (Consume(conf).Subscribe(new ConsoleObserver<string>("Banana Eater")))
        {
            Console.ReadKey();
        }
    }

    /// <summary>
    /// Wraps a Kafka consumer for the topic "test" in an observable sequence of
    /// message values. Each subscription builds its own consumer and runs a
    /// blocking consume loop on a thread-pool task.
    /// </summary>
    /// <param name="config">Configuration used to build the Kafka consumer.</param>
    /// <returns>
    /// An observable that emits each consumed message value, completes when the
    /// subscription is disposed or Ctrl+C is pressed, and signals an error when
    /// the consumer throws a <see cref="ConsumeException"/>.
    /// </returns>
    static IObservable<string> Consume(ConsumerConfig config)
    {
        return Observable.Create<string>((o, ct) =>
        {
            return Task.Run(() =>
            {
                // Link to the subscription token so that unsubscribing stops the loop,
                // while Ctrl+C can still cancel through the same source.
                using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct))
                using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
                {
                    ConsoleCancelEventHandler onCancelKey = (_, e) =>
                    {
                        e.Cancel = true; // prevent the process from terminating.
                        cts.Cancel();
                    };
                    Console.CancelKeyPress += onCancelKey;
                    try
                    {
                        c.Subscribe("test");
                        while (true)
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                            o.OnNext(cr.Value);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        o.OnCompleted();
                    }
                    catch (ConsumeException e)
                    {
                        // OnError is terminal for the observer, so the loop must end here;
                        // continuing would pull messages that nobody can observe.
                        Console.WriteLine($"Error occured: {e.Error.Reason}");
                        o.OnError(e);
                    }
                    finally
                    {
                        // Detach before the token source is disposed so a late Ctrl+C
                        // cannot touch a disposed source, and so handlers do not pile up
                        // across subscriptions.
                        Console.CancelKeyPress -= onCancelKey;
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        c.Close();
                    }
                }
            });
        });
    }
}
/// <summary>
/// Observer that writes every notification it receives to the console,
/// prefixed with a caller-supplied label.
/// </summary>
/// <typeparam name="T">Type of the observed values.</typeparam>
public class ConsoleObserver<T> : IObserver<T>
{
    private readonly string _label;

    /// <summary>Creates an observer whose output lines start with <paramref name="name"/>.</summary>
    /// <param name="name">Label printed in front of each notification; defaults to an empty string.</param>
    public ConsoleObserver(string name = "")
    {
        _label = name;
    }

    /// <summary>Prints the received value.</summary>
    /// <param name="value">The value pushed by the observable.</param>
    public void OnNext(T value) =>
        Console.WriteLine("{0} - OnNext({1})", _label, value);

    /// <summary>Prints an error header followed by the exception on its own tab-indented line.</summary>
    /// <param name="error">The exception that terminated the sequence.</param>
    public void OnError(Exception error)
    {
        Console.WriteLine("{0} - OnError:", _label);
        Console.WriteLine("\t {0}", error);
    }

    /// <summary>Prints a completion marker.</summary>
    public void OnCompleted() =>
        Console.WriteLine("{0} - OnCompleted()", _label);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment