Skip to content

Instantly share code, notes, and snippets.

@pgrm
Created December 17, 2019 21:22
Show Gist options
  • Save pgrm/2a078fee2522cd693aa3d844686ed379 to your computer and use it in GitHub Desktop.
Save pgrm/2a078fee2522cd693aa3d844686ed379 to your computer and use it in GitHub Desktop.
Trying to implement a thread safe wrapper on top of the dotnet kafka consumer to work around the issue describe in https://github.com/confluentinc/confluent-kafka-dotnet/wiki/Client-Creation
public class MaybeThreadSafeConsumer<TKey, TValue> : IMaybeThreadSafeConsumer<TKey, TValue>
{
private static readonly TimeSpan DefaultLockTimeout = TimeSpan.FromSeconds(5);
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly ReaderWriterLock _lock = new ReaderWriterLock();
private readonly IConsumer<TKey, TValue> _internalConsumer;
private readonly ILogger<MaybeThreadSafeConsumer<TKey, TValue>> _logger;
private volatile bool _isActive = true;
private bool _isDisposed;
public MaybeThreadSafeConsumer(ConsumerConfig config, ILogger<MaybeThreadSafeConsumer<TKey, TValue>> logger)
{
_logger = logger;
Check.NotNull(config, nameof(config));
var consumerBuilder = new ConsumerBuilder<TKey, TValue>(config);
consumerBuilder.SetPartitionsRevokedHandler((x, y) =>
{
DeactivateConsumer();
Commit();
});
consumerBuilder.SetLogHandler((_, message) =>
logger.Log((LogLevel)message.LevelAs(LogLevelType.MicrosoftExtensionsLogging), $"{message.Name} [{message.Facility}]: {message.Message}"));
_internalConsumer = consumerBuilder.Build();
}
public bool IsActive => _isActive;
public void Subscribe(IEnumerable<string> topics)
{
AssertIsActive(nameof(Subscribe));
_lock.AcquireWriterLock(DefaultLockTimeout);
try
{
AssertIsActive(nameof(Subscribe));
_internalConsumer.Subscribe(topics);
}
finally
{
_lock.ReleaseWriterLock();
}
}
public ConsumeResult<TKey, TValue> Consume(CancellationToken cancellationToken)
{
AssertIsActive(nameof(Consume));
var linkedTokens = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.Token, cancellationToken);
_lock.AcquireReaderLock(DefaultLockTimeout);
try
{
AssertIsActive(nameof(Consume));
return _internalConsumer.Consume(linkedTokens.Token);
}
finally
{
_lock.ReleaseReaderLock();
}
}
public void StoreOffset(ConsumeResult<TKey, TValue> item)
{
if (!_isActive)
{
_logger.LogWarning(GetNotActiveWarnMessage(nameof(StoreOffset)));
return;
}
_lock.AcquireReaderLock(DefaultLockTimeout);
try
{
if (!_isActive)
{
_logger.LogWarning(GetNotActiveWarnMessage(nameof(StoreOffset)));
return;
}
_internalConsumer.StoreOffset(item);
}
finally
{
_lock.ReleaseReaderLock();
}
}
public List<TopicPartitionOffset> Commit()
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(MaybeThreadSafeConsumer<TKey, TValue>));
}
_lock.AcquireWriterLock(DefaultLockTimeout);
try
{
if (_isDisposed)
{
throw new ObjectDisposedException(nameof(MaybeThreadSafeConsumer<TKey, TValue>));
}
return _internalConsumer.Commit();
}
finally
{
_lock.ReleaseWriterLock();
}
}
public void Close()
{
if (_isDisposed)
{
_logger.LogWarning(GetNotActiveWarnMessage(nameof(Close)));
return;
}
_lock.AcquireWriterLock(DefaultLockTimeout);
try
{
if (_isDisposed)
{
_logger.LogWarning(GetNotActiveWarnMessage(nameof(Close)));
return;
}
_internalConsumer.Close();
}
finally
{
_lock.ReleaseWriterLock();
}
}
public void Dispose()
{
_isDisposed = true;
DeactivateConsumer();
_lock.AcquireWriterLock(DefaultLockTimeout);
try
{
_internalConsumer.Dispose();
}
finally
{
_lock.ReleaseWriterLock();
_cancellationTokenSource.Dispose();
}
}
private void AssertIsActive(string methodName)
{
if (!_isActive)
{
_logger.LogError($"{GetNotActiveWarnMessage(methodName)} An {nameof(InvalidOperationException)} will be thrown instead.");
throw new InvalidOperationException(GetNotActiveWarnMessage(methodName));
}
}
private void DeactivateConsumer()
{
_isActive = false;
_cancellationTokenSource.Cancel();
}
private static string GetNotActiveWarnMessage(string methodName) =>
$"The current MaybeThreadSafeConsumer is NOT active anymore. The method {methodName} won't be executed.";
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment