Last active
June 7, 2023 14:07
-
-
Save idg10/24259530956ec218101cc2bfa2f69225 to your computer and use it in GitHub Desktop.
A spike illustrating one possible implementation strategy for an `IAsyncEnumerable` `Switch` operator
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
namespace AsyncEnumerableSwitchSpike;

/// <summary>
/// A spike illustrating one possible implementation strategy for a <c>Switch</c> operator
/// over <see cref="IAsyncEnumerable{T}"/>.
/// </summary>
internal static class SwitchExtension
{
    /// <summary>
    /// Flattens a sequence of sequences by yielding items from the most recent inner sequence
    /// the outer sequence has produced, abandoning an inner sequence as soon as a newer one
    /// becomes available.
    /// </summary>
    /// <typeparam name="T">The element type of the inner sequences.</typeparam>
    /// <param name="sequences">The outer sequence, each item of which is an inner sequence.</param>
    /// <returns>
    /// A sequence that ends once the outer sequence has ended and the last inner sequence
    /// being enumerated (if any) has also ended.
    /// </returns>
    /// <exception cref="InvalidOperationException">
    /// The loop reached a state it is not expected to be able to reach (outer enumeration in
    /// progress, nothing ready, and no inner enumeration in progress).
    /// </exception>
    /// <remarks>
    /// When the outer sequence has several inner sequences available immediately, only the last
    /// of them is enumerated; the earlier ones are never started. (The Rx Switch implementation
    /// is believed to subscribe to every child, so there is a case for not doing this
    /// optimization.)
    /// </remarks>
    public static async IAsyncEnumerable<T> Switch<T>(this IAsyncEnumerable<IAsyncEnumerable<T>> sequences)
    {
        // Gets set to null once we've reached the end of the outer sequence.
        IAsyncEnumerator<IAsyncEnumerable<T>>? currentOuterEnumerator = sequences.GetAsyncEnumerator();

        // Non-null only while an inner sequence is being enumerated. currentInnerMoveNext
        // (below) is set and cleared together with it.
        IAsyncEnumerator<T>? currentInnerEnumerator = null;
        try
        {
            // The in-flight request for the next inner sequence. Only meaningful while
            // currentOuterEnumerator is non-null.
            ValueTask<bool> currentOuterMoveNext = currentOuterEnumerator.MoveNextAsync();

            // The most recent inner sequence received from the outer sequence that we have
            // not yet started enumerating.
            IAsyncEnumerable<T>? newInner = null;

            // The in-flight request for the next item of the current inner sequence.
            ValueTask<bool>? currentInnerMoveNext = null;

            while (true)
            {
                // If the source has a bunch of nested sequences for us immediately, churn
                // straight through them: Switch produces items from the latest one, so
                // enumerating a child that is not the latest would be wasted effort.
                //
                // We also come in here, and block, when there is nothing else we could be
                // doing: no inner enumeration in progress AND no newly received inner sequence
                // waiting to be started. The second half of that test matters: without it we
                // would wait for the outer sequence to produce yet another item (or end)
                // before ever enumerating the inner sequence it just gave us.
                if (currentOuterEnumerator is not null &&
                    (currentOuterMoveNext.IsCompleted
                        || (currentInnerEnumerator is null && newInner is null)))
                {
                    if (await currentOuterMoveNext)
                    {
                        // We got a new nested sequence.
                        newInner = currentOuterEnumerator.Current;
                        currentOuterMoveNext = currentOuterEnumerator.MoveNextAsync();
                    }
                    else
                    {
                        // Outer sequence has finished.
                        await currentOuterEnumerator.DisposeAsync();
                        currentOuterEnumerator = null;
                    }

                    continue;
                }

                // We only get here once the outer sequence currently has nothing more for us
                // (either because it reached its end, or because its MoveNextAsync did not
                // complete immediately). If it handed us one or more inner sequences on the
                // way, start retrieving values from the most recent one.
                if (newInner is not null)
                {
                    // If we were previously iterating through an earlier inner sequence, stop:
                    // it's no longer the latest inner sequence.
                    if (currentInnerEnumerator is not null)
                    {
                        // TODO: Should we cancel the work kicked off with currentInnerMoveNext?
                        // NOTE(review): that MoveNextAsync may still be in flight at this point;
                        // confirm the inner enumerators in use tolerate DisposeAsync in that state.
                        await currentInnerEnumerator.DisposeAsync();
                        currentInnerEnumerator = null;

                        // The pending move-next belonged to the enumerator we just disposed.
                        currentInnerMoveNext = null;

                        // The outer sequence might have produced another item during the await
                        // we just did, so go round the loop once more to see whether there's now
                        // a newer inner sequence we should be using instead.
                        continue;
                    }

                    currentInnerEnumerator = newInner.GetAsyncEnumerator();
                    currentInnerMoveNext = currentInnerEnumerator.MoveNextAsync();
                    newInner = null;
                }

                // Consume the inner sequence's result if either:
                //  * it is already available (the await completes immediately), or
                //  * the outer sequence has ended, so the inner sequence is the only thing
                //    left to wait for (the await may block).
                if (currentInnerMoveNext is { } pendingInner &&
                    (pendingInner.IsCompleted || currentOuterEnumerator is null))
                {
                    if (await pendingInner)
                    {
                        // Yay! It's the entire point of this operator!
                        yield return currentInnerEnumerator!.Current;
                        currentInnerMoveNext = currentInnerEnumerator.MoveNextAsync();
                    }
                    else
                    {
                        // The inner sequence has finished.
                        await currentInnerEnumerator!.DisposeAsync();
                        currentInnerEnumerator = null;
                        currentInnerMoveNext = null;
                    }

                    continue;
                }

                if (currentOuterEnumerator is null)
                {
                    // The outer sequence has ended and (because the branch above did not run)
                    // there is no inner enumeration in progress, so we're done.
                    yield break;
                }

                // We don't expect ever to get here without an inner enumeration in progress,
                // because the first if statement in the loop detects that case and awaits the
                // outer sequence. (Sometimes I wish there was a .NET Exception type that meant
                // what HTTP status 500 does.)
                ValueTask<bool> waitingInner =
                    currentInnerMoveNext ?? throw new InvalidOperationException("Unexpected state");

                // Both outer and inner enumerations are in progress, and neither has anything
                // for us at this instant, so we have to perform an await that watches both
                // concurrently. We've been carefully avoiding allocating a real Task from the
                // ValueTasks, but we can no longer avoid that.
                Task<bool> currentOuterMoveNextFullTask = currentOuterMoveNext.AsTask();
                Task<bool> currentInnerMoveNextFullTask = waitingInner.AsTask();
                await Task.WhenAny(currentOuterMoveNextFullTask, currentInnerMoveNextFullTask);

                // We just burned the two value tasks by wrapping them as tasks. Re-wrap the
                // tasks as fresh ValueTasks so that the code earlier in the loop can consume
                // whichever one completed.
                currentOuterMoveNext = new ValueTask<bool>(currentOuterMoveNextFullTask);
                currentInnerMoveNext = new ValueTask<bool>(currentInnerMoveNextFullTask);
            }
        }
        finally
        {
            // Nested so that the outer enumerator still gets disposed if disposing the inner
            // enumerator throws.
            try
            {
                if (currentInnerEnumerator is not null)
                {
                    await currentInnerEnumerator.DisposeAsync();
                }
            }
            finally
            {
                if (currentOuterEnumerator is not null)
                {
                    await currentOuterEnumerator.DisposeAsync();
                }
            }
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment