Skip to content

Instantly share code, notes, and snippets.

@ram-pi
Created January 12, 2024 18:46
Show Gist options
  • Save ram-pi/a6ee00f547c2cf8e04a9795ee530dfe8 to your computer and use it in GitHub Desktop.
Save ram-pi/a6ee00f547c2cf8e04a9795ee530dfe8 to your computer and use it in GitHub Desktop.
dotnet txn-producer
using System;
using Confluent.Kafka;
class Program
{
public static void Main(string[] args)
{
var conf = new ProducerConfig {
BootstrapServers = "localhost:9092",
SecurityProtocol = SecurityProtocol.SaslPlaintext,
SaslMechanism = SaslMechanism.ScramSha256,
SaslUsername = "admin",
SaslPassword = "admin-secret",
TransactionalId = "myproducer"
};
Action<DeliveryReport<Null, string>> handler = r =>
Console.WriteLine(!r.Error.IsError
? $"Delivered message to {r.TopicPartitionOffset}"
: $"Delivery Error: {r.Error.Reason}");
using (var p = new ProducerBuilder<Null, string>(conf).Build())
{
p.InitTransactions(TimeSpan.FromMinutes(10));
p.BeginTransaction();
for (int i=0; i<10; ++i)
{
p.Produce("test", new Message<Null, string> { Value = i.ToString() }, handler);
}
p.CommitTransaction(TimeSpan.FromSeconds(10));
// wait for up to 10 seconds for any inflight messages to be delivered.
p.Flush(TimeSpan.FromSeconds(10));
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment