Skip to content

Instantly share code, notes, and snippets.

@ledjon-behluli
Last active September 4, 2021 12:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ledjon-behluli/223ef47f29a1f4c1b739083fab3637aa to your computer and use it in GitHub Desktop.
Save ledjon-behluli/223ef47f29a1f4c1b739083fab3637aa to your computer and use it in GitHub Desktop.
Asynchronous implementation of an OutboxProcessor with error recovery.
/// <summary>
/// Hosted service that periodically publishes outbox messages through an
/// <see cref="IBusPublisher"/>, driven by an <see cref="AsyncTimer"/>.
/// Handler failures are reported through <see cref="OnExceptionCallback"/>;
/// the timer is configured to tolerate up to
/// <see cref="OutboxProcessorSettings.ExceptionRecoveryCount"/> of them.
/// </summary>
public class OutboxProcessor : IOutboxProcessor, IHostedService, IDisposable
{
    private readonly OutboxProcessorSettings settings;
    private readonly ILogger<OutboxProcessor> logger;
    private readonly IBusPublisher busPublisher;
    private AsyncTimer timer;

    /// <summary>Creates the processor.</summary>
    /// <param name="busPublisher">Publisher used for every outgoing message.</param>
    /// <param name="logger">Logger for lifecycle and recovery messages.</param>
    /// <param name="options">Options wrapper holding the processor settings.</param>
    /// <exception cref="ArgumentNullException">Any argument is <c>null</c>.</exception>
    /// <exception cref="ArgumentException"><paramref name="options"/> wraps a <c>null</c> value.</exception>
    public OutboxProcessor(
        IBusPublisher busPublisher,
        ILogger<OutboxProcessor> logger,
        IOptions<OutboxProcessorSettings> options)
    {
        this.busPublisher = busPublisher ?? throw new ArgumentNullException(nameof(busPublisher));
        this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
        this.settings = (options ?? throw new ArgumentNullException(nameof(options))).Value
            ?? throw new ArgumentException("Options value must not be null.", nameof(options));
    }

    /// <summary>
    /// Creates the timer, applies the configured exception limit and starts it
    /// with no initial delay.
    /// </summary>
    /// <param name="cancellationToken">Not used; starting completes synchronously.</param>
    public Task StartAsync(CancellationToken cancellationToken)
    {
        timer = new AsyncTimer(
            handler: ProcessMessagesAsync,
            dueTime: TimeSpan.Zero,
            period: settings.ClockPeriod);

        // The timer's exception limit defaults to 0, which ends the loop on the
        // very first handler exception. Apply the configured limit before starting.
        timer.Update(TimeSpan.Zero, settings.ClockPeriod, settings.ExceptionRecoveryCount);

        timer.OnException += OnExceptionCallback;
        timer.Start();
        logger.LogInformation("Processor started.");
        return Task.CompletedTask;
    }

    /// <summary>
    /// Stops the timer. Returns once the timer has stopped, or as soon as
    /// <paramref name="cancellationToken"/> is cancelled, whichever comes first.
    /// </summary>
    /// <param name="cancellationToken">Signals that the caller no longer wants to wait.</param>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        // StopAsync may be called without a prior StartAsync; nothing to stop then.
        if (timer == null)
        {
            return;
        }

        // Note: the period is deliberately NOT changed to TimeSpan.Zero here;
        // a zero period would make the timer loop invoke the handler back-to-back.
        timer.OnException -= OnExceptionCallback;

        Task stopTask = timer.StopAsync(cancellationToken);
        Task abortTask = Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);

        // Do not block the caller past its own deadline if the timer is slow to stop.
        Task finished = await Task.WhenAny(stopTask, abortTask);
        if (finished == stopTask)
        {
            await stopTask; // Propagate any failure from the timer.
        }

        logger.LogWarning("Processor stopped.");
    }

    /// <summary>
    /// Timer handler: runs the ordered publish pass followed by the parallel one.
    /// </summary>
    public async Task ProcessMessagesAsync()
    {
        await PublishInOrder(); // If order of messages is important, publish them one-by-one.
        await PublishInParallel(); // If order of messages is not important, publish them in parallel.
    }

    /// <summary>Starts publishing every message, then awaits all of them together.</summary>
    private async Task PublishInParallel()
    {
        object[] messages = new object[] { }; // Get from some store.
        var tasks = new List<Task>(messages.Length);

        foreach (object message in messages)
        {
            tasks.Add(busPublisher.PublishAsync(message));
        }

        await Task.WhenAll(tasks);
        logger.LogInformation("Published {count} messages.", messages.Length);
    }

    /// <summary>Publishes the messages one at a time, awaiting each before the next.</summary>
    private async Task PublishInOrder()
    {
        object[] messages = new object[] { }; // Get from some store.

        foreach (object message in messages)
        {
            await busPublisher.PublishAsync(message);
            logger.LogInformation("Published message.");
        }
    }

    /// <summary>
    /// Logs a warning for every exception the timer reports, including how many
    /// more are tolerated according to the settings.
    /// </summary>
    private void OnExceptionCallback(object sender, AsyncTimer.ExceptionArgs e)
    {
        // The count may exceed the limit on the final exception; never report a negative remainder.
        int remaining = Math.Max(0, settings.ExceptionRecoveryCount - e.Count);

        // Pass the exception as the logger's exception argument too, so providers record the stack trace.
        logger.LogWarning(e.Exception, "Processor has recovered from {exception}. {left}/{total} errors until processor shuts down!",
            e.Exception, remaining, settings.ExceptionRecoveryCount);
    }

    /// <summary>Disposes the timer if one was created.</summary>
    public void Dispose()
    {
        timer?.Dispose();
    }
}
/// <summary>
/// Repeatedly invokes an asynchronous handler: waits <c>dueTime</c> once, then
/// runs the handler and waits <c>period</c> after each successful run.
/// Handler exceptions are counted and reported through <see cref="OnException"/>;
/// the loop ends on its own once the count exceeds the configured limit
/// (0 unless changed via <see cref="Update"/>).
/// </summary>
public class AsyncTimer : IDisposable
{
    private readonly Func<Task> handler;
    private readonly SemaphoreSlim semaphore;

    /// <summary>Raised for every exception thrown by the handler.</summary>
    public event EventHandler<ExceptionArgs> OnException;

    private TimeSpan dueTime;
    private TimeSpan period;
    private Task executing;
    private CancellationTokenSource stopSource;
    private bool disposed;
    private int exceptionCount;
    private int exceptionLimit;

    /// <summary>Creates a timer that is not yet running.</summary>
    /// <param name="handler">Asynchronous callback to invoke on every tick.</param>
    /// <param name="dueTime">Delay before the first invocation.</param>
    /// <param name="period">Delay after each successful invocation.</param>
    /// <exception cref="ArgumentNullException"><paramref name="handler"/> is <c>null</c>.</exception>
    public AsyncTimer(Func<Task> handler, TimeSpan dueTime, TimeSpan period)
    {
        this.handler = handler ?? throw new ArgumentNullException(nameof(handler));
        this.dueTime = dueTime;
        this.period = period;
        this.exceptionCount = 0;
        this.semaphore = new SemaphoreSlim(1);
    }

    /// <summary>
    /// Replaces the timing values and the exception limit. New values are picked
    /// up the next time the loop reads them.
    /// </summary>
    public void Update(TimeSpan dueTime, TimeSpan period, int exceptionLimit)
    {
        this.dueTime = dueTime;
        this.period = period;
        this.exceptionLimit = exceptionLimit;
    }

    /// <summary>Starts the loop; does nothing if it is already running.</summary>
    /// <exception cref="ObjectDisposedException">The timer has been disposed.</exception>
    public void Start()
    {
        ThrowIfDisposed();

        semaphore.Wait();
        try
        {
            // Running state is derived from the loop task itself, so it cannot go
            // stale when the loop finishes synchronously inside this call.
            if (executing != null && !executing.IsCompleted)
            {
                return;
            }

            stopSource?.Dispose();
            stopSource = new CancellationTokenSource();
            exceptionCount = 0; // A fresh run gets a fresh exception budget.
            executing = ExecuteHandler(stopSource.Token);
        }
        finally
        {
            semaphore.Release();
        }
    }

    /// <summary>
    /// Signals the loop to stop and waits for it to finish, including any handler
    /// invocation that is currently in flight.
    /// </summary>
    /// <param name="cancellationToken">Cancels the wait (not the handler).</param>
    /// <exception cref="ObjectDisposedException">The timer has been disposed.</exception>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancellationToken"/> was cancelled before the loop finished.
    /// </exception>
    public async Task StopAsync(CancellationToken cancellationToken)
    {
        ThrowIfDisposed();

        await semaphore.WaitAsync(cancellationToken);
        try
        {
            if (executing == null)
            {
                return;
            }

            // Without this signal the loop would never end and the await below would hang.
            stopSource.Cancel();

            Task abortTask = Task.Delay(Timeout.InfiniteTimeSpan, cancellationToken);
            Task finished = await Task.WhenAny(executing, abortTask);
            if (finished != executing)
            {
                cancellationToken.ThrowIfCancellationRequested();
            }

            await executing; // Propagate a fault from the loop, if any.
            executing = null;
        }
        finally
        {
            semaphore.Release();
        }
    }

    /// <summary>The timer loop; ends on a stop signal or when the exception limit is exceeded.</summary>
    private async Task ExecuteHandler(CancellationToken stopToken)
    {
        try
        {
            await Task.Delay(dueTime, stopToken);

            while (!stopToken.IsCancellationRequested)
            {
                try
                {
                    await handler();
                    await Task.Delay(period, stopToken);
                }
                catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
                {
                    // Stop requested: this is not a handler failure.
                    break;
                }
                catch (Exception e)
                {
                    exceptionCount++;
                    OnException?.Invoke(this, new ExceptionArgs(e, exceptionCount));
                }

                if (exceptionCount > exceptionLimit)
                {
                    break;
                }
            }
        }
        catch (OperationCanceledException) when (stopToken.IsCancellationRequested)
        {
            // Stopped while still waiting for the initial due time.
        }
    }

    private void ThrowIfDisposed()
    {
        if (disposed)
        {
            throw new ObjectDisposedException(GetType().FullName);
        }
    }

    private void Dispose(bool disposing)
    {
        if (disposed)
        {
            return;
        }

        if (disposing)
        {
            // Make sure a still-running loop does not outlive the timer.
            stopSource?.Cancel();
            stopSource?.Dispose();
            semaphore?.Dispose();
        }

        disposed = true;
    }

    /// <summary>Signals a running loop to stop and releases the timer's resources.</summary>
    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    /// <summary>Event data for <see cref="OnException"/>.</summary>
    public class ExceptionArgs : EventArgs
    {
        /// <summary>The exception thrown by the handler.</summary>
        public Exception Exception { get; }

        /// <summary>Number of handler exceptions counted so far, including this one.</summary>
        public int Count { get; }

        public ExceptionArgs(Exception exception, int count)
        {
            Exception = exception;
            Count = count;
        }
    }
}
/// <summary>
/// Abstraction the outbox processor uses to publish a message.
/// No implementation is shown in this file.
/// </summary>
public interface IBusPublisher
{
/// <summary>Publishes a single message.</summary>
/// <param name="message">The message to publish; typed as <see cref="object"/>, so no schema is enforced here.</param>
/// <returns>A task the caller awaits for the publish operation.</returns>
Task PublishAsync(object message);
}
/// <summary>
/// Contract for a component that processes outbox messages.
/// Implemented by <c>OutboxProcessor</c> in this file.
/// </summary>
public interface IOutboxProcessor
{
/// <summary>Runs one processing pass over the messages.</summary>
/// <returns>A task that completes when the pass has finished.</returns>
Task ProcessMessagesAsync();
}
/// <summary>
/// Settings consumed by <c>OutboxProcessor</c> through <c>IOptions</c>.
/// </summary>
public class OutboxProcessorSettings
{
/// <summary>
/// Enable flag. NOTE(review): not read by any code visible in this file — confirm where it is honored.
/// </summary>
public bool Enabled { get; set; }
/// <summary>Period handed to the processor's timer, i.e. the delay between processing passes.</summary>
public TimeSpan ClockPeriod { get; set; }
/// <summary>
/// Exception limit handed to the processor's timer; also used as the total in the recovery log message.
/// </summary>
public int ExceptionRecoveryCount { get; set; }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment