Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save crozone/4eb5096fb2b67490e3481b4fcd796e6f to your computer and use it in GitHub Desktop.
Save crozone/4eb5096fb2b67490e3481b4fcd796e6f to your computer and use it in GitHub Desktop.
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Channels;
/// <summary>
/// Extension methods for <see cref="ChannelReader{T}"/>.
/// </summary>
public static class ChannelReaderExtensions
{
    /// <summary>
    /// Creates an <see cref="IAsyncEnumerable{T}"/> that enables reading all of the data from the channel
    /// by peeking each value. Each item is only read from the channel upon the next iteration, keeping the item
    /// in the channel until it has been processed.
    /// </summary>
    /// <typeparam name="T">
    /// The type of the items in the channel. It must be a reference type because the peeked item and the
    /// item subsequently read are compared by reference.
    /// </typeparam>
    /// <param name="reader">The channel reader to enumerate. It must support peeking.</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to use to cancel the enumeration.</param>
    /// <remarks>
    /// <para>
    /// This method should only be used in single reader scenarios.
    /// Multiple concurrent readers can cause incorrect behavior as the peeked value may differ from the read value.
    /// </para>
    /// <para>
    /// This is an async iterator, so argument validation runs when enumeration starts rather than when the
    /// method is called.
    /// </para>
    /// <para>
    /// If the enumerator is disposed while an item is being processed (for example by breaking out of an
    /// <c>await foreach</c> loop), that item is still removed from the channel, but a mismatch between the
    /// peeked and the read item is not reported in that case.
    /// </para>
    /// </remarks>
    /// <returns>The created async enumerable.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="reader"/> is <see langword="null"/>.</exception>
    /// <exception cref="InvalidOperationException">
    /// <paramref name="reader"/> does not support peeking, or the item read from the channel was not the item
    /// that had been peeked.
    /// </exception>
    public static async IAsyncEnumerable<T> ReadAllWithPeekAsync<T>(
        this ChannelReader<T> reader,
        [EnumeratorCancellation] CancellationToken cancellationToken = default
    ) where T : class
    {
        // Fail with a descriptive exception instead of a NullReferenceException from reader.CanPeek.
        if (reader is null)
        {
            throw new ArgumentNullException(nameof(reader));
        }

        if (!reader.CanPeek)
        {
            throw new InvalidOperationException($"This {nameof(ChannelReader<T>)} does not support peeking");
        }

        while (await reader.WaitToReadAsync(cancellationToken).ConfigureAwait(false))
        {
            while (reader.TryPeek(out T? peekedItem))
            {
                bool error;
                try
                {
                    // The item stays in the channel while the consumer processes it.
                    yield return peekedItem;
                }
                finally
                {
                    // Runs both when the consumer asks for the next item and when the enumerator is disposed.
                    // We're not supposed to throw from within a finally clause, so instead just set an error flag.
                    // If we are able to execute past the end of the finally clause, throw the exception then.
                    error = !(reader.TryRead(out T? readItem) && ReferenceEquals(readItem, peekedItem));
                }

                if (error)
                {
                    throw new InvalidOperationException("Unexpected value was read from the channel reader");
                }
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment