Skip to content

Instantly share code, notes, and snippets.

@idg10
Last active June 7, 2023 14:07
Show Gist options
  • Save idg10/24259530956ec218101cc2bfa2f69225 to your computer and use it in GitHub Desktop.
Save idg10/24259530956ec218101cc2bfa2f69225 to your computer and use it in GitHub Desktop.
A spike illustrating one possible implementation strategy for an `IAsyncEnumerable` `Switch` operator
namespace AsyncEnumerableSwitchSpike;
internal static class SwitchExtension
{
    /// <summary>
    /// Flattens a sequence of sequences by producing items from whichever inner sequence the
    /// outer sequence produced most recently. As soon as a newer inner sequence arrives,
    /// enumeration of the previous one is abandoned.
    /// </summary>
    /// <typeparam name="T">The element type of the inner sequences.</typeparam>
    /// <param name="sequences">The outer sequence, whose items are the inner sequences.</param>
    /// <param name="cancellationToken">
    /// Cancels the whole operation. It is forwarded, through linked token sources, to the outer
    /// enumerator and to every inner enumerator this method starts.
    /// </param>
    /// <returns>The items of the most recent inner sequence at each point in time.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="sequences"/> is <see langword="null"/>. Because this is an async iterator,
    /// the exception surfaces from the first <c>MoveNextAsync</c> call, not from this call.
    /// </exception>
    /// <remarks>
    /// <para>
    /// If the outer sequence has several inner sequences available immediately, only the latest
    /// is enumerated. Earlier ones are skipped without <c>GetAsyncEnumerator</c> ever being
    /// called on them. This presumes callers do not expect every inner sequence to be started.
    /// The Rx <c>Switch</c> subscribes to every child, so this optimization is a deliberate
    /// difference.
    /// </para>
    /// <para>
    /// An async enumerator must not be disposed while one of its <c>MoveNextAsync</c> calls is
    /// still outstanding; compiler-generated async iterators throw in that situation. So when
    /// an enumerator is abandoned with an operation outstanding, its token is cancelled first.
    /// The outstanding operation is then awaited and its outcome ignored, and only then is the
    /// enumerator disposed. A source that ignores cancellation can therefore delay a switch, or
    /// the disposal of this iterator, until its pending operation completes.
    /// </para>
    /// </remarks>
    public static async IAsyncEnumerable<T> Switch<T>(
        this IAsyncEnumerable<IAsyncEnumerable<T>> sequences,
        [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken = default)
    {
        ArgumentNullException.ThrowIfNull(sequences);

        using CancellationTokenSource outerCancellation =
            CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        // Set to null once the outer sequence has reported its end (and has been disposed).
        IAsyncEnumerator<IAsyncEnumerable<T>>? outer = sequences.GetAsyncEnumerator(outerCancellation.Token);

        // Each of these is non-null exactly while a MoveNextAsync call has been started but not
        // yet awaited. They are cleared *before* each await, so the cleanup code never awaits
        // (consumes) the same ValueTask a second time.
        ValueTask<bool>? outerMoveNext = null;
        ValueTask<bool>? innerMoveNext = null;

        // The latest inner sequence received from the outer one that has not been started yet.
        IAsyncEnumerable<T>? newInner = null;

        // The inner sequence currently being enumerated, and the source whose token it was given.
        IAsyncEnumerator<T>? inner = null;
        CancellationTokenSource? innerCancellation = null;

        try
        {
            outerMoveNext = outer.MoveNextAsync();
            while (true)
            {
                // 1. Service the outer sequence if it has an item (or its end) ready right now, or if
                //    there is nothing else we could do: no inner enumeration is running and none is
                //    waiting to be started. Note that a pending newInner must NOT cause us to block
                //    here; otherwise it would not start until the outer produced something else.
                if (outer is not null &&
                    outerMoveNext is { } outerPending &&
                    (outerPending.IsCompleted || (inner is null && newInner is null)))
                {
                    outerMoveNext = null;
                    if (await outerPending)
                    {
                        // A newInner that was never started is simply superseded.
                        newInner = outer.Current;
                        outerMoveNext = outer.MoveNextAsync();
                    }
                    else
                    {
                        IAsyncEnumerator<IAsyncEnumerable<T>> finishedOuter = outer;
                        outer = null;
                        await finishedOuter.DisposeAsync();
                    }

                    continue;
                }

                // 2. The outer sequence has nothing ready. If it handed us a newer inner sequence,
                //    stop the current one (if any) and start the newest.
                if (newInner is not null)
                {
                    if (inner is not null)
                    {
                        IAsyncEnumerator<T> abandoned = inner;
                        ValueTask<bool>? abandonedMoveNext = innerMoveNext;
                        CancellationTokenSource? abandonedCancellation = innerCancellation;
                        inner = null;
                        innerMoveNext = null;
                        innerCancellation = null;
                        try
                        {
                            await CancelAndDisposeAsync(abandoned, abandonedMoveNext, abandonedCancellation);
                        }
                        finally
                        {
                            abandonedCancellation?.Dispose();
                        }

                        // Shutting the old inner down involved awaiting, during which the outer
                        // sequence may have produced an even newer inner sequence, so go round
                        // again before starting one.
                        continue;
                    }

                    innerCancellation = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
                    inner = newInner.GetAsyncEnumerator(innerCancellation.Token);
                    newInner = null;
                    innerMoveNext = inner.MoveNextAsync();
                }

                // 3. Consume from the current inner sequence if it is ready. Once the outer sequence
                //    has finished there is nothing else to watch, so then we wait for the inner.
                if (inner is not null &&
                    innerMoveNext is { } innerPending &&
                    (innerPending.IsCompleted || outer is null))
                {
                    innerMoveNext = null;
                    if (await innerPending)
                    {
                        // The entire point of this operator.
                        yield return inner.Current;
                        innerMoveNext = inner.MoveNextAsync();
                    }
                    else
                    {
                        IAsyncEnumerator<T> finishedInner = inner;
                        CancellationTokenSource? finishedCancellation = innerCancellation;
                        inner = null;
                        innerCancellation = null;
                        try
                        {
                            await finishedInner.DisposeAsync();
                        }
                        finally
                        {
                            finishedCancellation?.Dispose();
                        }
                    }

                    continue;
                }

                // 4. The outer sequence has finished, and step 3 would have handled any inner
                //    enumeration still in progress, so everything is done.
                if (outer is null)
                {
                    yield break;
                }

                // 5. Both the outer and the inner enumerations are in progress, and neither has
                //    anything for us right now. We must watch both concurrently, which means
                //    turning the ValueTasks into real Tasks. We then wrap those Tasks back up as
                //    ValueTasks so the rest of the loop can treat them uniformly.
                if (outerMoveNext is not { } outerStillPending || innerMoveNext is not { } innerStillPending)
                {
                    // Step 1 handles "outer pending with no inner", so this should be unreachable.
                    throw new InvalidOperationException("Unexpected state");
                }

                Task<bool> outerTask = outerStillPending.AsTask();
                outerMoveNext = new ValueTask<bool>(outerTask);
                Task<bool> innerTask = innerStillPending.AsTask();
                innerMoveNext = new ValueTask<bool>(innerTask);
                await Task.WhenAny(outerTask, innerTask);
            }
        }
        finally
        {
            // Nested so that the outer enumerator is still disposed if disposing the inner throws.
            try
            {
                if (inner is not null)
                {
                    await CancelAndDisposeAsync(inner, innerMoveNext, innerCancellation);
                }
            }
            finally
            {
                innerCancellation?.Dispose();
                if (outer is not null)
                {
                    await CancelAndDisposeAsync(outer, outerMoveNext, outerCancellation);
                }
            }
        }
    }

    /// <summary>
    /// Disposes an enumerator that may still have a <c>MoveNextAsync</c> call in flight. That
    /// call is completed first, because disposing concurrently with it is not permitted.
    /// </summary>
    /// <typeparam name="TItem">The enumerator's element type.</typeparam>
    /// <param name="enumerator">The enumerator being abandoned.</param>
    /// <param name="pendingMoveNext">
    /// The enumerator's started-but-not-yet-awaited <c>MoveNextAsync</c> result, or
    /// <see langword="null"/> if there is none. It is awaited here, so the caller must not
    /// await it as well.
    /// </param>
    /// <param name="cancellation">
    /// The source whose token was passed to <c>GetAsyncEnumerator</c>, if any. It is cancelled
    /// only when the pending operation has not already completed. It is not disposed here.
    /// </param>
    private static async ValueTask CancelAndDisposeAsync<TItem>(
        IAsyncEnumerator<TItem> enumerator,
        ValueTask<bool>? pendingMoveNext,
        CancellationTokenSource? cancellation)
    {
        if (pendingMoveNext is { } pending)
        {
            if (!pending.IsCompleted)
            {
                cancellation?.Cancel();
            }

            try
            {
                await pending;
            }
            catch (Exception)
            {
                // Deliberately ignored. The enumeration is being abandoned, so it no longer
                // matters whether the outstanding MoveNextAsync produced an item, was
                // cancelled, or failed.
            }
        }

        await enumerator.DisposeAsync();
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment