Skip to content

Instantly share code, notes, and snippets.

@dcomartin

dcomartin/consumer.cs

Last active Aug 6, 2020
Embed
What would you like to do?
/// <summary>
/// Hosted service that owns a <see cref="ClusterClient"/>, consumes the
/// <c>demo</c> topic and logs every received record value as UTF-8 text.
/// </summary>
public class KafkaConsumerHostedService : IHostedService
{
    private readonly ILogger<KafkaConsumerHostedService> _logger;
    private readonly ClusterClient _cluster;

    /// <summary>
    /// Creates the service and its cluster client, seeded with <c>localhost:9092</c>.
    /// </summary>
    /// <param name="logger">Logger used to report received records.</param>
    public KafkaConsumerHostedService(ILogger<KafkaConsumerHostedService> logger)
    {
        _logger = logger;
        _cluster = new ClusterClient(new Configuration
        {
            Seeds = "localhost:9092"
        }, new ConsoleLogger());
    }

    /// <summary>
    /// Attaches the record handler and then starts consuming the <c>demo</c> topic.
    /// </summary>
    /// <param name="cancellationToken">Not used; start-up completes synchronously.</param>
    /// <returns>An already-completed task.</returns>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        // Attach the handler BEFORE starting consumption so that a record delivered
        // immediately after the consume call cannot be raised with no subscriber.
        _cluster.MessageReceived += record =>
        {
            // The value is not statically typed as byte[]; a failed cast would hand
            // null to Encoding.GetString, which throws ArgumentNullException.
            if (record.Value is byte[] payload)
            {
                // Message template instead of an interpolated string: same rendered
                // text, but the payload is kept as a structured log property.
                _logger.LogInformation("Received: {Message}", Encoding.UTF8.GetString(payload));
            }
            else
            {
                _logger.LogWarning("Received a record whose value is not a byte array; skipping.");
            }
        };
        _cluster.ConsumeFromLatest("demo");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Disposes the cluster client owned by this service.
    /// </summary>
    /// <param name="cancellationToken">Not used; shutdown completes synchronously.</param>
    /// <returns>An already-completed task.</returns>
    public Task StopAsync(CancellationToken cancellationToken)
    {
        // _cluster is readonly and always assigned in the constructor, so no null check is needed.
        _cluster.Dispose();
        return Task.CompletedTask;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.