Skip to content

Instantly share code, notes, and snippets.

@mwahlig
Last active February 17, 2024 02:15
Show Gist options
  • Save mwahlig/725fe5e78e385093ba53e6f89028a41c to your computer and use it in GitHub Desktop.
Save mwahlig/725fe5e78e385093ba53e6f89028a41c to your computer and use it in GitHub Desktop.
A Combine publisher that zips an array of publishers into a single array of their values, ordered by each publisher's index in the input array.
extension Publishers {
    /// A value paired with the position of the publisher that produced it.
    private struct IndexedResult<T> {
        /// Offset of the source publisher in the array handed to `ZipMany`.
        let index: Int
        /// The value emitted by that publisher.
        let result: T
    }

    /// Wraps an upstream publisher so that its first value is tagged with `index`.
    ///
    /// Only the first upstream value is forwarded; the wrapper then finishes.
    /// Upstream failures are passed through unchanged.
    private struct IndexedPublisher<Upstream>: Publisher where Upstream: Publisher {
        typealias Output = IndexedResult<Upstream.Output>
        typealias Failure = Upstream.Failure

        let index: Int
        let publisher: Upstream

        func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
            // Copy the index so the mapping closure does not need to capture `self`.
            let index = self.index
            // Attach the downstream subscriber to the operator chain instead of
            // calling `sink`: the chain hands the subscriber a real subscription
            // (so demand and cancellation work), keeps the upstream alive for as
            // long as the subscriber holds that subscription, and forwards
            // failures. `sink(receiveValue:)` is only available when
            // `Failure == Never`, and its discarded `AnyCancellable` would cancel
            // the upstream as soon as this method returned.
            publisher
                .first()
                .map { IndexedResult(index: index, result: $0) }
                .receive(subscriber: subscriber)
        }
    }

    /// A publisher that subscribes to every publisher in an array and emits a
    /// single array holding the first value of each, ordered by the publisher's
    /// position in the input array rather than by arrival order.
    ///
    /// The array is emitted once all upstream publishers have produced a value
    /// (or finished). If any upstream fails, the failure is forwarded and no
    /// array is emitted. An upstream that finishes without emitting contributes
    /// no element, so the output can then be shorter than the input.
    struct ZipMany<Upstream>: Publisher where Upstream: Publisher {
        public typealias Output = [Upstream.Output]
        public typealias Failure = Upstream.Failure

        /// The upstream publishers, in the order their values appear in the output.
        let publishers: [Upstream]

        /// Creates a publisher from a variadic list of upstream publishers.
        init(_ upstream: Upstream...) {
            self.publishers = upstream
        }

        /// Creates a publisher from an array of upstream publishers.
        init(_ upstream: [Upstream]) {
            self.publishers = upstream
        }

        public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
            // Tag every upstream with its offset so the original order can be
            // restored after the merge, which delivers values in arrival order.
            let indexed = publishers.enumerated().map { pair in
                IndexedPublisher(index: pair.offset, publisher: pair.element)
            }

            // Subscribe the downstream to the chain directly (see the note in
            // `IndexedPublisher.receive`) so the subscription, back-pressure,
            // cancellation and failures all reach the subscriber.
            Publishers.MergeMany(indexed)
                .collect()
                .map { (results: [IndexedResult<Upstream.Output>]) -> [Upstream.Output] in
                    results
                        .sorted { $0.index < $1.index }
                        .map { $0.result }
                }
                .receive(subscriber: subscriber)
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment