Skip to content

Instantly share code, notes, and snippets.

@noseratio
Last active November 9, 2023 19:18
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noseratio/0989d5f986190b62299b8bf6a847a61e to your computer and use it in GitHub Desktop.
Save noseratio/0989d5f986190b62299b8bf6a847a61e to your computer and use it in GitHub Desktop.
Simple async wrapper around .NET Queue
/// <summary>
/// A simple asynchronous FIFO queue built on top of <see cref="Queue{T}"/>.
/// All access to the underlying queue is serialized through a binary
/// <see cref="SemaphoreSlim"/>, and consumers can asynchronously wait for an
/// item to become available.
/// </summary>
/// <typeparam name="T">The type of the items stored in the queue.</typeparam>
public sealed class AsyncQueue<T>: IAsyncDisposable
{
    private readonly Queue<T> _queue = new();

    // Binary semaphore used as an await-compatible mutex guarding _queue and _itemTcs.
    private readonly SemaphoreSlim _semaphore = new(initialCount: 1, maxCount: 1);

    // Cancelled by Complete() and DisposeAsync() to wake up pending consumers.
    private readonly CancellationTokenSource _cts = new();

    // Captured once at construction: reading _cts.Token after _cts has been
    // disposed throws, whereas an already obtained token stays usable.
    private readonly CancellationToken _completionToken;

    // Signalled by EnqueueAsync; replaced by DequeueAsync once it has been consumed.
    private TaskCompletionSource _itemTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);

    // 0 = live, 1 = DisposeAsync has started; makes disposal idempotent.
    private int _disposed;

    /// <summary>Creates an empty queue.</summary>
    public AsyncQueue()
    {
        _completionToken = _cts.Token;
    }

    /// <summary>
    /// Returns the item at the head of the queue without removing it.
    /// </summary>
    /// <param name="cancelToken">Cancels the wait for exclusive access to the queue.</param>
    /// <returns>
    /// A tuple whose first element tells whether the queue had an item; when it is
    /// <c>false</c>, the second element is the default value of <typeparamref name="T"/>.
    /// </returns>
    /// <exception cref="OperationCanceledException"><paramref name="cancelToken"/> was cancelled.</exception>
    public async ValueTask<(bool, T)> TryPeekAsync(CancellationToken cancelToken)
    {
        await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
        try
        {
            var hasValue = _queue.TryPeek(out var item);
            return (hasValue, item!);
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Removes all items currently stored in the queue.
    /// </summary>
    /// <param name="cancelToken">Cancels the wait for exclusive access to the queue.</param>
    /// <exception cref="OperationCanceledException"><paramref name="cancelToken"/> was cancelled.</exception>
    public async ValueTask ClearAsync(CancellationToken cancelToken)
    {
        await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
        try
        {
            _queue.Clear();
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Marks the queue as completed. Consumers waiting in <see cref="DequeueAsync"/>
    /// (or <see cref="DequeueAll"/>) on an empty queue are cancelled; items that are
    /// still buffered can be dequeued afterwards. Safe to call more than once and
    /// after the queue has been disposed.
    /// </summary>
    public void Complete()
    {
        try
        {
            _cts.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // DisposeAsync cancels the source before disposing it, so the queue
            // is already completed and there is nothing left to do.
        }
    }

    /// <summary>
    /// Appends an item to the tail of the queue and wakes up waiting consumers.
    /// </summary>
    /// <param name="item">The item to add.</param>
    /// <param name="cancelToken">Cancels the wait for exclusive access to the queue.</param>
    /// <exception cref="OperationCanceledException"><paramref name="cancelToken"/> was cancelled.</exception>
    public async ValueTask EnqueueAsync(T item, CancellationToken cancelToken)
    {
        await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
        try
        {
            _queue.Enqueue(item);
            // TrySetResult: the signal may already be set by an earlier enqueue.
            _itemTcs.TrySetResult();
        }
        finally
        {
            _semaphore.Release();
        }
    }

    /// <summary>
    /// Removes and returns the item at the head of the queue, asynchronously
    /// waiting for one to be enqueued when the queue is empty.
    /// </summary>
    /// <param name="cancelToken">Cancels the operation.</param>
    /// <returns>The dequeued item.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="cancelToken"/> was cancelled, or the queue is empty and has
    /// been completed via <see cref="Complete"/> or <see cref="DisposeAsync"/>.
    /// </exception>
    public async ValueTask<T> DequeueAsync(CancellationToken cancelToken)
    {
        while (true)
        {
            TaskCompletionSource tcs;
            await _semaphore.WaitAsync(cancelToken).ConfigureAwait(false);
            try
            {
                if (_queue.TryDequeue(out var item))
                {
                    return item;
                }

                // The queue is drained: once completed, no further item is awaited.
                _completionToken.ThrowIfCancellationRequested();

                // A completed signal belongs to an item that was already taken;
                // install a fresh one for the next enqueue to set.
                if (_itemTcs.Task.IsCompleted)
                {
                    _itemTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
                }
                tcs = _itemTcs;
            }
            finally
            {
                _semaphore.Release();
            }

            // Wait outside the lock so producers can enqueue. Observe both the
            // caller's token and the queue's completion token, so that Complete()
            // and DisposeAsync() actually release pending consumers.
            using var linkedCts = CancellationTokenSource.CreateLinkedTokenSource(cancelToken, _completionToken);
            await tcs.Task.WaitAsync(linkedCts.Token).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Returns an endless asynchronous stream that yields items as they are dequeued.
    /// </summary>
    /// <param name="cancelToken">Cancels the enumeration.</param>
    /// <returns>An asynchronous sequence of dequeued items.</returns>
    /// <exception cref="OperationCanceledException">
    /// Thrown during enumeration under the same conditions as <see cref="DequeueAsync"/>.
    /// </exception>
    public async IAsyncEnumerable<T> DequeueAll(
        [EnumeratorCancellation] CancellationToken cancelToken)
    {
        while (true)
        {
            yield return await this.DequeueAsync(cancelToken).ConfigureAwait(false);
        }
    }

    /// <summary>
    /// Completes the queue, discards any buffered items and releases the
    /// underlying synchronization resources. Subsequent calls are no-ops.
    /// </summary>
    public async ValueTask DisposeAsync()
    {
        // Only the first caller performs the disposal; repeating Cancel() on an
        // already disposed CancellationTokenSource would throw.
        if (Interlocked.Exchange(ref _disposed, 1) != 0)
        {
            return;
        }

        _cts.Cancel();
        await _semaphore.WaitAsync().ConfigureAwait(false);
        try
        {
            _queue.Clear();
        }
        finally
        {
            _cts.Dispose();
            _semaphore.Release();
            _semaphore.Dispose();
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment