Skip to content

Instantly share code, notes, and snippets.

@flensrocker
Last active August 14, 2021 15:27
Show Gist options
  • Save flensrocker/0e04b96ecf1af13a3475b55dc2a395a1 to your computer and use it in GitHub Desktop.
Save flensrocker/0e04b96ecf1af13a3475b55dc2a395a1 to your computer and use it in GitHub Desktop.
using System.Threading.Channels;
// Demo driver: fan ten randomly-delayed queues into a single UberQueue and
// drain it with three concurrent consumers.
var uberQueue = new UberQueue<string>(Enumerable
    .Range(0, 10)
    .Select(queueIndex => new RandomDelayQueue(queueIndex))
    .ToArray());

// Consumer loop: prints every dequeued item; a failed dequeue is reported and
// the loop keeps going. The loop has no exit condition, so the returned task
// does not complete on its own.
async Task ConsumeAsync(int consumerIndex)
{
    while (true)
    {
        try
        {
            var result = await uberQueue.DequeueAsync();
            Console.WriteLine($"consumer {consumerIndex}: {result}");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"consumer {consumerIndex}: Caught: {ex.Message}");
        }
    }
}

var uberQueueConsumers = Enumerable
    .Range(0, 3)
    .Select(consumerIndex => Task.Run(() => ConsumeAsync(consumerIndex)))
    .ToArray();

// Keeps the process alive while the consumers run (stop the demo with Ctrl+C).
await Task.WhenAll(uberQueueConsumers);
/// <summary>
/// Multiplexes several <see cref="IAsyncQueue{T}"/> instances behind a single
/// <see cref="DequeueAsync"/> call: every call yields the result of whichever
/// underlying dequeue operation finished next.
/// </summary>
/// <remarks>
/// Each underlying queue has exactly one dequeue operation "in flight" at any
/// time: it is either still running or its finished task is waiting in the
/// internal channel. A new dequeue is started on a queue only after its
/// previous one has been handed to a consumer.
/// </remarks>
/// <typeparam name="T">Type of the items produced by the underlying queues.</typeparam>
public class UberQueue<T>
{
    // Starts the first dequeue on every queue; created lazily so that nothing
    // is dequeued until the first consumer shows up, and run exactly once.
    private readonly Lazy<Task> _firstDequeue;

    // Holds finished dequeue tasks together with the queue they came from.
    private readonly Channel<(IAsyncQueue<T> queue, Task<T> dequeueTask)> _channel;

    /// <summary>Creates an uber queue on top of <paramref name="queues"/>.</summary>
    /// <param name="queues">The queues to multiplex; must contain at least one queue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="queues"/> is <c>null</c>.</exception>
    /// <exception cref="ArgumentOutOfRangeException"><paramref name="queues"/> is empty.</exception>
    public UberQueue(IAsyncQueue<T>[] queues)
    {
        ArgumentNullException.ThrowIfNull(queues);
        if (queues.Length == 0)
        {
            // Channel.CreateBounded would reject a capacity of 0 anyway; fail
            // with the same exception type but a message that names the cause.
            throw new ArgumentOutOfRangeException(nameof(queues), "At least one queue is required.");
        }

        // One slot per queue is enough because each queue contributes at most
        // one finished task at a time (see class remarks).
        _channel = Channel.CreateBounded<(IAsyncQueue<T> queue, Task<T> dequeueTask)>(queues.Length);

        _firstDequeue = new Lazy<Task>(
            () => Task.Run(() =>
            {
                foreach (var queue in queues)
                {
                    Requeue(queue);
                }
            }),
            LazyThreadSafetyMode.ExecutionAndPublication);
    }

    /// <summary>
    /// Waits for the next underlying dequeue operation to finish and returns its result.
    /// </summary>
    /// <returns>The item produced by the queue that finished next.</returns>
    /// <remarks>
    /// If the finished operation failed, its exception is rethrown here. The
    /// originating queue has already been asked for its next item at that
    /// point, so a failure does not remove the queue from rotation.
    /// </remarks>
    public async Task<T> DequeueAsync()
    {
        await _firstDequeue.Value.ConfigureAwait(false);

        var (queue, dequeueTask) = await _channel.Reader.ReadAsync().ConfigureAwait(false);

        // Start the next dequeue on that queue before surfacing the result, so
        // the queue keeps producing even if the await below throws.
        Requeue(queue);

        return await dequeueTask.ConfigureAwait(false);
    }

    /// <summary>
    /// Starts a dequeue on <paramref name="queue"/> and publishes the task to
    /// the channel once it has completed (successfully or not).
    /// </summary>
    private void Requeue(IAsyncQueue<T> queue)
    {
        Task<T> pending;
        try
        {
            pending = queue.DequeueAsync();
        }
        catch (Exception ex)
        {
            // A queue that throws synchronously is treated like one that
            // returned a faulted task: the error reaches a consumer and the
            // queue stays in rotation instead of silently dropping out.
            pending = Task.FromException<T>(ex);
        }

        _ = pending.ContinueWith(
            completed =>
            {
                // There is always a free slot (capacity == number of queues,
                // one in-flight task per queue), so a synchronous write suffices
                // and no ValueTask is left unobserved.
                if (!_channel.Writer.TryWrite((queue, completed)))
                {
                    // Should be unreachable; surface it to readers rather than
                    // losing the item and leaving consumers waiting.
                    _channel.Writer.TryComplete(
                        new InvalidOperationException("UberQueue channel unexpectedly rejected a completed dequeue."));
                }
            },
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
    }
}
/// <summary>
/// A source of items of type <typeparamref name="T"/> that are retrieved asynchronously.
/// </summary>
/// <typeparam name="T">Type of the items produced.</typeparam>
public interface IAsyncQueue<T>
{
    /// <summary>Starts retrieving the next item.</summary>
    /// <returns>A task whose result is the dequeued item.</returns>
    Task<T> DequeueAsync();
}
/// <summary>
/// Demo queue whose every dequeue completes after a random delay and
/// occasionally fails, to exercise both result and error paths.
/// </summary>
public class RandomDelayQueue : IAsyncQueue<string>
{
    private readonly int _index;
    private int _counter;

    /// <summary>Creates a queue identified by <paramref name="index"/> in its messages.</summary>
    public RandomDelayQueue(int index) => _index = index;

    /// <summary>
    /// Waits for a delay picked via <c>Random.Shared.Next(10, 100)</c> milliseconds,
    /// then returns a message describing this dequeue.
    /// </summary>
    /// <returns>A text containing the queue index, the call number and the delay.</returns>
    /// <exception cref="Exception">
    /// The picked delay was strictly between 30 and 40 ms (simulated failure);
    /// the exception message is the text that would otherwise have been returned.
    /// </exception>
    public async Task<string> DequeueAsync()
    {
        // Number this call, then advance the counter for the next one.
        var sequence = _counter;
        _counter = sequence + 1;

        var delayMs = Random.Shared.Next(10, 100);
        Console.WriteLine($"queue {_index}/{sequence} will finish in {delayMs}ms");

        await Task.Delay(delayMs).ConfigureAwait(false);

        var message = $"queue {_index}/{sequence} finished after {delayMs}ms";
        return delayMs is > 30 and < 40
            ? throw new Exception(message)
            : message;
    }
}
@flensrocker
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment