Skip to content

Instantly share code, notes, and snippets.

@noseratio
Created May 16, 2022 21:58
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save noseratio/083be04528f14d4b7205e79b86e3ed1d to your computer and use it in GitHub Desktop.
Save noseratio/083be04528f14d4b7205e79b86e3ed1d to your computer and use it in GitHub Desktop.
An async queue which can be cleared from a producer end
// https://twitter.com/noseratio/status/1526314613364490241
public sealed class AsyncQueue<T>: IAsyncDisposable
{
private readonly Queue<T> _queue = new();
private readonly SemaphoreSlim _semaphore = new(initialCount: 1);
private readonly CancellationTokenSource _cts = new();
private TaskCompletionSource<DBNull> _itemTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
public async Task Clear(CancellationToken cancelToken)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancelToken);
await _semaphore.WaitAsync(cts.Token);
try
{
_queue.Clear();
}
finally
{
_semaphore.Release();
}
}
public void Complete()
{
_cts.Cancel();
}
public async Task Enqueue(T item, CancellationToken cancelToken)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancelToken);
await _semaphore.WaitAsync(cts.Token);
try
{
_queue.Enqueue(item);
_itemTcs.TrySetResult(DBNull.Value);
}
finally
{
_semaphore.Release();
}
}
public async Task<T> Dequeue(CancellationToken cancelToken)
{
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancelToken);
while (true)
{
TaskCompletionSource<DBNull>? tcs;
await _semaphore.WaitAsync(cts.Token);
try
{
cts.Token.ThrowIfCancellationRequested();
if (_queue.TryDequeue(out var item))
{
return item;
}
if (_itemTcs.Task.IsCompleted)
{
_itemTcs = new(TaskCreationOptions.RunContinuationsAsynchronously);
}
tcs = _itemTcs;
}
finally
{
_semaphore.Release();
}
using var rego = cts.Token.Register(() => tcs.TrySetCanceled(cts.Token));
await tcs.Task;
}
}
public async IAsyncEnumerable<T> DequeueAll(
[EnumeratorCancellation] CancellationToken cancelToken)
{
while (true)
{
yield return await this.Dequeue(cancelToken);
}
}
public async ValueTask DisposeAsync()
{
_cts.Cancel();
await _semaphore.WaitAsync();
try
{
_queue.Clear();
}
finally
{
_semaphore.Release();
_cts.Dispose();
_semaphore.Dispose();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment