Skip to content

Instantly share code, notes, and snippets.

@chriseidhof
Last active October 23, 2023 20:41
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chriseidhof/b254d6b8636ee4ec908cea2c34fbe03e to your computer and use it in GitHub Desktop.
Save chriseidhof/b254d6b8636ee4ec908cea2c34fbe03e to your computer and use it in GitHub Desktop.
Async Zipped
/*
Make sure to compile this with the following flags:
-Xfrontend -warn-concurrency -Xfrontend -enable-actor-data-race-checks
*/
extension AsyncIteratorProtocol {
func newAndNext() async throws -> (Self, Element)? {
var copy = self
if let n = try await copy.next() {
return (copy, n)
} else {
return nil
}
}
}
struct AsyncZipped<S1: AsyncSequence, S2: AsyncSequence>: AsyncSequence
where S1: Sendable, S2: Sendable, S1.AsyncIterator: Sendable, S2.AsyncIterator: Sendable
{
var left: S1
var right: S2
typealias Element = (S1.Element, S2.Element)
func makeAsyncIterator() -> AsyncIterator {
AsyncIterator(left: left.makeAsyncIterator(), right: right.makeAsyncIterator())
}
struct AsyncIterator: AsyncIteratorProtocol {
var left: S1.AsyncIterator
var right: S2.AsyncIterator
mutating func next() async throws -> Element? {
let l0 = left
let r0 = right
async let x = try await l0.newAndNext()
async let y = try await r0.newAndNext()
switch try await (x,y) {
case let ((newLeft, l)?, (newRight, r)?):
left = newLeft
right = newRight
return (l, r)
default:
return nil
}
}
}
}
extension AsyncSequence where Self: Sendable, AsyncIterator: Sendable {
func zip<Other: AsyncSequence>(_ other: Other) -> AsyncZipped<Self, Other>
where Other: Sendable, Other.AsyncIterator: Sendable
{
AsyncZipped(left: self, right: other)
}
}
@Peter-Schorn
Copy link

What is the purpose of making a copy of the iterators every time you access the next element (newAndNext)? It seems unnecessary.

@chriseidhof
Copy link
Author

You can't mutate the iterators inside a child task. This is my workaround, if you have a better idea please let me know!

@phausler
Copy link

phausler commented Dec 7, 2021

newAndNext btw needs to return the iterator even in the nil and thrown cases since the iterator may have state that indicates it is at the end of an iteration.

@chriseidhof
Copy link
Author

@phausler I guess the current approach works as long as consumers never call next again, but of course, that's not a valid assumption. Will change it. I guess an even better approach is to somehow set a done flag in the combining iterator, just so that we don't unnecessarily consume the other iterator.

@Peter-Schorn
Copy link

Peter-Schorn commented Dec 8, 2021

@phausler AsyncIterator does need to keep track of when one iterator reaches the end so that it doesn't needlessly retrieve the next elements from the other iterator, but this doesn't require changing newAndNext:

mutating func next() async throws -> Element? {
    
    if reachedEnd {
        return nil
    }

    let l0 = left
    let r0 = right
    async let x = try await l0.newAndNext()
    async let y = try await r0.newAndNext()
    switch try await (x,y) {
        case let ((newLeft, l)?, (newRight, r)?):
            left = newLeft
            right = newRight
            return (l, r)
        default:
            reachedEnd = true
            return nil
    }
}

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment