Skip to content

Instantly share code, notes, and snippets.

Last active September 4, 2021 12:13
Show Gist options
  • Save ledjon-behluli/223ef47f29a1f4c1b739083fab3637aa to your computer and use it in GitHub Desktop.
Save ledjon-behluli/223ef47f29a1f4c1b739083fab3637aa to your computer and use it in GitHub Desktop.
Asynchronous implementation of an OutboxProcessor with error recovery.
public class OutboxProcessor : IOutboxProcessor, IHostedService, IDisposable
private readonly OutboxProcessorSettings settings;
private readonly ILogger<OutboxProcessor> logger;
private readonly IBusPublisher busPublisher;
private AsyncTimer timer;
public OutboxProcessor(
IBusPublisher busPublisher,
ILogger<OutboxProcessor> logger,
IOptions<OutboxProcessorSettings> options)
this.busPublisher = busPublisher ?? throw new ArgumentNullException(nameof(busPublisher));
this.logger = logger ?? throw new ArgumentNullException(nameof(logger));
this.settings = (options ?? throw new ArgumentNullException(nameof(options))).Value;
public Task StartAsync(CancellationToken cancellationToken)
timer = new AsyncTimer(
handler: ProcessMessagesAsync,
dueTime: TimeSpan.Zero,
period: settings.ClockPeriod);
timer.OnException += OnExceptionCallback;
logger.LogInformation("Processor started.");
return Task.CompletedTask;
public async Task StopAsync(CancellationToken cancellationToken)
timer.Update(Timeout.InfiniteTimeSpan, TimeSpan.Zero, settings.ExceptionRecoveryCount);
timer.OnException -= OnExceptionCallback;
logger.LogWarning("Processor stopped.");
await timer.StopAsync(cancellationToken);
public async Task ProcessMessagesAsync()
await PublishInOrder(); // If order of messages is important, publish them one-by-one.
await PublishInParallel(); // If order of messages is not important, publish them in parallel.
private async Task PublishInParallel()
object[] messages = new object[] { }; // Get from some store.
var tasks = new List<Task>();
foreach (object message in messages)
await Task.WhenAll(tasks);
logger.LogInformation("Published {count} messages.", messages.Length);
private async Task PublishInOrder()
object[] messages = new object[] { }; // Get from some store.
foreach (object message in messages)
await busPublisher.PublishAsync(message);
logger.LogInformation("Published message.");
private void OnExceptionCallback(object sender, AsyncTimer.ExceptionArgs e)
logger.LogWarning("Processor has recovered from {exception}. {left}/{total} errors until processor shuts down!",
e.Exception, settings.ExceptionRecoveryCount - e.Count, settings.ExceptionRecoveryCount);
public void Dispose()
public class AsyncTimer : IDisposable
private readonly Func<Task> handler;
private readonly SemaphoreSlim semaphore;
public event EventHandler<ExceptionArgs> OnException;
private TimeSpan dueTime;
private TimeSpan period;
private Task executing;
private bool isRunning;
private bool disposed;
private int exceptionCount;
private int exceptionLimit;
public AsyncTimer(Func<Task> handler, TimeSpan dueTime, TimeSpan period)
this.handler = handler;
this.dueTime = dueTime;
this.period = period;
this.exceptionCount = 0;
this.semaphore = new SemaphoreSlim(1);
public void Update(TimeSpan dueTime, TimeSpan period, int exceptionLimit)
this.dueTime = dueTime;
this.period = period;
this.exceptionLimit = exceptionLimit;
public void Start()
if (disposed)
throw new ObjectDisposedException(GetType().FullName);
if (isRunning)
executing = ExecuteHandler();
isRunning = true;
public async Task StopAsync(CancellationToken cancellationToken)
if (disposed)
throw new ObjectDisposedException(GetType().FullName);
await semaphore.WaitAsync(cancellationToken);
if (!isRunning)
await executing;
isRunning = false;
private async Task ExecuteHandler()
await Task.Delay(dueTime);
while (true)
await handler();
await Task.Delay(period);
catch (Exception e)
OnException?.Invoke(this, new ExceptionArgs(e, exceptionCount));
if (exceptionCount > exceptionLimit)
isRunning = false;
private void Dispose(bool disposing)
if (disposed)
if (disposing)
disposed = true;
public void Dispose()
public class ExceptionArgs : EventArgs
public Exception Exception { get; }
public int Count { get; }
public ExceptionArgs(Exception exception, int count)
Exception = exception;
Count = count;
public interface IBusPublisher
Task PublishAsync(object message);
public interface IOutboxProcessor
Task ProcessMessagesAsync();
public class OutboxProcessorSettings
public bool Enabled { get; set; }
public TimeSpan ClockPeriod { get; set; }
public int ExceptionRecoveryCount { get; set; }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment