Last active
September 4, 2021 12:13
-
-
Save ledjon-behluli/223ef47f29a1f4c1b739083fab3637aa to your computer and use it in GitHub Desktop.
Asynchronous implementation of an OutboxProcessor with error recovery.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class OutboxProcessor : IOutboxProcessor, IHostedService, IDisposable | |
{ | |
private readonly OutboxProcessorSettings settings; | |
private readonly ILogger<OutboxProcessor> logger; | |
private readonly IBusPublisher busPublisher; | |
private AsyncTimer timer; | |
public OutboxProcessor( | |
IBusPublisher busPublisher, | |
ILogger<OutboxProcessor> logger, | |
IOptions<OutboxProcessorSettings> options) | |
{ | |
this.busPublisher = busPublisher ?? throw new ArgumentNullException(nameof(busPublisher)); | |
this.logger = logger ?? throw new ArgumentNullException(nameof(logger)); | |
this.settings = (options ?? throw new ArgumentNullException(nameof(options))).Value; | |
} | |
public Task StartAsync(CancellationToken cancellationToken) | |
{ | |
timer = new AsyncTimer( | |
handler: ProcessMessagesAsync, | |
dueTime: TimeSpan.Zero, | |
period: settings.ClockPeriod); | |
timer.OnException += OnExceptionCallback; | |
timer.Start(); | |
logger.LogInformation("Processor started."); | |
return Task.CompletedTask; | |
} | |
public async Task StopAsync(CancellationToken cancellationToken) | |
{ | |
timer.Update(Timeout.InfiniteTimeSpan, TimeSpan.Zero, settings.ExceptionRecoveryCount); | |
timer.OnException -= OnExceptionCallback; | |
logger.LogWarning("Processor stopped."); | |
await timer.StopAsync(cancellationToken); | |
} | |
public async Task ProcessMessagesAsync() | |
{ | |
await PublishInOrder(); // If order of messages is important, publish them one-by-one. | |
await PublishInParallel(); // If order of messages is not important, publish them in parallel. | |
} | |
private async Task PublishInParallel() | |
{ | |
object[] messages = new object[] { }; // Get from some store. | |
var tasks = new List<Task>(); | |
foreach (object message in messages) | |
{ | |
tasks.Add(busPublisher.PublishAsync(message)); | |
} | |
await Task.WhenAll(tasks); | |
logger.LogInformation("Published {count} messages.", messages.Length); | |
} | |
private async Task PublishInOrder() | |
{ | |
object[] messages = new object[] { }; // Get from some store. | |
foreach (object message in messages) | |
{ | |
await busPublisher.PublishAsync(message); | |
logger.LogInformation("Published message."); | |
} | |
} | |
private void OnExceptionCallback(object sender, AsyncTimer.ExceptionArgs e) | |
{ | |
logger.LogWarning("Processor has recovered from {exception}. {left}/{total} errors until processor shuts down!", | |
e.Exception, settings.ExceptionRecoveryCount - e.Count, settings.ExceptionRecoveryCount); | |
} | |
public void Dispose() | |
{ | |
timer?.Dispose(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class AsyncTimer : IDisposable | |
{ | |
private readonly Func<Task> handler; | |
private readonly SemaphoreSlim semaphore; | |
public event EventHandler<ExceptionArgs> OnException; | |
private TimeSpan dueTime; | |
private TimeSpan period; | |
private Task executing; | |
private bool isRunning; | |
private bool disposed; | |
private int exceptionCount; | |
private int exceptionLimit; | |
public AsyncTimer(Func<Task> handler, TimeSpan dueTime, TimeSpan period) | |
{ | |
this.handler = handler; | |
this.dueTime = dueTime; | |
this.period = period; | |
this.exceptionCount = 0; | |
this.semaphore = new SemaphoreSlim(1); | |
} | |
public void Update(TimeSpan dueTime, TimeSpan period, int exceptionLimit) | |
{ | |
this.dueTime = dueTime; | |
this.period = period; | |
this.exceptionLimit = exceptionLimit; | |
} | |
public void Start() | |
{ | |
if (disposed) | |
throw new ObjectDisposedException(GetType().FullName); | |
semaphore.Wait(); | |
try | |
{ | |
if (isRunning) | |
return; | |
executing = ExecuteHandler(); | |
isRunning = true; | |
} | |
finally | |
{ | |
semaphore.Release(); | |
} | |
} | |
public async Task StopAsync(CancellationToken cancellationToken) | |
{ | |
if (disposed) | |
throw new ObjectDisposedException(GetType().FullName); | |
await semaphore.WaitAsync(cancellationToken); | |
try | |
{ | |
if (!isRunning) | |
return; | |
await executing; | |
} | |
finally | |
{ | |
isRunning = false; | |
semaphore.Release(); | |
} | |
} | |
private async Task ExecuteHandler() | |
{ | |
await Task.Delay(dueTime); | |
while (true) | |
{ | |
try | |
{ | |
await handler(); | |
await Task.Delay(period); | |
} | |
catch (Exception e) | |
{ | |
exceptionCount++; | |
OnException?.Invoke(this, new ExceptionArgs(e, exceptionCount)); | |
} | |
if (exceptionCount > exceptionLimit) | |
{ | |
isRunning = false; | |
break; | |
} | |
} | |
} | |
private void Dispose(bool disposing) | |
{ | |
if (disposed) | |
return; | |
if (disposing) | |
semaphore?.Dispose(); | |
disposed = true; | |
} | |
public void Dispose() | |
{ | |
Dispose(true); | |
GC.SuppressFinalize(this); | |
} | |
public class ExceptionArgs : EventArgs | |
{ | |
public Exception Exception { get; } | |
public int Count { get; } | |
public ExceptionArgs(Exception exception, int count) | |
{ | |
Exception = exception; | |
Count = count; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface IBusPublisher | |
{ | |
Task PublishAsync(object message); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public interface IOutboxProcessor | |
{ | |
Task ProcessMessagesAsync(); | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
public class OutboxProcessorSettings | |
{ | |
public bool Enabled { get; set; } | |
public TimeSpan ClockPeriod { get; set; } | |
public int ExceptionRecoveryCount { get; set; } | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment