Skip to content

Instantly share code, notes, and snippets.

@theodorzoulias
Last active January 25, 2023 07:02
Show Gist options
  • Save theodorzoulias/a1442a01fe724cf5e2930ca799d4dd4a to your computer and use it in GitHub Desktop.
Save theodorzoulias/a1442a01fe724cf5e2930ca799d4dd4a to your computer and use it in GitHub Desktop.
DeferErrorUntilCompletion for asynchronous sequences
// https://stackoverflow.com/questions/73056639/how-to-chunkify-an-ienumerablet-without-losing-discarding-items-in-case-of-fa
/// <summary>
/// Runs <paramref name="conversion"/> over <paramref name="input"/>, but postpones any
/// exception thrown while reading <paramref name="input"/> until the converted sequence
/// has been enumerated to its end.
/// </summary>
/// <remarks>
/// The conversion does not receive <paramref name="input"/> directly. It receives a wrapper
/// that ends normally at the point where the source enumerator throws, which gives the
/// conversion the chance to emit whatever it produces at end-of-sequence. Once the converted
/// sequence completes, the captured failure (if any) is rethrown to the consumer.
/// Because this is an iterator, argument validation runs when enumeration starts, not when
/// the method is called.
/// </remarks>
/// <typeparam name="TInput">Element type of the source sequence.</typeparam>
/// <typeparam name="TOutput">Element type of the converted sequence.</typeparam>
/// <param name="input">The source sequence whose errors are deferred.</param>
/// <param name="conversion">
/// Builds the output sequence from the error-suppressing wrapper around <paramref name="input"/>.
/// </param>
/// <param name="cancellationToken">Token passed to the enumeration of the converted sequence.</param>
/// <returns>The elements produced by <paramref name="conversion"/>, followed by the deferred error, if any.</returns>
/// <exception cref="ArgumentNullException">
/// <paramref name="input"/> or <paramref name="conversion"/> is <see langword="null"/>.
/// </exception>
/// <exception cref="OperationCanceledException">
/// Reading <paramref name="input"/> was canceled; surfaced after the converted sequence completes.
/// </exception>
private static async IAsyncEnumerable<TOutput> DeferErrorUntilCompletion<TInput, TOutput>(
    IAsyncEnumerable<TInput> input,
    Func<IAsyncEnumerable<TInput>, IAsyncEnumerable<TOutput>> conversion,
    [EnumeratorCancellation] CancellationToken cancellationToken = default)
{
    if (input is null)
    {
        throw new ArgumentNullException(nameof(input));
    }
    if (conversion is null)
    {
        throw new ArgumentNullException(nameof(conversion));
    }

    // Holds the source failure as a completed (faulted or canceled) task, so that it can be
    // rethrown later through the awaiter with its original details.
    Task errorContainer = null;

    // Wrapper handed to the conversion: forwards the source elements and turns a source
    // failure into a normal end-of-sequence, after recording it in errorContainer.
    async IAsyncEnumerable<TInput> InputIterator(
        [EnumeratorCancellation] CancellationToken innerToken = default)
    {
        IAsyncEnumerator<TInput> enumerator = input.GetAsyncEnumerator(innerToken);

        // Disposal happens outside the try block below, so an exception thrown while
        // disposing the source enumerator is NOT deferred.
        await using (enumerator.ConfigureAwait(false))
        {
            while (true)
            {
                TInput item;
                try
                {
                    if (!await enumerator.MoveNextAsync().ConfigureAwait(false))
                    {
                        break;
                    }
                    item = enumerator.Current;
                }
                catch (OperationCanceledException oce)
                {
                    // Record the cancellation as a canceled task that carries the token.
                    TaskCompletionSource<bool> tcs = new();
                    tcs.SetCanceled(oce.CancellationToken);
                    errorContainer = tcs.Task;
                    break;
                }
                catch (Exception ex)
                {
                    errorContainer = Task.FromException(ex);
                    break;
                }

                // A yield is not allowed inside a try block that has catch clauses,
                // which is why the element is yielded here, after the try.
                yield return item;
            }
        }
    }

    IAsyncEnumerable<TOutput> output = conversion(InputIterator());
    await foreach (TOutput item in output.WithCancellation(cancellationToken).ConfigureAwait(false))
    {
        yield return item;
    }

    // The task is already completed, so GetResult does not block; it only rethrows
    // the recorded failure. It does nothing when the source completed successfully.
    errorContainer?.GetAwaiter().GetResult();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment