Skip to content

Instantly share code, notes, and snippets.

@mhowlett
Created March 21, 2018 17:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhowlett/2cf6adeeb43f222a1d90e1e486dd00a1 to your computer and use it in GitHub Desktop.
Save mhowlett/2cf6adeeb43f222a1d90e1e486dd00a1 to your computer and use it in GitHub Desktop.
using System;
using System.Text;
using System.Collections.Generic;
using Confluent.Kafka;
using Confluent.Kafka.Serialization;
namespace Issue470
{
class Program
{
    // Defaults used by Main when no command-line arguments are supplied.
    private const string DefaultBrokerList = "10.200.7.144:9092";
    private const string DefaultTopic = "mytttt";

    /// <summary>
    /// Subscribes to <paramref name="topics"/> with auto-commit disabled and polls
    /// until Ctrl-C is pressed, logging every consumer event to the console.
    /// </summary>
    /// <param name="brokerList">Value passed as the <c>bootstrap.servers</c> setting.</param>
    /// <param name="topics">Topics to subscribe to.</param>
    public static void Run_PollWithManualCommit(string brokerList, List<string> topics)
    {
        using (var consumer = new Consumer<Ignore, string>(ConstructConfig(brokerList, false), null, new StringDeserializer(Encoding.UTF8)))
        {
            // Note: All event handlers are called on the main thread.
            consumer.OnMessage += (_, msg) =>
            {
                Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {msg.Value}");
                // Manual commit is intentionally left disabled here; re-enable to commit per message.
                //Console.WriteLine($"Committing offset");
                //var committedOffsets = consumer.CommitAsync(msg).Result;
                //Console.WriteLine($"Committed offset: [{string.Join(", ", committedOffsets.Offsets)}]");
            };

            consumer.OnPartitionEOF += (_, end)
                => Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");

            // Raised on critical errors, e.g. connection failures or all brokers down.
            consumer.OnError += (_, error)
                => Console.WriteLine($"Error: {error}");

            // Raised on deserialization errors or when a consumed message has an error != NoError.
            consumer.OnConsumeError += (_, msg)
                => Console.WriteLine($"Error consuming from topic/partition/offset {msg.Topic}/{msg.Partition}/{msg.Offset}: {msg.Error}");

            // Author's observation: this is NOT called when auto-commit is disabled.
            consumer.OnOffsetsCommitted += (_, commit) =>
            {
                Console.WriteLine($"[{string.Join(", ", commit.Offsets)}]");
                if (commit.Error)
                {
                    Console.WriteLine($"Failed to commit offsets: {commit.Error}");
                    // Do not fall through: a failed commit must not also be reported as a success.
                    return;
                }

                Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
            };

            consumer.OnPartitionsAssigned += (_, partitions) =>
            {
                Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");
                consumer.Assign(partitions);
            };

            consumer.OnPartitionsRevoked += (_, partitions) =>
            {
                Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
                consumer.Unassign();
            };

            //consumer.OnStatistics += (_, json)
            // => Console.WriteLine($"Statistics: {json}");

            // The subscribe() method controls which topics will be fetched in poll.
            consumer.Subscribe(topics);
            Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");

            var cancelled = false;
            ConsoleCancelEventHandler onCancel = (_, e) =>
            {
                e.Cancel = true; // prevent the process from terminating.
                cancelled = true;
            };

            Console.CancelKeyPress += onCancel;
            try
            {
                Console.WriteLine("Ctrl-C to exit.");
                while (!cancelled)
                {
                    consumer.Poll(TimeSpan.FromMilliseconds(100));
                }
            }
            finally
            {
                // Console.CancelKeyPress is a static event: detach so that Ctrl-C is not
                // swallowed (and the closure kept alive) after this method returns.
                Console.CancelKeyPress -= onCancel;
            }
        }
    }

    /// <summary>
    /// Builds the consumer configuration dictionary.
    /// </summary>
    /// <param name="brokerList">Value for <c>bootstrap.servers</c>.</param>
    /// <param name="enableAutoCommit">Value for <c>enable.auto.commit</c>.</param>
    /// <returns>A new configuration dictionary, including a nested <c>default.topic.config</c>.</returns>
    private static Dictionary<string, object> ConstructConfig(string brokerList, bool enableAutoCommit) =>
        new Dictionary<string, object>
        {
            { "group.id", "advanced-csharp-consumer" },
            { "enable.auto.commit", enableAutoCommit },
            { "auto.commit.interval.ms", 5000 },
            { "statistics.interval.ms", 60000 },
            { "bootstrap.servers", brokerList },
            { "default.topic.config", new Dictionary<string, object>()
                {
                    { "auto.offset.reset", "smallest" }
                }
            }
        };

    /// <summary>
    /// Entry point. Optional arguments: <c>args[0]</c> is the broker list and any further
    /// arguments are topic names; omitted values fall back to the built-in defaults.
    /// </summary>
    static void Main(string[] args)
    {
        var brokerList = args.Length > 0 ? args[0] : DefaultBrokerList;
        var topics = args.Length > 1
            ? new List<string>(args).GetRange(1, args.Length - 1)
            : new List<string> { DefaultTopic };

        Run_PollWithManualCommit(brokerList, topics);
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment