Skip to content

Instantly share code, notes, and snippets.

@awswithdotnet
Created March 8, 2022 20:27
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save awswithdotnet/44bb26a847f5b134fa138857c5a6b28d to your computer and use it in GitHub Desktop.
Save awswithdotnet/44bb26a847f5b134fa138857c5a6b28d to your computer and use it in GitHub Desktop.
kafka KafkaConsumer Complete
public void Consume(){
var config = new ConsumerConfig
{
BootstrapServers = _bootstrapServers,
GroupId = _groupId,
AutoOffsetReset = _autoOffsetReset
};
try{
_consumer = new ConsumerBuilder<String, String>(config).Build();
_consumer.Subscribe(_topics);
_isConsuming = true;
int i = 0;
while (_isConsuming){
i++;
Console.WriteLine(i + ": ");
ConsumeResult<String, String> consumeResult = _consumer.Consume(_cts.Token);
Console.WriteLine(consumeResult.Message.Value);
}
}
catch (OperationCanceledException ex){
Console.WriteLine("Application was ended: " + ex.Message.ToString());
}
catch(Exception ex){
Console.WriteLine("Application Crashed: " + ex.Message);
}
finally{
if (_consumer != null){
_consumer.Close();
((IDisposable)_consumer).Dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment