Skip to content

Instantly share code, notes, and snippets.

@dcomartin
Last active August 6, 2020 21:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dcomartin/801244efecf324740109e0fe9c2b7ee2 to your computer and use it in GitHub Desktop.
Save dcomartin/801244efecf324740109e0fe9c2b7ee2 to your computer and use it in GitHub Desktop.
public class KafkaConsumerHostedService : IHostedService
{
private readonly ILogger<KafkaConsumerHostedService> _logger;
private readonly ClusterClient _cluster;
public KafkaConsumerHostedService(ILogger<KafkaConsumerHostedService> logger)
{
_logger = logger;
_cluster = new ClusterClient(new Configuration
{
Seeds = "localhost:9092"
}, new ConsoleLogger());
}
public Task StartAsync(CancellationToken cancellationToken)
{
_cluster.ConsumeFromLatest("demo");
_cluster.MessageReceived += record =>
{
_logger.LogInformation($"Received: {Encoding.UTF8.GetString(record.Value as byte[])}");
};
return Task.CompletedTask;
}
public Task StopAsync(CancellationToken cancellationToken)
{
_cluster?.Dispose();
return Task.CompletedTask;
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment