Skip to content

Instantly share code, notes, and snippets.

@mhowlett
Created July 25, 2018 23:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mhowlett/ab04c78ad9bac3d31c9c9d02758df6fa to your computer and use it in GitHub Desktop.
Save mhowlett/ab04c78ad9bac3d31c9c9d02758df6fa to your computer and use it in GitHub Desktop.
high throughput produce
/// <summary>
/// Example of high-throughput producing: messages are handed to the producer
/// with <c>BeginProduce</c> (no waiting on each delivery), and the call is
/// retried after a short pause whenever the producer's local queue is full.
/// </summary>
class Program
{
    /// <summary>
    /// Produces the string values "1".."bigNumber" to the "hello" topic.
    /// </summary>
    /// <param name="args">Command-line arguments (unused).</param>
    static void Main(string[] args)
    {
        var config = new Dictionary<string, object>
        {
            { "bootstrap.servers", "10.200.7.144:9092" }
        };

        var bigNumber = 100;

        // Incremented once per delivery report. Kept exactly as in the original
        // snippet (initial value 1, never read afterwards).
        var count = 1;

        // Original author's note: no interlock required.
        // NOTE(review): presumably because delivery handlers are invoked serially
        // on a single thread by the client - confirm against the client docs.
        Action<DeliveryReport<Null, string>> handler = (DeliveryReport<Null, string> dr) => count += 1;

        using (var producer = new Producer<Null, string>(config, null, new StringSerializer(Encoding.UTF8)))
        {
            var stream = Enumerable.Range(1, bigNumber).Select(i => i.ToString());

            foreach (var msg in stream)
            {
                while (true) // retry-on-queue-full loop
                {
                    try
                    {
                        producer.BeginProduce("hello", new Message<Null, string> { Value = msg }, handler);
                        break;
                    }
                    catch (KafkaException e)
                    {
                        if (e.Error.Code == ErrorCode.Local_QueueFull)
                        {
                            // Local queue is full: give the producer time to drain
                            // it, then try the same message again.
                            Thread.Sleep(100);
                            continue;
                        }

                        // Non-recoverable error occurred. Rethrow instead of falling
                        // through: without this the loop would retry the same
                        // message forever and hide the failure.
                        throw;
                    }
                }
            }

            // Wait (bounded) for queued messages to be delivered before the
            // producer is disposed; otherwise in-flight messages may be dropped
            // and their delivery reports never raised.
            producer.Flush(TimeSpan.FromSeconds(10));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment