Last active
February 10, 2020 14:23
-
-
Save onevcat/138ca5a41ee1a7f2994a6c366936744e to your computer and use it in GitHub Desktop.
ZipAll
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Combine | |
import Foundation | |
extension Publishers { | |
struct ZipAll<Collection: Swift.Collection>: Publisher | |
where Collection.Element: Publisher | |
{ | |
typealias Output = [Collection.Element.Output] | |
typealias Failure = Collection.Element.Failure | |
private let publishers: Collection | |
init(_ publishers: Collection) { | |
self.publishers = publishers | |
} | |
func receive<S>(subscriber: S) | |
where S : Subscriber, Failure == S.Failure, Output == S.Input | |
{ | |
let subscription = ZipAppSubscription<Collection.Element.Output, Failure>( | |
subscriber: subscriber, publishers: publishers.map { $0.eraseToAnyPublisher() } | |
) | |
subscriber.receive(subscription: subscription) | |
subscription.startSubscribing() | |
} | |
} | |
private class ZipAppSubscription<Output, Failure: Error>: Subscription | |
{ | |
private var leftDemand: Subscribers.Demand = .none | |
private var subscriber: AnySubscriber<[Output], Failure>? = nil | |
private var buffer: [[Output]] | |
private let publishers: [AnyPublisher<Output, Failure>] | |
private var childSubscriptions: [AnyCancellable] = [] | |
private var finishedCount = 0 | |
private var lock = NSRecursiveLock() | |
init<S: Subscriber>( | |
subscriber: S, | |
publishers: [AnyPublisher<Output, Failure>] | |
) where Failure == S.Failure, [Output] == S.Input | |
{ | |
self.subscriber = AnySubscriber(subscriber) | |
self.buffer = Array(repeating: [], count: publishers.count) | |
self.publishers = publishers | |
} | |
func request(_ demand: Subscribers.Demand) { | |
lock.lock() | |
defer { lock.unlock() } | |
self.leftDemand += demand | |
send() | |
} | |
func cancel() { | |
lock.lock() | |
defer { lock.unlock() } | |
childSubscriptions = [] | |
subscriber = nil | |
} | |
func startSubscribing() { | |
for (i, publisher) in publishers.enumerated() { | |
publisher.sink( | |
receiveCompletion: { [weak self] completion in | |
self?.receiveCompletion(completion, at: i) | |
}, | |
receiveValue: { [weak self] value in | |
self?.receiveValue(value, at: i) | |
} | |
).store(in: &childSubscriptions) | |
} | |
} | |
private func receiveValue( | |
_ value: Output, at index: Int | |
) { | |
lock.lock() | |
defer { lock.unlock() } | |
buffer[index].append(value) | |
send() | |
} | |
private func send() { | |
guard let subscriber = subscriber else { return } | |
while leftDemand > .none, let outputs = firstRowOutputItems { | |
self.leftDemand -= .max(1) | |
let nextDemand = subscriber.receive(outputs) | |
self.leftDemand += nextDemand | |
} | |
} | |
private var firstRowOutputItems: [Output]? { | |
guard buffer.allSatisfy({ !$0.isEmpty }) else { return nil } | |
var outputs = [Output]() | |
for i in 0 ..< buffer.count { | |
var column = buffer[i] | |
outputs.append(column.remove(at: 0)) | |
buffer[i] = column | |
} | |
return outputs | |
} | |
private func receiveCompletion( | |
_ event: Subscribers.Completion<Failure>, at index: Int | |
) | |
{ | |
lock.lock() | |
defer { lock.unlock() } | |
guard let subscriber = subscriber else { return } | |
switch event { | |
case .finished: | |
finishedCount += 1 | |
if finishedCount == buffer.count { | |
subscriber.receive(completion: .finished) | |
self.subscriber = nil | |
} | |
case .failure: | |
subscriber.receive(completion: event) | |
self.subscriber = nil | |
} | |
} | |
} | |
} | |
extension Collection where Element: Publisher { | |
var zipAll: Publishers.ZipAll<Self> { | |
Publishers.ZipAll(self) | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment