Skip to content

Instantly share code, notes, and snippets.

@onevcat
Last active February 10, 2020 14:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save onevcat/138ca5a41ee1a7f2994a6c366936744e to your computer and use it in GitHub Desktop.
Save onevcat/138ca5a41ee1a7f2994a6c366936744e to your computer and use it in GitHub Desktop.
ZipAll
import Combine
import Foundation
extension Publishers {
struct ZipAll<Collection: Swift.Collection>: Publisher
where Collection.Element: Publisher
{
typealias Output = [Collection.Element.Output]
typealias Failure = Collection.Element.Failure
private let publishers: Collection
init(_ publishers: Collection) {
self.publishers = publishers
}
func receive<S>(subscriber: S)
where S : Subscriber, Failure == S.Failure, Output == S.Input
{
let subscription = ZipAppSubscription<Collection.Element.Output, Failure>(
subscriber: subscriber, publishers: publishers.map { $0.eraseToAnyPublisher() }
)
subscriber.receive(subscription: subscription)
subscription.startSubscribing()
}
}
private class ZipAppSubscription<Output, Failure: Error>: Subscription
{
private var leftDemand: Subscribers.Demand = .none
private var subscriber: AnySubscriber<[Output], Failure>? = nil
private var buffer: [[Output]]
private let publishers: [AnyPublisher<Output, Failure>]
private var childSubscriptions: [AnyCancellable] = []
private var finishedCount = 0
private var lock = NSRecursiveLock()
init<S: Subscriber>(
subscriber: S,
publishers: [AnyPublisher<Output, Failure>]
) where Failure == S.Failure, [Output] == S.Input
{
self.subscriber = AnySubscriber(subscriber)
self.buffer = Array(repeating: [], count: publishers.count)
self.publishers = publishers
}
func request(_ demand: Subscribers.Demand) {
lock.lock()
defer { lock.unlock() }
self.leftDemand += demand
send()
}
func cancel() {
lock.lock()
defer { lock.unlock() }
childSubscriptions = []
subscriber = nil
}
func startSubscribing() {
for (i, publisher) in publishers.enumerated() {
publisher.sink(
receiveCompletion: { [weak self] completion in
self?.receiveCompletion(completion, at: i)
},
receiveValue: { [weak self] value in
self?.receiveValue(value, at: i)
}
).store(in: &childSubscriptions)
}
}
private func receiveValue(
_ value: Output, at index: Int
) {
lock.lock()
defer { lock.unlock() }
buffer[index].append(value)
send()
}
private func send() {
guard let subscriber = subscriber else { return }
while leftDemand > .none, let outputs = firstRowOutputItems {
self.leftDemand -= .max(1)
let nextDemand = subscriber.receive(outputs)
self.leftDemand += nextDemand
}
}
private var firstRowOutputItems: [Output]? {
guard buffer.allSatisfy({ !$0.isEmpty }) else { return nil }
var outputs = [Output]()
for i in 0 ..< buffer.count {
var column = buffer[i]
outputs.append(column.remove(at: 0))
buffer[i] = column
}
return outputs
}
private func receiveCompletion(
_ event: Subscribers.Completion<Failure>, at index: Int
)
{
lock.lock()
defer { lock.unlock() }
guard let subscriber = subscriber else { return }
switch event {
case .finished:
finishedCount += 1
if finishedCount == buffer.count {
subscriber.receive(completion: .finished)
self.subscriber = nil
}
case .failure:
subscriber.receive(completion: event)
self.subscriber = nil
}
}
}
}
extension Collection where Element: Publisher {
var zipAll: Publishers.ZipAll<Self> {
Publishers.ZipAll(self)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment