Last active
August 16, 2021 19:21
-
-
Save feO2x/b2ae50cfdc6f96331d3d1dadd7271293 to your computer and use it in GitHub Desktop.
Composite Async Queue (David Fowler Coding Challenge)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System; | |
using System.Collections.Generic; | |
using System.Threading.Tasks; | |
namespace CompositeAsyncQueues | |
{ | |
/// <summary>
/// Contract for a queue whose items are retrieved asynchronously.
/// </summary>
/// <typeparam name="T">Type of the items handed out by the queue.</typeparam>
public interface IAsyncQueue<T>
{
    /// <summary>
    /// Starts dequeuing a single item.
    /// </summary>
    /// <returns>A task whose result is the dequeued item.</returns>
    Task<T> DequeueAsync();
}
/// <summary>
/// Combines several <see cref="IAsyncQueue{T}"/> instances into a single queue:
/// <see cref="DequeueAsync"/> returns the next item that any of the inner queues produces.
/// </summary>
/// <typeparam name="T">Type of the items produced by the inner queues.</typeparam>
public sealed class CompositeQueue<T> : IAsyncQueue<T>
{
    // Private copy of the queues passed to the constructor.
    private readonly IAsyncQueue<T>[] _queues;

    // _pendingTasks[i] is the most recent task obtained from _queues[i].DequeueAsync().
    // Tasks are tracked per queue slot instead of being used as dictionary keys because
    // different queues may hand out the very same Task instance (e.g. a cached,
    // already-completed task), which would make a task-keyed dictionary throw on Add.
    private readonly Task<T>[] _pendingTasks;

    // _hasPendingTask[i] is true while _pendingTasks[i] has not yet been consumed by DequeueAsync.
    private readonly bool[] _hasPendingTask;

    // Slot at which the next search for a completed task begins (round-robin), so that a
    // queue that is always ready cannot permanently shadow the queues behind it.
    private int _scanStart;

    /// <summary>
    /// Creates a composite over the given queues.
    /// </summary>
    /// <param name="queues">The queues to read from; must contain at least one queue and no null entries.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is null.</exception>
    /// <exception cref="ArgumentException"><paramref name="queues"/> is empty or contains a null entry.</exception>
    public CompositeQueue(IAsyncQueue<T>[] queues)
    {
        if (queues == null)
        {
            throw new ArgumentNullException(nameof(queues));
        }

        if (queues.Length == 0)
        {
            throw new ArgumentException("There must be at least one queue", nameof(queues));
        }

        // Copy first, then validate the copy, so later changes to the caller's array
        // cannot affect this instance.
        _queues = (IAsyncQueue<T>[]) queues.Clone();
        foreach (var queue in _queues)
        {
            if (queue == null)
            {
                throw new ArgumentException("Queues must not contain null entries", nameof(queues));
            }
        }

        _pendingTasks = new Task<T>[_queues.Length];
        _hasPendingTask = new bool[_queues.Length];
    }

    /// <summary>
    /// Returns the next item from any of the internal queues. This operation is not written for
    /// concurrent access, so there should only be one consumer calling this method.
    /// </summary>
    /// <remarks>
    /// Dequeue tasks that end faulted or cancelled are skipped and the corresponding queue is asked
    /// again, so this method keeps looping until some queue produces an item. There is no
    /// cancellation or timeout support.
    /// If an inner queue throws synchronously from its <c>DequeueAsync</c>, that exception is
    /// propagated to the caller; no already-completed result is discarded and the queue is retried
    /// on the next call.
    /// </remarks>
    /// <returns>A task whose result is the next successfully dequeued item.</returns>
    public async Task<T> DequeueAsync()
    {
        while (true)
        {
            // Make sure every queue has an outstanding dequeue. On the first call this starts one
            // per queue; afterwards it only restarts the slot whose task was consumed last time.
            StartMissingDequeues();

            // Wait until at least one pending task has finished. The task returned by WhenAny is
            // ignored on purpose: the slot is chosen by the round-robin scan below.
            await Task.WhenAny(_pendingTasks).ConfigureAwait(false);

            var completedTask = TakeCompletedTask();

            // The queue task might be faulted or cancelled; in that case try again.
            if (completedTask.IsCompletedSuccessfully)
            {
                return completedTask.Result;
            }
        }
    }

    // Calls DequeueAsync on every queue whose slot currently has no unconsumed task.
    private void StartMissingDequeues()
    {
        for (var i = 0; i < _queues.Length; i++)
        {
            if (_hasPendingTask[i])
            {
                continue;
            }

            // The slot is only marked as pending after the call succeeded, so a synchronous
            // exception leaves it eligible for another attempt on the next call.
            _pendingTasks[i] = _queues[i].DequeueAsync()
                ?? throw new InvalidOperationException("A queue returned null from DequeueAsync");
            _hasPendingTask[i] = true;
        }
    }

    // Finds a completed pending task (searching round-robin from _scanStart), marks its slot as
    // consumed and returns it. Must only be called when at least one pending task has completed.
    private Task<T> TakeCompletedTask()
    {
        for (var offset = 0; offset < _pendingTasks.Length; offset++)
        {
            var index = (_scanStart + offset) % _pendingTasks.Length;
            if (!_pendingTasks[index].IsCompleted)
            {
                continue;
            }

            _hasPendingTask[index] = false;
            _scanStart = (index + 1) % _pendingTasks.Length;
            return _pendingTasks[index];
        }

        throw new InvalidOperationException("No pending dequeue task has completed");
    }
}
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment