Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/// <summary>
/// Hosted service that publishes a fixed batch of demo messages to a Kafka topic
/// when the host starts, and releases the underlying producer when the host stops
/// or the service is disposed.
/// </summary>
public class KafkaProducerHostedService : IHostedService, IDisposable
{
    // Broker address, topic and batch size used by this demo service.
    private const string DefaultBootstrapServers = "localhost:9092";
    private const string TopicName = "demo";
    private const int MessageCount = 100;

    // Upper bound on how long StartAsync waits for any still-queued messages.
    private static readonly TimeSpan FlushTimeout = TimeSpan.FromSeconds(10);

    private readonly ILogger<KafkaProducerHostedService> _logger;
    private readonly IProducer<Null, string> _producer;

    // 0 = live, 1 = disposed. Makes StopAsync + Dispose (in any order) release the producer once.
    private int _disposed;

    /// <summary>
    /// Creates the service and builds a producer pointed at the local broker.
    /// </summary>
    /// <param name="logger">Logger used to record each message before it is sent.</param>
    /// <exception cref="ArgumentNullException"><paramref name="logger"/> is <c>null</c>.</exception>
    public KafkaProducerHostedService(ILogger<KafkaProducerHostedService> logger)
    {
        _logger = logger ?? throw new ArgumentNullException(nameof(logger));

        var config = new ProducerConfig
        {
            BootstrapServers = DefaultBootstrapServers
        };
        _producer = new ProducerBuilder<Null, string>(config).Build();
    }

    /// <summary>
    /// Sends <c>Hello World 0</c> … <c>Hello World 99</c> to the <c>demo</c> topic, one at a time,
    /// then flushes the producer.
    /// </summary>
    /// <param name="cancellationToken">Token forwarded to every produce call.</param>
    public async Task StartAsync(CancellationToken cancellationToken)
    {
        for (var i = 0; i < MessageCount; ++i)
        {
            var value = $"Hello World {i}";

            // Constant template + argument: same rendered text, but braces inside the
            // value can never be misread as format placeholders (CA2254).
            _logger.LogInformation("{Message}", value);

            await _producer.ProduceAsync(
                TopicName,
                new Message<Null, string> { Value = value },
                cancellationToken);
        }

        // Every ProduceAsync above was awaited, so this is a bounded safety net
        // rather than the primary delivery mechanism.
        _producer.Flush(FlushTimeout);
    }

    /// <summary>
    /// Releases the producer when the host shuts down.
    /// </summary>
    /// <param name="cancellationToken">Unused; disposal is synchronous.</param>
    /// <returns>A completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        Dispose();
        return Task.CompletedTask;
    }

    /// <summary>
    /// Releases the producer. Safe to call more than once and safe to call after
    /// <see cref="StopAsync"/>; this also covers the case where the host never
    /// reaches <see cref="StopAsync"/>.
    /// </summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>
    /// Core disposal routine; releases the producer the first time it is invoked.
    /// </summary>
    /// <param name="disposing"><c>true</c> when called from <see cref="Dispose()"/>.</param>
    protected virtual void Dispose(bool disposing)
    {
        // Only the first caller proceeds; later calls are no-ops.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        if (disposing)
        {
            _producer.Dispose();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.