Created
October 31, 2022 04:47
-
-
Save janmazurczak/ff97dc8770aaf3089f270a5294ccc26c to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import Foundation
import Combine
/// A publisher that combines any number of same-typed publishers into arrays of
/// their most recent values.
///
/// Nothing is delivered until every upstream has produced at least one value.
/// From then on, each upstream value results in an array holding the latest value
/// of every upstream, in the order the upstreams were supplied. The publisher
/// finishes once all upstreams have finished and fails as soon as any upstream fails.
public struct CombineManyLatest<Upstream: Publisher>: Publisher {
    public typealias Output = [Upstream.Output]
    public typealias Failure = Upstream.Failure

    /// The publishers whose latest values are combined.
    let upstream: [Upstream]

    /// Creates a publisher combining the given upstream publishers.
    public init(_ upstream: Upstream...) {
        self.upstream = upstream
    }

    /// Creates a publisher combining the given array of upstream publishers.
    public init(_ upstream: [Upstream]) {
        self.upstream = upstream
    }

    public func receive<S>(subscriber: S) where S: Subscriber, Self.Output == S.Input, Self.Failure == S.Failure {
        subscriber.receive(subscription: Subscription(subscriber: subscriber, upstream: upstream))
    }

    /// Connects one downstream subscriber to all upstream publishers.
    class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure {
        init(subscriber: S, upstream: [Upstream]) {
            self.subscriber = subscriber
            self.upstream = upstream
        }

        /// The downstream subscriber; `nil` once the subscription completed or was cancelled.
        var subscriber: S?
        /// Remaining downstream demand; `nil` means unlimited.
        var limit: Int?
        let upstream: [Upstream]
        /// One tracker per upstream; `nil` until the first `request(_:)` and again after `cancel()`.
        var latestUpstream: [LatestUpstream]?
        /// Number of upstreams that have produced at least one value.
        var readyCount: Int = 0
        /// Number of upstreams that have finished.
        var finishedCount: Int = 0
        /// Set by the first `request(_:)` so that later requests only add demand
        /// instead of subscribing to the upstreams again.
        private var hasAttachedUpstream = false

        /// Registers downstream demand.
        ///
        /// The first call subscribes to every upstream; subsequent calls only add
        /// to the outstanding demand. Calls made after completion or cancellation
        /// are ignored.
        func request(_ demand: Subscribers.Demand) {
            guard subscriber != nil else { return }

            guard !hasAttachedUpstream else {
                updateLimit(with: demand)
                return
            }
            hasAttachedUpstream = true
            limit = demand.max

            var attached: [LatestUpstream] = []
            attached.reserveCapacity(upstream.count)
            for publisher in upstream {
                attached.append(makeLatestUpstream(for: publisher))
                // An upstream that fails synchronously tears the subscription down
                // (see `failed` below); stop attaching and release what we created.
                if subscriber == nil {
                    attached.forEach { $0.cancel() }
                    return
                }
            }
            latestUpstream = attached

            // Upstreams may have emitted synchronously while `latestUpstream` was
            // still nil, so the callbacks could not send; do it now.
            sendIfReady()

            // Likewise, upstreams that finished synchronously could not trigger
            // completion. This also covers the case of no upstreams at all.
            if subscriber != nil, finishedCount == attached.count {
                finish()
            }
        }

        /// Builds the tracker for one upstream and wires its events to this subscription.
        private func makeLatestUpstream(for publisher: Upstream) -> LatestUpstream {
            return LatestUpstream(upstream: publisher) { [weak self] in
                guard let self = self else { return }
                self.readyCount += 1
                self.sendIfReady()
            } newValue: { [weak self] in
                self?.sendIfReady()
            } finished: { [weak self] in
                guard let self = self else { return }
                self.finishedCount += 1
                if self.finishedCount == self.latestUpstream?.count {
                    self.finish()
                }
            } failed: { [weak self] failure in
                guard let self = self else { return }
                self.subscriber?.receive(completion: .failure(failure))
                self.cancel()
            }
        }

        /// Delivers the current combination if every upstream has produced a value.
        func sendIfReady() {
            guard
                let latestUpstream = latestUpstream,
                readyCount == latestUpstream.count
            else { return }
            performSendingLatest(from: latestUpstream)
        }

        /// Sends the latest values downstream, consuming one unit of demand.
        /// The combination is dropped when there is no outstanding demand.
        private func performSendingLatest(from upstream: [LatestUpstream]) {
            if let limit = limit {
                if limit < 1 {
                    return
                }
                self.limit = limit - 1
            }
            let values: [Upstream.Output] = upstream.compactMap {
                switch $0.latest {
                case .waiting: return nil
                case .ready(let value): return value
                }
            }
            // The subscriber may ask for more as the result of receiving a value.
            if let demand = subscriber?.receive(values) {
                updateLimit(with: demand)
            }
        }

        /// Adds `demand` to the outstanding demand; unlimited on either side stays unlimited.
        func updateLimit(with demand: Subscribers.Demand) {
            guard
                let limit = limit,
                let limitAddition = demand.max
            else {
                self.limit = nil
                return
            }
            self.limit = limit + limitAddition
        }

        /// Completes the downstream subscriber normally and releases all upstreams.
        func finish() {
            subscriber?.receive(completion: .finished)
            cancel()
        }

        /// Drops the subscriber, cancels every upstream subscription and resets the counters.
        func cancel() {
            subscriber = nil
            latestUpstream?.forEach { $0.cancel() }
            latestUpstream = nil
            readyCount = 0
            finishedCount = 0
        }
    }
}
extension CombineManyLatest.Subscription {
    /// Subscribes to a single upstream publisher, remembers its most recent
    /// value and reports its events through the supplied callbacks.
    class LatestUpstream {
        /// What has been observed from the upstream so far.
        enum Latest {
            /// No value has been received yet.
            case waiting
            /// The most recently received value.
            case ready(Upstream.Output)
        }

        /// The most recent value, or `.waiting` before the first one arrives.
        private(set) var latest: Latest = .waiting
        /// Keeps the upstream subscription alive until `cancel()` is called.
        private var subscription: AnyCancellable?

        /// Subscribes to `upstream` right away.
        ///
        /// - Parameters:
        ///   - upstream: The publisher to observe.
        ///   - firstValue: Called after the first value has been stored.
        ///   - newValue: Called after every subsequent value has been stored.
        ///   - finished: Called when the upstream finishes normally.
        ///   - failed: Called with the error when the upstream fails.
        init(
            upstream: Upstream,
            firstValue: @escaping () -> Void,
            newValue: @escaping () -> Void,
            finished: @escaping () -> Void,
            failed: @escaping (Upstream.Failure) -> Void
        ) {
            let handleCompletion: (Subscribers.Completion<Upstream.Failure>) -> Void = { completion in
                if case .failure(let error) = completion {
                    failed(error)
                } else {
                    finished()
                }
            }
            subscription = upstream.sink(receiveCompletion: handleCompletion) { [weak self] value in
                guard let self = self else { return }
                // Decide which callback applies before overwriting the stored state.
                let notify: () -> Void
                if case .waiting = self.latest {
                    notify = firstValue
                } else {
                    notify = newValue
                }
                self.latest = .ready(value)
                notify()
            }
        }

        /// Cancels the upstream subscription and releases it.
        func cancel() {
            subscription?.cancel()
            subscription = nil
        }
    }
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment