Skip to content

Instantly share code, notes, and snippets.

@feO2x
Last active August 16, 2021 19:21
Show Gist options
  • Save feO2x/b2ae50cfdc6f96331d3d1dadd7271293 to your computer and use it in GitHub Desktop.
Save feO2x/b2ae50cfdc6f96331d3d1dadd7271293 to your computer and use it in GitHub Desktop.
Composite Async Queue (David Fowler Coding Challenge)
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace CompositeAsyncQueues
{
public interface IAsyncQueue<T>
{
Task<T> DequeueAsync();
}
public sealed class CompositeQueue<T> : IAsyncQueue<T>
{
public CompositeQueue(IAsyncQueue<T>[] queues)
{
Queues = queues ?? throw new ArgumentNullException(nameof(queues));
if (queues.Length == 0)
throw new ArgumentException("There must be at least one queue", nameof(queues));
CurrentTasks = new(queues.Length);
}
private IAsyncQueue<T>[] Queues { get; }
private Dictionary<Task<T>, IAsyncQueue<T>> CurrentTasks { get; }
/// <summary>
/// Returns the next item from any of the internal queues. This operation is not written for concurrent access
/// so there should only be one consumer calling this method.
/// </summary>
public async Task<T> DequeueAsync()
{
// This class has a dictionary of tasks (mapped to the corresponding queue they were taken from)
// that were already dequeued.
// If this dictionary is empty, we call DequeueAsync on every queue to fill the dictionary.
// This only happens on the first call to DequeueAsync.
if (CurrentTasks.Count == 0)
{
foreach (var queue in Queues)
{
CurrentTasks.Add(queue.DequeueAsync(), queue);
}
}
// This implementation loops endlessly until a queue produces a valid result
// A proper implementation would allow cancellation tokens or a timeout property
while (true)
{
// Here is the important part: we await the composite task to get a completed task from
// any of the queues
var completedTask = await Task.WhenAny(CurrentTasks.Keys);
// The completed task will be removed and
// we immediately try to dequeue again from the same queue for subsequent
// calls to this method or further loop runs
var queueOfCompletedTask = CurrentTasks[completedTask];
CurrentTasks.Remove(completedTask);
CurrentTasks.Add(queueOfCompletedTask.DequeueAsync(), queueOfCompletedTask);
// The queue task might be faulted or cancelled, so we check that here
if (completedTask.IsCompletedSuccessfully)
return completedTask.Result;
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment