Skip to content

Instantly share code, notes, and snippets.

@brnls
Created September 22, 2023 03:41
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save brnls/0e06787bcc759e9529b338611e3b9ed7 to your computer and use it in GitHub Desktop.
Save brnls/0e06787bcc759e9529b338611e3b9ed7 to your computer and use it in GitHub Desktop.
Concurrent Consumer
using System.Collections.Concurrent;
using System.Threading.Channels;
using Confluent.Kafka;
using Microsoft.Extensions.Logging;
namespace Worker1;
/// <summary>
/// Callback that processes one consumed Kafka message.
/// </summary>
/// <param name="result">The consumed message to process.</param>
/// <param name="acknowledge">
/// Invoked by the handler to acknowledge <paramref name="result"/>. As wired by
/// <see cref="TopicPartitionConsumer"/>, it stores the message's offset and removes the
/// message from the partition's queue.
/// </param>
/// <param name="cancellationToken">
/// Token handed to the handler. <see cref="TopicPartitionConsumer"/> passes a token that is
/// cancelled when the token given to its <c>WaitForStop</c> method is cancelled.
/// </param>
/// <returns>A task representing the handling of the message.</returns>
public delegate Task MessageHandler(ConsumeResult<string, byte[]> result, Action acknowledge, CancellationToken cancellationToken);
/// <summary>
/// Consumes a set of Kafka topics on a single background loop and hands every message to a
/// per-partition <see cref="TopicPartitionConsumer"/>, so that different partitions are
/// processed concurrently. When a partition's queue is full the partition is paused on the
/// Kafka consumer and resumed again once its queue has drained.
/// </summary>
public class ConcurrentKafkaReceiver : IAsyncDisposable
{
    /// <summary>Capacity of each partition's in-memory queue.</summary>
    private const int PartitionQueueCapacity = 100;

    /// <summary>Timeout, in milliseconds, of a single <c>Consume</c> call.</summary>
    private const int ConsumeTimeoutMs = 100;

    /// <summary>How often paused partitions are checked for a drained queue.</summary>
    private static readonly TimeSpan UnpauseCheckInterval = TimeSpan.FromSeconds(5);

    /// <summary>Back-off applied after a <see cref="ConsumeException"/>.</summary>
    private static readonly TimeSpan ConsumeErrorBackoff = TimeSpan.FromSeconds(5);

    /// <summary>Time in-flight messages get to finish when their partition is revoked.</summary>
    private static readonly TimeSpan RevokeGracePeriod = TimeSpan.FromSeconds(10);

    private readonly IConsumer<string, byte[]> _consumer;
    private readonly IEnumerable<string> _topics;
    private readonly ILoggerFactory _loggerFactory;
    private readonly ILogger<ConcurrentKafkaReceiver> _logger;
    private readonly CancellationTokenSource _disposeCts = new();
    private readonly CancellationToken _disposeCt;

    // Written by the rebalance handlers (which run on the consume thread) and enumerated by
    // the unpause timer task on another thread, hence a concurrent dictionary.
    private readonly ConcurrentDictionary<TopicPartition, TopicPartitionConsumer> _topicPartitionConsumers = new();

    // Carries "this partition may be resumed" requests from the timer task to the consume
    // loop, so that Resume is only ever called from the thread that also calls Consume.
    private readonly Channel<TopicPartition> _unpauseChannel = Channel.CreateUnbounded<TopicPartition>();

    private MessageHandler? _messageHandler;
    private Task? _consumeTask;
    private Task? _unpauseTask;

    /// <summary>
    /// Builds the underlying Kafka consumer and registers the rebalance handlers.
    /// Nothing is consumed until <see cref="Start"/> is called.
    /// </summary>
    /// <param name="config">Configuration for the underlying Kafka consumer.</param>
    /// <param name="topics">Topics to subscribe to when started.</param>
    /// <param name="loggerFactory">Factory used to create the loggers of this class and its partition consumers.</param>
    public ConcurrentKafkaReceiver(
        ConsumerConfig config,
        IEnumerable<string> topics,
        ILoggerFactory loggerFactory)
    {
        _loggerFactory = loggerFactory;
        _logger = loggerFactory.CreateLogger<ConcurrentKafkaReceiver>();
        _disposeCt = _disposeCts.Token;
        _topics = topics;
        _consumer = new ConsumerBuilder<string, byte[]>(config)
            .SetPartitionsAssignedHandler((c, topicPartitions) =>
            {
                OnPartitionsAssigned(c, topicPartitions);
            })
            .SetPartitionsRevokedHandler((c, topicPartitions) =>
            {
                OnPartitionsRevoked(topicPartitions);
            })
            .SetOffsetsCommittedHandler((c, off) =>
            {
                foreach (var com in off.Offsets)
                {
                    _logger.LogInformation("Committing offset: {Offset}", com);
                }
            })
            .Build();
    }

    /// <summary>
    /// Subscribes to the configured topics and starts the background consume loop together
    /// with the timer that requests resumption of paused partitions.
    /// </summary>
    /// <param name="messageHandler">Handler invoked for every consumed message.</param>
    /// <exception cref="ArgumentNullException"><paramref name="messageHandler"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The receiver has already been started.</exception>
    public void Start(MessageHandler messageHandler)
    {
        ArgumentNullException.ThrowIfNull(messageHandler);
        if (_consumeTask is not null)
        {
            // A second loop would call Consume concurrently on the same consumer instance.
            throw new InvalidOperationException("The receiver has already been started.");
        }

        _messageHandler = messageHandler;
        _unpauseTask = Task.Run(RequestUnpauseLoopAsync);
        _consumeTask = Task.Run(ConsumeLoopAsync);
    }

    /// <summary>
    /// Stops the background loops, then closes and disposes the Kafka consumer.
    /// Calling it again after the first call is a no-op.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        if (_disposeCt.IsCancellationRequested)
        {
            return;
        }

        _disposeCts.Cancel();
        await AwaitBackgroundTaskAsync(_consumeTask, "Consume loop");
        await AwaitBackgroundTaskAsync(_unpauseTask, "Unpause loop");

        try
        {
            _consumer.Close();
        }
        finally
        {
            _consumer.Dispose();
            // Disposed last: partition consumers hold token sources linked to this one and
            // are still being stopped by the revoke handler while the consumer closes.
            _disposeCts.Dispose();
        }

        _logger.LogInformation("consumer closed and disposed");
    }

    /// <summary>Creates a <see cref="TopicPartitionConsumer"/> for every newly assigned partition.</summary>
    private void OnPartitionsAssigned(
        IConsumer<string, byte[]> consumer,
        IReadOnlyCollection<TopicPartition> topicPartitions)
    {
        var handler = _messageHandler
            ?? throw new InvalidOperationException("Partitions were assigned before a message handler was set.");

        foreach (var topicPartition in topicPartitions)
        {
            _logger.LogInformation("Assigned {TopicPartition}", topicPartition);
            _topicPartitionConsumers[topicPartition] = new TopicPartitionConsumer(
                topicPartition,
                Channel.CreateBounded<ConsumeResult<string, byte[]>>(
                    new BoundedChannelOptions(PartitionQueueCapacity)
                    {
                        SingleReader = true,
                        SingleWriter = true,
                        AllowSynchronousContinuations = false,
                    }),
                _disposeCt,
                _loggerFactory.CreateLogger<TopicPartitionConsumer>(),
                handler,
                consumeResult =>
                {
                    consumer.StoreOffset(consumeResult);
                    _logger.LogInformation("stored {TopicPartitionOffset}", consumeResult.TopicPartitionOffset);
                });
        }
    }

    /// <summary>
    /// Stops the consumers of the revoked partitions, giving in-flight messages
    /// <see cref="RevokeGracePeriod"/> to finish, and forgets them.
    /// </summary>
    private void OnPartitionsRevoked(IReadOnlyCollection<TopicPartitionOffset> topicPartitions)
    {
        var stopping = new List<TopicPartitionConsumer>(topicPartitions.Count);
        foreach (var topicPartition in topicPartitions)
        {
            _logger.LogInformation("Revoked {TopicPartitionOffset}", topicPartition);
            if (_topicPartitionConsumers.TryRemove(topicPartition.TopicPartition, out var partitionConsumer))
            {
                _logger.LogInformation("Queued messages {QueuedMessages}", partitionConsumer.Messages);
                stopping.Add(partitionConsumer);
            }
        }

        // Give currently in flight messages 10 seconds to stop processing before cancelling.
        // Deliberately not disposed here: its timer must still fire to cancel handlers that
        // are running when the wait below times out.
        var stopProcessingTokenSource = new CancellationTokenSource(RevokeGracePeriod);
        try
        {
            // The rebalance callback is synchronous, so blocking is the only way to hold the
            // revocation until processing has stopped.
            var stoppedProcessing = Task
                .WhenAll(stopping.Select(x => x.WaitForStop(stopProcessingTokenSource.Token)))
                .Wait(RevokeGracePeriod);
            if (!stoppedProcessing)
            {
                _logger.LogWarning("Timeout occurred while stopping one or more more topic partition consumers");
            }
        }
        catch (AggregateException ex)
        {
            // A faulted partition task must not escape the handler and end the consume loop.
            _logger.LogError(ex, "One or more topic partition consumers failed while stopping");
        }

        _logger.LogInformation("Revoke partitions completed");
    }

    /// <summary>
    /// Periodically queues an unpause request for every paused partition whose queue is empty.
    /// </summary>
    private async Task RequestUnpauseLoopAsync()
    {
        using var timer = new PeriodicTimer(UnpauseCheckInterval);
        try
        {
            while (await timer.WaitForNextTickAsync(_disposeCt))
            {
                foreach (var partitionConsumer in _topicPartitionConsumers.Values)
                {
                    if (partitionConsumer.Paused && partitionConsumer.Messages == 0)
                    {
                        await _unpauseChannel.Writer.WriteAsync(partitionConsumer.TopicPartition, _disposeCt);
                    }
                }
            }
        }
        catch (OperationCanceledException) when (_disposeCt.IsCancellationRequested)
        {
            // Expected during disposal.
        }
    }

    /// <summary>
    /// Subscribes and then consumes until disposal, dispatching each message to the consumer
    /// of its partition.
    /// </summary>
    private async Task ConsumeLoopAsync()
    {
        _logger.LogInformation("Subscribing to {Topics}", string.Join(", ", _topics));
        _consumer.Subscribe(_topics);
        while (!_disposeCt.IsCancellationRequested)
        {
            ResumeRequestedPartitions();
            try
            {
                var consumeResult = _consumer.Consume(ConsumeTimeoutMs);
                _disposeCt.ThrowIfCancellationRequested();
                if (consumeResult is not null)
                {
                    Dispatch(consumeResult);
                }
            }
            catch (ConsumeException ex)
            {
                _logger.LogError(ex, "Error while consuming");
                // Cancellable so that disposal is not held up by the back-off.
                await Task.Delay(ConsumeErrorBackoff, _disposeCt);
            }
        }
    }

    /// <summary>
    /// Resumes every partition for which an unpause request is pending, skipping requests
    /// for partitions that are no longer assigned or no longer paused.
    /// </summary>
    private void ResumeRequestedPartitions()
    {
        while (_unpauseChannel.Reader.TryRead(out var topicPartition))
        {
            if (!_topicPartitionConsumers.TryGetValue(topicPartition, out var partitionConsumer)
                || !partitionConsumer.Paused)
            {
                continue;
            }

            _logger.LogInformation("Resuming {TopicPartition}", topicPartition);
            _consumer.Resume(new[] { topicPartition });
            // Without this reset the timer would request a resume on every tick.
            partitionConsumer.Paused = false;
        }
    }

    /// <summary>
    /// Queues the message on its partition's consumer; when the queue is full, pauses the
    /// partition and seeks back to the message so it is consumed again after resuming.
    /// </summary>
    private void Dispatch(ConsumeResult<string, byte[]> consumeResult)
    {
        var partitionConsumer = _topicPartitionConsumers[consumeResult.TopicPartition];
        if (partitionConsumer.TryPostMessage(consumeResult))
        {
            return;
        }

        _logger.LogInformation("Pausing {TopicPartition}", consumeResult.TopicPartition);
        _consumer.Pause(new[] { consumeResult.TopicPartition });
        _consumer.Seek(consumeResult.TopicPartitionOffset);
        partitionConsumer.Paused = true;
    }

    /// <summary>
    /// Awaits a background task during disposal, ignoring cancellation and logging any
    /// other failure instead of swallowing it silently.
    /// </summary>
    private async Task AwaitBackgroundTaskAsync(Task? task, string name)
    {
        if (task is null)
        {
            return;
        }

        try
        {
            await task;
        }
        catch (OperationCanceledException)
        {
            // Expected during disposal.
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "{TaskName} terminated with an error", name);
        }
    }
}
/// <summary>
/// Processes the messages of a single topic partition one at a time, in the order they
/// were posted, on its own background task.
/// </summary>
public class TopicPartitionConsumer
{
    private readonly Channel<ConsumeResult<string, byte[]>> _channel;
    private readonly ILogger<TopicPartitionConsumer> _logger;
    private readonly MessageHandler _messageHandler;
    private readonly Action<ConsumeResult<string, byte[]>> _storeOffset;
    private readonly CancellationTokenSource _gracefulShutdownCts;
    private readonly CancellationTokenSource _ungracefulShutdownCts;
    private readonly Task _processTask;

    // Volatile because the flag is set and read from different threads.
    private volatile bool _paused;

    /// <summary>
    /// Creates the consumer and immediately starts its background processing loop.
    /// </summary>
    /// <param name="topicPartition">The partition this instance processes.</param>
    /// <param name="channel">Queue from which messages are taken for processing.</param>
    /// <param name="stoppingToken">Cancelling this token stops the loop once the current message has been handled.</param>
    /// <param name="logger">Logger for processing diagnostics.</param>
    /// <param name="messageHandler">Handler invoked for each message.</param>
    /// <param name="storeOffset">Invoked with a message when the handler acknowledges it.</param>
    public TopicPartitionConsumer(
        TopicPartition topicPartition,
        Channel<ConsumeResult<string, byte[]>> channel,
        CancellationToken stoppingToken,
        ILogger<TopicPartitionConsumer> logger,
        MessageHandler messageHandler,
        Action<ConsumeResult<string, byte[]>> storeOffset)
    {
        TopicPartition = topicPartition;
        _channel = channel;
        _logger = logger;
        _messageHandler = messageHandler;
        _storeOffset = storeOffset;

        // Both token sources must exist before the loop starts, because the loop reads them
        // as soon as it runs.
        _gracefulShutdownCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
        _ungracefulShutdownCts = new CancellationTokenSource();
        _processTask = Task.Run(ProcessPartition);
    }

    /// <summary>The partition this instance processes.</summary>
    public TopicPartition TopicPartition { get; }

    /// <summary>Number of messages currently queued, including the one being handled.</summary>
    public int Messages => _channel.Reader.Count;

    /// <summary>Flag maintained by the owner to record that the partition is paused.</summary>
    public bool Paused
    {
        get => _paused;
        set => _paused = value;
    }

    /// <summary>Offset of the most recently acknowledged message, or null if none has been acknowledged.</summary>
    public TopicPartitionOffset? LatestOffset { get; private set; }

    /// <summary>
    /// Tries to queue a message for processing without waiting.
    /// </summary>
    /// <param name="result">The message to queue.</param>
    /// <returns>True if the message was queued; false if the channel did not accept it.</returns>
    public bool TryPostMessage(ConsumeResult<string, byte[]> result)
    {
        return _channel.Writer.TryWrite(result);
    }

    /// <summary>
    /// Stop processing the partition, waiting for the currently processing message to complete.
    /// </summary>
    /// <param name="token">
    /// When cancelled, the token passed to the message handler is cancelled as well, asking
    /// the in-flight handler to abandon its work.
    /// </param>
    public async Task WaitForStop(CancellationToken token)
    {
        // Disposed on exit so the callback does not stay attached to the caller's token.
        using var registration = token.Register(() => _ungracefulShutdownCts.Cancel());
        _gracefulShutdownCts.Cancel();
        try
        {
            await _processTask;
        }
        catch (OperationCanceledException e) when (e.CancellationToken == _gracefulShutdownCts.Token)
        {
            // This is how the processing loop ends after a graceful stop.
        }
    }

    /// <summary>
    /// Processing loop: peeks the message at the head of the queue and passes it to the
    /// handler. The message leaves the queue only when the handler acknowledges it, so a
    /// message that is not acknowledged (including when the handler throws) is handed to
    /// the handler again on the next iteration.
    /// </summary>
    private async Task ProcessPartition()
    {
        // WaitToReadAsync returns false once the channel is completed; leave the loop then
        // instead of spinning. Cancelling the graceful token makes it throw, which also
        // ends the loop.
        while (await _channel.Reader.WaitToReadAsync(_gracefulShutdownCts.Token))
        {
            if (!_channel.Reader.TryPeek(out var consumeResult))
            {
                continue;
            }

            _logger.LogInformation(
                "{TopicPartition} - Handling message offset {Offset}",
                TopicPartition,
                consumeResult.Offset);
            try
            {
                await _messageHandler(
                    consumeResult,
                    () => Acknowledge(consumeResult),
                    _ungracefulShutdownCts.Token);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "{TopicPartition} Uncaught exception while handling message.", TopicPartition);
            }
        }
    }

    /// <summary>
    /// Records the message as processed: stores its offset and removes it from the queue.
    /// Does nothing when the message is no longer at the head of the queue, so a repeated
    /// or late acknowledgement cannot remove a message that has not been handled.
    /// </summary>
    private void Acknowledge(ConsumeResult<string, byte[]> consumeResult)
    {
        if (!_channel.Reader.TryPeek(out var head) || !ReferenceEquals(head, consumeResult))
        {
            return;
        }

        LatestOffset = consumeResult.TopicPartitionOffset;
        _storeOffset(consumeResult);
        _channel.Reader.TryRead(out _);
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment