Created
August 7, 2023 19:57
-
-
Save ddjerqq/7b2c095545234823bbcc9f67388903c4 to your computer and use it in GitHub Desktop.
A thread-safe asynchronous queue.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
using System.Threading.Tasks.Dataflow; | |
namespace AsyncQueue; | |
/// <summary>
/// A thread-safe asynchronous queue.
/// </summary>
/// <example>
/// <code>
/// var queue = new AsyncQueue&lt;int&gt;();
/// await foreach (int i in queue)
/// {
///     // Writes a line as soon as some other Task calls queue.Enqueue(..)
///     Console.WriteLine(i);
/// }
/// </code>
/// </example>
/// <remarks>
/// Items are buffered in a <see cref="BufferBlock{T}"/>. Only one enumeration can be
/// active at a time; a second enumeration waits until the first one has finished or
/// has been disposed.
/// Inspired by <see href="https://stackoverflow.com/a/55912725/14860947">this StackOverflow answer</see>.
/// </remarks>
/// <typeparam name="T">
/// The type of the elements in the queue.
/// </typeparam>
public sealed class AsyncQueue<T> : IAsyncEnumerable<T>
{
    // Allows a single active enumerator; maxCount of 1 guards against an accidental double release.
    private readonly SemaphoreSlim _enumerationSemaphore = new(1, 1);

    private readonly BufferBlock<T> _bufferBlock = new();

    /// <summary>
    /// Add an item to the queue atomically.
    /// </summary>
    /// <param name="item">The item to add</param>
    public void Enqueue(T item) => _bufferBlock.Post(item);

    /// <summary>
    /// Asynchronously yields items in the order they were enqueued, waiting whenever
    /// the queue is empty.
    /// </summary>
    /// <param name="ct">
    /// Token used to stop the enumeration. If it is cancelled while waiting for the
    /// enumeration slot or for the next item, an <see cref="OperationCanceledException"/>
    /// is thrown; if it is observed between two items, the enumeration ends normally.
    /// </param>
    /// <returns>An enumerator that never completes on its own; it ends only through cancellation or disposal.</returns>
    /// <exception cref="OperationCanceledException">
    /// <paramref name="ct"/> was cancelled while an asynchronous wait was pending.
    /// </exception>
    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken ct = default)
    {
        // Acquired outside the try block: if this wait is cancelled we never held the
        // semaphore, so there is nothing to release.
        await _enumerationSemaphore.WaitAsync(ct);
        try
        {
            while (!ct.IsCancellationRequested)
            {
                yield return await _bufferBlock.ReceiveAsync(ct);
            }
        }
        finally
        {
            // Runs on normal completion, on cancellation, and when the consumer disposes
            // the enumerator early (e.g. `break` inside `await foreach`), so the next
            // enumeration is never blocked by a leaked semaphore slot.
            _enumerationSemaphore.Release();
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment