Skip to content

Instantly share code, notes, and snippets.

@ollieatkinson
Last active June 29, 2021 13:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ollieatkinson/5c54766d9fad232ce2323f43706054e1 to your computer and use it in GitHub Desktop.
Save ollieatkinson/5c54766d9fad232ce2323f43706054e1 to your computer and use it in GitHub Desktop.
`combineLatest()` on `Collection` where elements are Publishers
import Combine
import Foundation
extension Collection where Element: Publisher {
    /// Wraps this collection of publishers in a `CombineLatestCollection`.
    ///
    /// - Returns: A `CombineLatestCollection` initialised with `self`.
    public func combineLatest() -> CombineLatestCollection<Self> {
        return .init(self)
    }
}
/// A publisher built from a collection of publishers.
///
/// Each subscriber is handed a `Subscription` that attaches to every publisher in the
/// collection and forwards an array holding the latest value received from each of them.
/// All upstream publishers must share the same `Failure` type.
public struct CombineLatestCollection<C: Collection>: Publisher where C.Element: Publisher {
    /// An array with one entry per upstream publisher.
    public typealias Output = [C.Element.Output]
    /// The failure type shared by the upstream publishers.
    public typealias Failure = C.Element.Failure

    /// The upstream publishers that are combined.
    private let publishers: C

    /// Creates a publisher that combines the given collection of publishers.
    ///
    /// - Parameter publishers: The upstream publishers to combine.
    public init(_ publishers: C) {
        self.publishers = publishers
    }

    /// Attaches `subscriber` by creating a `Subscription` over the stored publishers
    /// and handing it to the subscriber.
    ///
    /// - Parameter subscriber: The downstream subscriber to attach.
    public func receive<S>(subscriber: S) where S: Subscriber, S.Failure == Failure, S.Input == Output {
        let subscription = Subscription(publishers: publishers, subscriber: subscriber)
        subscriber.receive(subscription: subscription)
    }
}
extension CombineLatestCollection {
    /// The subscription handed to a downstream subscriber of `CombineLatestCollection`.
    ///
    /// On creation it sinks every upstream publisher. Once each upstream has produced at
    /// least one value, every new value triggers a downstream emission containing the
    /// latest value of each upstream, ordered by the upstream's position in the collection.
    ///
    /// The downstream receives `.finished` once all upstreams have finished, or the first
    /// `.failure` produced by any upstream. After a terminal event nothing further is sent.
    public final class Subscription<S: Subscriber>: Combine.Subscription where S.Failure == Failure, S.Input == Output {
        /// One cancellable sink per upstream publisher.
        private let subscribers: [AnyCancellable]

        /// Subscribes to every publisher in `publishers` and forwards combined values to `subscriber`.
        ///
        /// - Parameters:
        ///   - publishers: The upstream publishers to combine.
        ///   - subscriber: The downstream subscriber that receives the combined arrays.
        fileprivate init(publishers: C, subscriber: S) {
            // `count` can be O(n) for collections that are not random-access, so compute it once.
            // NOTE(review): when `total` is 0 no sink exists, so the downstream never receives a completion.
            let total = publishers.count
            // Latest value per upstream, keyed by the upstream's offset in the collection.
            var latest: [Int: C.Element.Output] = [:]
            var finishedCount = 0
            // Set once a completion has been forwarded; later upstream events are dropped.
            var isTerminated = false
            // Serialises access to the mutable state above across all upstream callbacks.
            let lock = NSLock()

            subscribers = publishers.enumerated().map { index, publisher in
                publisher.sink(receiveCompletion: { completion in
                    lock.lock()
                    defer { lock.unlock() }
                    // Never deliver more than one terminal event (e.g. when two upstreams fail).
                    guard !isTerminated else { return }
                    switch completion {
                    case .finished:
                        finishedCount += 1
                        if finishedCount == total {
                            isTerminated = true
                            subscriber.receive(completion: completion)
                        }
                    case .failure:
                        isTerminated = true
                        subscriber.receive(completion: completion)
                    }
                }, receiveValue: { value in
                    lock.lock()
                    defer { lock.unlock() }
                    guard !isTerminated else { return }
                    latest[index] = value
                    // Stay silent until every upstream has contributed at least one value.
                    guard latest.count == total else { return }
                    // Dictionary iteration order is unspecified, so read the values back by
                    // offset to keep the output aligned with the order of the publishers.
                    let ordered: [C.Element.Output] = (0..<total).compactMap { latest[$0] }
                    _ = subscriber.receive(ordered)
                })
            }
        }

        /// Does nothing: the requested demand is ignored and values are forwarded as they arrive.
        public func request(_ demand: Subscribers.Demand) {}

        /// Cancels every upstream sink.
        public func cancel() {
            subscribers.forEach { $0.cancel() }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment