Skip to content

Instantly share code, notes, and snippets.

@ollieatkinson
Created August 1, 2022 08:40
Show Gist options
  • Save ollieatkinson/cf7dfb8b0023a6a3baded2f56da674f1 to your computer and use it in GitHub Desktop.
Save ollieatkinson/cf7dfb8b0023a6a3baded2f56da674f1 to your computer and use it in GitHub Desktop.
CombineLatest for Swift Concurrency
public func combineLatest<C>(
_ collection: C,
bufferingPolicy limit: AsyncStream<[C.Element.Element]>.Continuation.BufferingPolicy = .unbounded
) -> AsyncStream<[C.Element.Element]> where C: Collection, C.Element: AsyncSequence {
AsyncStream(bufferingPolicy: limit) { continuation in
let stream = CombineLatestActor<C.Element.Element>(collection.count)
continuation.onTermination = { @Sendable termination in
switch termination {
case .cancelled: Task { await stream.cancel() }
case .finished: break
}
}
for (i, sequence) in collection.enumerated() {
Task {
for try await value in sequence {
if await stream.isCancelled {
throw CancellationError()
}
if let values = await stream.insert(value, at: i) {
continuation.yield(values)
}
}
if await stream.complete(i) {
continuation.finish()
}
}
}
}
}
private actor CombineLatestActor<Element> {
var values: Array<Element?>
var seen, completed: Set<Int>
var isCancelled: Bool = false
init(_ count: Int) {
values = [Element?](repeating: nil, count: count)
seen = .init(minimumCapacity: count)
completed = .init(minimumCapacity: count)
}
@discardableResult
func insert(_ value: Element, at index: Int) -> [Element]? {
seen.insert(index)
values[index] = value
return seen.count == values.count
? values.map { $0.unsafelyUnwrapped }
: nil
}
@discardableResult
func complete(_ index: Int) -> Bool {
completed.insert(index)
return completed.count == values.count
}
func cancel() {
isCancelled = true
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment