Created
August 27, 2019 14:36
-
-
Save udooz/dfdea419c92be9cdcca1eb88d8a3fb39 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Reactive.Linq; | |
using System.Text; | |
using System.Threading; | |
using System.Threading.Tasks; | |
using Confluent.Kafka; | |
namespace BananaEater | |
{ | |
/// <summary>
/// Console entry point: consumes string messages from a Kafka topic and
/// surfaces them as an Rx observable sequence printed by a <c>ConsoleObserver</c>.
/// </summary>
class Program
{
    /// <summary>
    /// Builds the consumer configuration, subscribes a console observer to the
    /// message stream and keeps the process alive until a key is pressed.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var conf = new ConsumerConfig
        {
            GroupId = "test-consumer-group",
            BootstrapServers = "localhost:9092",
            // Note: The AutoOffsetReset property determines the start offset in the event
            // there are not yet any committed offsets for the consumer group for the
            // topic/partitions of interest. By default, offsets are committed
            // automatically, so in this example, consumption will only start from the
            // earliest message in the consumed topic the first time you run the program.
            AutoOffsetReset = AutoOffsetReset.Earliest
        };

        // Hold on to the subscription so that leaving the block disposes it, which
        // cancels the token handed to the consume loop and asks it to stop.
        using (Consume(conf).Subscribe(new ConsoleObserver<string>("Banana Eater")))
        {
            Console.ReadKey();
        }
    }

    /// <summary>
    /// Creates an observable that, for each subscription, builds a Kafka consumer
    /// on a background task and pushes every consumed message value to the observer.
    /// </summary>
    /// <param name="config">Consumer configuration used to build the Kafka consumer.</param>
    /// <param name="topic">Topic to subscribe to; defaults to <c>"test"</c>.</param>
    /// <returns>
    /// A sequence of message values. It completes when the subscription is disposed
    /// or Ctrl+C is pressed, and terminates with an error on a <see cref="ConsumeException"/>.
    /// </returns>
    static IObservable<string> Consume(ConsumerConfig config, string topic = "test")
    {
        return Observable.Create<string>((o, ct) =>
        {
            return Task.Run(() =>
            {
                // Link to the Rx-provided token so that disposing the subscription
                // stops the loop; Ctrl+C cancels the same source.
                using (var cts = CancellationTokenSource.CreateLinkedTokenSource(ct))
                using (var c = new ConsumerBuilder<Ignore, string>(config).Build())
                {
                    ConsoleCancelEventHandler onCancelKey = (sender, args) =>
                    {
                        args.Cancel = true; // prevent the process from terminating.
                        cts.Cancel();
                    };
                    Console.CancelKeyPress += onCancelKey;

                    try
                    {
                        c.Subscribe(topic);

                        while (true)
                        {
                            var cr = c.Consume(cts.Token);
                            Console.WriteLine($"Consumed message '{cr.Value}' at: '{cr.TopicPartitionOffset}'.");
                            o.OnNext(cr.Value);
                        }
                    }
                    catch (OperationCanceledException)
                    {
                        o.OnCompleted();
                    }
                    catch (ConsumeException ex)
                    {
                        // OnError is terminal for the sequence, so the loop must end
                        // here instead of continuing to poll for an observer that
                        // can no longer receive notifications.
                        Console.WriteLine($"Error occured: {ex.Error.Reason}");
                        o.OnError(ex);
                    }
                    finally
                    {
                        // Remove the handler so repeated subscriptions do not pile up
                        // handlers that reference a disposed token source.
                        Console.CancelKeyPress -= onCancelKey;
                        // Ensure the consumer leaves the group cleanly and final offsets are committed.
                        c.Close();
                    }
                }
            });
        });
    }
}
/// <summary>
/// Observer that reports every notification it receives on the console,
/// prefixing each line with a caller-supplied name.
/// </summary>
/// <typeparam name="T">Type of the elements in the observed sequence.</typeparam>
public class ConsoleObserver<T> : IObserver<T>
{
    private readonly string _name;

    /// <summary>Creates an observer whose output lines are prefixed with <paramref name="name"/>.</summary>
    /// <param name="name">Label printed at the start of every line; defaults to an empty string.</param>
    public ConsoleObserver(string name = "")
    {
        _name = name;
    }

    /// <summary>Prints the received element.</summary>
    /// <param name="value">Element pushed by the sequence.</param>
    public void OnNext(T value) =>
        Console.WriteLine("{0} - OnNext({1})", _name, value);

    /// <summary>Prints an error header followed by the exception on its own tab-indented line.</summary>
    /// <param name="error">Exception that terminated the sequence.</param>
    public void OnError(Exception error)
    {
        Console.WriteLine("{0} - OnError:", _name);
        Console.WriteLine("\t {0}", error);
    }

    /// <summary>Prints a completion marker.</summary>
    public void OnCompleted() =>
        Console.WriteLine("{0} - OnCompleted()", _name);
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment