Skip to content

Instantly share code, notes, and snippets.

@ddjerqq
Created August 7, 2023 19:57
Show Gist options
  • Save ddjerqq/7b2c095545234823bbcc9f67388903c4 to your computer and use it in GitHub Desktop.
Save ddjerqq/7b2c095545234823bbcc9f67388903c4 to your computer and use it in GitHub Desktop.
A thread-safe asynchronous queue.
using System.Threading.Tasks.Dataflow;
namespace AsyncQueue;
/// <summary>
/// A thread-safe asynchronous queue.
/// </summary>
/// <example>
/// <code>
/// var queue = new AsyncQueue`int();
/// await foreach(int i in queue) {
/// // Writes a line as soon as some other Task calls queue.Enqueue(..)
/// Console.WriteLine(i);
/// }
/// </code>
/// </example>
/// <remarks>
/// Inspired by an amazing answer on StackOverflow [here](https://stackoverflow.com/a/55912725/14860947)
/// </remarks>
/// <typeparam name="T">
/// The type of the elements in the queue.
/// </typeparam>
public sealed class AsyncQueue<T> : IAsyncEnumerable<T>
{
private readonly SemaphoreSlim _enumerationSemaphore = new(1);
private readonly BufferBlock<T> _bufferBlock = new();
/// <summary>
/// Add an item to the queue atomically.
/// </summary>
/// <param name="item">The item to add</param>
public void Enqueue(T item) => _bufferBlock.Post(item);
public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default)
{
await _enumerationSemaphore.WaitAsync(ct);
while (!ct.IsCancellationRequested)
yield return await _bufferBlock.ReceiveAsync(ct);
_enumerationSemaphore.Release();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment