Skip to content

Instantly share code, notes, and snippets.

@MoaidHathot
Last active March 7, 2022 09:42
Show Gist options
  • Save MoaidHathot/45900e9265018ac5a99daec32aa57c09 to your computer and use it in GitHub Desktop.
Save MoaidHathot/45900e9265018ac5a99daec32aa57c09 to your computer and use it in GitHub Desktop.
David Fowler's challenge to implement 'UberQueue<T>' //https://twitter.com/davidfowl/status/1426453915357175819
/// <summary>
/// Demo driver: builds four <c>AsyncQueue&lt;int&gt;</c> sources with delays of
/// 1-4 seconds, wraps them in an <c>UberQueue&lt;int&gt;</c>, and performs nine
/// dequeues with pauses between some of them.
/// </summary>
async Task Main()
{
    var sources = new AsyncQueue<int>[]
    {
        new (100, TimeSpan.FromSeconds(1), Increment),
        new (200, TimeSpan.FromSeconds(2), Increment),
        new (300, TimeSpan.FromSeconds(3), Increment),
        new (400, TimeSpan.FromSeconds(4), Increment),
    };
    var uber = new UberQueue<int>(sources);

    // Seconds to pause after the dequeue at the same position;
    // 0 means continue with the next dequeue immediately.
    int[] pauseAfterDequeue = { 2, 2, 2, 5, 1, 0, 0, 0, 0 };

    for (int index = 0; index < pauseAfterDequeue.Length; index++)
    {
        await Dequeu(index, uber);

        int pause = pauseAfterDequeue[index];
        if (pause > 0)
        {
            await Wait(pause);
        }
    }
}
/// <summary>
/// Asynchronously pauses for the given number of seconds, writing a message
/// through <c>Dump()</c> before the pause starts and after it ends.
/// </summary>
/// <param name="secodns">Length of the pause, in seconds.</param>
private async Task Wait(int secodns)
{
    var pause = TimeSpan.FromSeconds(secodns);

    $" Started delay '{secodns}'".Dump();
    await Task.Delay(pause);
    $" Finishe delay '{secodns}'".Dump();
}
/// <summary>
/// Dequeues a single item from <paramref name="uber"/> and writes a message
/// through <c>Dump()</c> before the request and after the item arrives.
/// </summary>
/// <param name="index">Label printed in both messages to identify this dequeue.</param>
/// <param name="uber">Queue to dequeue from.</param>
private async Task Dequeu<T>(int index, UberQueue<T> uber)
{
    $"[{index}] getting...".Dump();
    var item = await uber.DequeueAsync();
    // index is passed by value, so incrementing it here had no effect on the
    // caller and only obscured the message; print it as-is.
    $"[{index}] Got '{item}'".Dump();
}
/// <summary>
/// Presents several <see cref="IAsyncQueue{T}"/> instances as one queue:
/// each <see cref="DequeueAsync"/> call returns the next item to complete on
/// any of the underlying queues.
/// </summary>
/// <remarks>
/// All calls share a single async enumerator.
/// NOTE(review): overlapping <see cref="DequeueAsync"/> calls would advance that
/// enumerator concurrently, which is presumably unsupported - confirm callers
/// always await one call before issuing the next.
/// </remarks>
class UberQueue<T> : IAsyncQueue<T>
{
    private readonly IAsyncQueue<T>[] _queues;
    private readonly IAsyncEnumerator<T> _enumerator;

    /// <summary>Creates an uber queue over the given underlying queues.</summary>
    /// <param name="queues">Queues to merge.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is null.</exception>
    public UberQueue(IAsyncQueue<T>[] queues)
    {
        _queues = queues ?? throw new ArgumentNullException(nameof(queues));
        // The iterator body does not start running until the first MoveNextAsync.
        _enumerator = CreateEnumerator().GetAsyncEnumerator();
    }

    /// <summary>
    /// Starts one dequeue on every underlying queue, then repeatedly yields the
    /// result of whichever pending dequeue finishes first. The queue that
    /// produced an item gets a fresh dequeue only when the consumer asks for
    /// the next item (the code after <c>yield return</c> runs on the next
    /// MoveNextAsync).
    /// </summary>
    private async IAsyncEnumerable<T> CreateEnumerator()
    {
        var pending = _queues.Select(q => (q, task: q.DequeueAsync())).ToList();

        // Every yielded item re-arms its queue, so this loop only ends
        // immediately (no queues were supplied) or never.
        while (pending.Count > 0)
        {
            var done = await Task.WhenAny(pending.Select(entry => entry.task));

            // Awaiting (rather than .Result) rethrows a failed dequeue's
            // original exception instead of an AggregateException wrapper.
            var item = await done;

            $" Yielding '{item}'".Dump();
            yield return item;
            $" Fixing after yielding '{item}'".Dump();

            // Replace the completed entry with a new dequeue on the same queue.
            var position = pending.FindIndex(entry => entry.task == done);
            var source = pending[position].q;
            pending.RemoveAt(position);
            pending.Add((source, source.DequeueAsync()));
        }
    }

    /// <summary>Returns the next item available from any underlying queue.</summary>
    /// <exception cref="InvalidOperationException">
    /// The merged sequence has ended (for example, no underlying queues were
    /// supplied), so there is no item to return.
    /// </exception>
    public async Task<T> DequeueAsync()
    {
        // Without this check an exhausted enumerator would silently hand back
        // default(T) as if it were a real item.
        if (!await _enumerator.MoveNextAsync())
        {
            throw new InvalidOperationException("No underlying queue can produce an item.");
        }

        return _enumerator.Current;
    }
}
/// <summary>
/// A queue whose items are obtained asynchronously.
/// </summary>
/// <typeparam name="T">Type of the items produced by the queue.</typeparam>
interface IAsyncQueue<T>
{
/// <summary>
/// Requests one item from the queue.
/// </summary>
/// <returns>A task whose result is the dequeued item.</returns>
Task<T> DequeueAsync();
}
/// <summary>
/// Simulated queue: every dequeue derives a new item from the previous one
/// with an updater function and hands it out after a fixed delay.
/// </summary>
public class AsyncQueue<T> : IAsyncQueue<T>
{
    /// <summary>Time each dequeue waits before returning its item.</summary>
    public TimeSpan Delay { get; }

    /// <summary>The most recently produced item (the seed until the first dequeue).</summary>
    public T _item;

    private readonly Func<T, T> _updater;

    /// <summary>Creates a simulated queue.</summary>
    /// <param name="item">Seed value passed to <paramref name="updater"/> on the first dequeue.</param>
    /// <param name="delay">Time each dequeue waits before returning.</param>
    /// <param name="updater">Computes the next item from the current one.</param>
    /// <exception cref="ArgumentNullException"><paramref name="updater"/> is null.</exception>
    public AsyncQueue(T item, TimeSpan delay, Func<T, T> updater)
    {
        _item = item;
        Delay = delay;
        _updater = updater ?? throw new ArgumentNullException(nameof(updater));
    }

    /// <summary>
    /// Advances the stored item with the updater, waits for <see cref="Delay"/>,
    /// and returns the item produced by this call.
    /// </summary>
    public async Task<T> DequeueAsync()
    {
        // Keep this call's item in a local: the field may be advanced by another
        // DequeueAsync call while this one is still waiting, and both the
        // "Finished" message and the return value must refer to this call's item.
        var item = _updater(_item);
        _item = item;

        $" Started '{item}'".Dump();
        await Task.Delay(Delay);
        $" Finished '{item}'".Dump();
        return item;
    }
}
/// <summary>Returns <paramref name="item"/> increased by one.</summary>
private int Increment(int item)
{
    return item + 1;
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment