Skip to content

Instantly share code, notes, and snippets.

@mhowlett
Last active December 7, 2016 21:28
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhowlett/1a492354bb3d7338a4eb18d7f25d3677 to your computer and use it in GitHub Desktop.
Save mhowlett/1a492354bb3d7338a4eb18d7f25d3677 to your computer and use it in GitHub Desktop.
explicitly setting partition offset with rdkafka-dotnet
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using RdKafka;
namespace AdvancedConsumer
{
/// <summary>
/// Example consumer that subscribes to a set of topics and, whenever
/// partitions are assigned, explicitly positions each assigned partition
/// at a fixed starting offset instead of the committed one.
/// </summary>
public class Program
{
    /// <summary>
    /// Offset every newly assigned partition is positioned at before
    /// consumption starts.
    /// </summary>
    private const long StartOffset = 2;

    /// <summary>
    /// Creates an <c>EventConsumer</c>, wires up all of its event handlers,
    /// subscribes to <paramref name="topics"/> and consumes until the user
    /// presses enter.
    /// </summary>
    /// <param name="brokerList">Broker list passed to the consumer constructor.</param>
    /// <param name="topics">Topics to subscribe to.</param>
    public static void Run(string brokerList, List<string> topics)
    {
        // Auto-commit is disabled, so offsets are committed manually in OnMessage.
        bool enableAutoCommit = false;

        var config = new Config()
        {
            GroupId = "advanced-csharp-consumer",
            EnableAutoCommit = enableAutoCommit,
            StatisticsInterval = TimeSpan.FromSeconds(60)
        };

        using (var consumer = new EventConsumer(config, brokerList))
        {
            consumer.OnMessage += (obj, msg) =>
            {
                string text = Encoding.UTF8.GetString(msg.Payload, 0, msg.Payload.Length);
                Console.WriteLine($"Topic: {msg.Topic} Partition: {msg.Partition} Offset: {msg.Offset} {text}");

                // Manual commit on every 10th offset. Wait() blocks this handler
                // until the commit task has completed.
                if (!enableAutoCommit && msg.Offset % 10 == 0)
                {
                    Console.WriteLine("Committing offset");
                    consumer.Commit(msg).Wait();
                    Console.WriteLine("Committed offset");
                }
            };

            consumer.OnConsumerError += (obj, errorCode) =>
            {
                Console.WriteLine($"Consumer Error: {errorCode}");
            };

            consumer.OnEndReached += (obj, end) =>
            {
                Console.WriteLine($"Reached end of topic {end.Topic} partition {end.Partition}, next message will be at offset {end.Offset}");
            };

            consumer.OnError += (obj, error) =>
            {
                Console.WriteLine($"Error: {error.ErrorCode} {error.Reason}");
            };

            if (enableAutoCommit)
            {
                consumer.OnOffsetCommit += (obj, commit) =>
                {
                    // Report either the failure or the success, never both.
                    if (commit.Error != ErrorCode.NO_ERROR)
                    {
                        Console.WriteLine($"Failed to commit offsets: {commit.Error}");
                    }
                    else
                    {
                        Console.WriteLine($"Successfully committed offsets: [{string.Join(", ", commit.Offsets)}]");
                    }
                };
            }

            consumer.OnPartitionsAssigned += (obj, partitions) =>
            {
                Console.WriteLine($"Assigned partitions: [{string.Join(", ", partitions)}], member id: {consumer.MemberId}");

                // Override the start position: assign every partition at StartOffset.
                consumer.Assign(partitions.Select(p => new TopicPartitionOffset(p.Topic, p.Partition, StartOffset)).ToList());
            };

            consumer.OnPartitionsRevoked += (obj, partitions) =>
            {
                Console.WriteLine($"Revoked partitions: [{string.Join(", ", partitions)}]");
                consumer.Unassign();
            };

            consumer.OnStatistics += (obj, json) =>
            {
                Console.WriteLine($"Statistics: {json}");
            };

            consumer.Subscribe(topics);
            consumer.Start();

            Console.WriteLine($"Assigned to: [{string.Join(", ", consumer.Assignment)}]");
            Console.WriteLine($"Subscribed to: [{string.Join(", ", consumer.Subscription)}]");

            Console.WriteLine("Started consumer, press enter to stop consuming");
            Console.ReadLine();
        }
    }

    /// <summary>
    /// Entry point. The first argument is the broker list; the remaining
    /// arguments are the topics to subscribe to.
    /// </summary>
    /// <param name="args">Command-line arguments: broker list followed by one or more topics.</param>
    public static void Main(string[] args)
    {
        // Without this guard, args[0] throws IndexOutOfRangeException when no
        // arguments are supplied.
        if (args == null || args.Length < 2)
        {
            Console.WriteLine("Usage: AdvancedConsumer <broker-list> <topic> [<topic> ...]");
            return;
        }

        Run(args[0], args.Skip(1).ToList());
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment