Created
December 17, 2019 21:22
Trying to implement a thread safe wrapper on top of the dotnet kafka consumer to work around the issue describe in https://github.com/confluentinc/confluent-kafka-dotnet/wiki/Client-Creation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class MaybeThreadSafeConsumer<TKey, TValue> : IMaybeThreadSafeConsumer<TKey, TValue> | |
{ | |
private static readonly TimeSpan DefaultLockTimeout = TimeSpan.FromSeconds(5); | |
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource(); | |
private readonly ReaderWriterLock _lock = new ReaderWriterLock(); | |
private readonly IConsumer<TKey, TValue> _internalConsumer; | |
private readonly ILogger<MaybeThreadSafeConsumer<TKey, TValue>> _logger; | |
private volatile bool _isActive = true; | |
private bool _isDisposed; | |
public MaybeThreadSafeConsumer(ConsumerConfig config, ILogger<MaybeThreadSafeConsumer<TKey, TValue>> logger) | |
{ | |
_logger = logger; | |
Check.NotNull(config, nameof(config)); | |
var consumerBuilder = new ConsumerBuilder<TKey, TValue>(config); | |
consumerBuilder.SetPartitionsRevokedHandler((x, y) => | |
{ | |
DeactivateConsumer(); | |
Commit(); | |
}); | |
consumerBuilder.SetLogHandler((_, message) => | |
logger.Log((LogLevel)message.LevelAs(LogLevelType.MicrosoftExtensionsLogging), $"{message.Name} [{message.Facility}]: {message.Message}")); | |
_internalConsumer = consumerBuilder.Build(); | |
} | |
public bool IsActive => _isActive; | |
public void Subscribe(IEnumerable<string> topics) | |
{ | |
AssertIsActive(nameof(Subscribe)); | |
_lock.AcquireWriterLock(DefaultLockTimeout); | |
try | |
{ | |
AssertIsActive(nameof(Subscribe)); | |
_internalConsumer.Subscribe(topics); | |
} | |
finally | |
{ | |
_lock.ReleaseWriterLock(); | |
} | |
} | |
public ConsumeResult<TKey, TValue> Consume(CancellationToken cancellationToken) | |
{ | |
AssertIsActive(nameof(Consume)); | |
var linkedTokens = CancellationTokenSource.CreateLinkedTokenSource(_cancellationTokenSource.Token, cancellationToken); | |
_lock.AcquireReaderLock(DefaultLockTimeout); | |
try | |
{ | |
AssertIsActive(nameof(Consume)); | |
return _internalConsumer.Consume(linkedTokens.Token); | |
} | |
finally | |
{ | |
_lock.ReleaseReaderLock(); | |
} | |
} | |
public void StoreOffset(ConsumeResult<TKey, TValue> item) | |
{ | |
if (!_isActive) | |
{ | |
_logger.LogWarning(GetNotActiveWarnMessage(nameof(StoreOffset))); | |
return; | |
} | |
_lock.AcquireReaderLock(DefaultLockTimeout); | |
try | |
{ | |
if (!_isActive) | |
{ | |
_logger.LogWarning(GetNotActiveWarnMessage(nameof(StoreOffset))); | |
return; | |
} | |
_internalConsumer.StoreOffset(item); | |
} | |
finally | |
{ | |
_lock.ReleaseReaderLock(); | |
} | |
} | |
public List<TopicPartitionOffset> Commit() | |
{ | |
if (_isDisposed) | |
{ | |
throw new ObjectDisposedException(nameof(MaybeThreadSafeConsumer<TKey, TValue>)); | |
} | |
_lock.AcquireWriterLock(DefaultLockTimeout); | |
try | |
{ | |
if (_isDisposed) | |
{ | |
throw new ObjectDisposedException(nameof(MaybeThreadSafeConsumer<TKey, TValue>)); | |
} | |
return _internalConsumer.Commit(); | |
} | |
finally | |
{ | |
_lock.ReleaseWriterLock(); | |
} | |
} | |
public void Close() | |
{ | |
if (_isDisposed) | |
{ | |
_logger.LogWarning(GetNotActiveWarnMessage(nameof(Close))); | |
return; | |
} | |
_lock.AcquireWriterLock(DefaultLockTimeout); | |
try | |
{ | |
if (_isDisposed) | |
{ | |
_logger.LogWarning(GetNotActiveWarnMessage(nameof(Close))); | |
return; | |
} | |
_internalConsumer.Close(); | |
} | |
finally | |
{ | |
_lock.ReleaseWriterLock(); | |
} | |
} | |
public void Dispose() | |
{ | |
_isDisposed = true; | |
DeactivateConsumer(); | |
_lock.AcquireWriterLock(DefaultLockTimeout); | |
try | |
{ | |
_internalConsumer.Dispose(); | |
} | |
finally | |
{ | |
_lock.ReleaseWriterLock(); | |
_cancellationTokenSource.Dispose(); | |
} | |
} | |
private void AssertIsActive(string methodName) | |
{ | |
if (!_isActive) | |
{ | |
_logger.LogError($"{GetNotActiveWarnMessage(methodName)} An {nameof(InvalidOperationException)} will be thrown instead."); | |
throw new InvalidOperationException(GetNotActiveWarnMessage(methodName)); | |
} | |
} | |
private void DeactivateConsumer() | |
{ | |
_isActive = false; | |
_cancellationTokenSource.Cancel(); | |
} | |
private static string GetNotActiveWarnMessage(string methodName) => | |
$"The current MaybeThreadSafeConsumer is NOT active anymore. The method {methodName} won't be executed."; | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment