Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save josephlord/ad53a7100db42e218d2049f0a4b1ec46 to your computer and use it in GitHub Desktop.
Save josephlord/ad53a7100db42e218d2049f0a4b1ec46 to your computer and use it in GitHub Desktop.
/// Kicks off the main loop over the async sequence. Does the main work within the for loop over the async seqence
/// - Parameters:
/// - seq: The AsyncSequence that is the source
/// - sub: The Subscriber to this Subscription
private func mainLoop(seq: AsyncSequenceType, sub: S) {
// taskHandle is kept for cancelation
taskHandle = Task {
do {
try await withTaskCancellationHandler {
Task.detached {
let cont = await self.innerActor.getContinuationToFireOnCancelation()
cont?.resume()
}
} operation: {
for try await element in seq {
// Check for demand before providing the first item
await self.innerActor.waitUntilReadyForMore()
guard !Task.isCancelled else { return } // Exit if cancelled
let newDemand = sub.receive(element) // Pass on the item
let cont = await self.innerActor.add(demand: newDemand)
assert(cont == nil,
"If we are't waiting on the demand the continuation will always be nil")
// cont should always be nil as it will only be set when this loop is
// waiting on demand
cont?.resume()
}
// Finished the AsyncSequence so finish the subcription
sub.receive(completion: .finished)
return
}
} catch {
// Cancel means the subscriber shouldn't get more, even errors so exit
if error is CancellationError { return }
sub.receive(completion: .failure(error))
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment