Skip to content

Instantly share code, notes, and snippets.

@janmazurczak
Created October 31, 2022 04:47
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save janmazurczak/ff97dc8770aaf3089f270a5294ccc26c to your computer and use it in GitHub Desktop.
Save janmazurczak/ff97dc8770aaf3089f270a5294ccc26c to your computer and use it in GitHub Desktop.
import Foundation
import Combine
/// A publisher that combines the most recent element of every upstream publisher into one array.
///
/// Nothing is emitted until each upstream has produced at least one element. From then on every new
/// upstream element yields an array holding the latest element of each upstream, in the order the
/// upstreams were supplied. The publisher finishes once all upstreams have finished and fails as
/// soon as any upstream fails.
public struct CombineManyLatest<Upstream: Publisher>: Publisher {
    public typealias Output = [Upstream.Output]
    public typealias Failure = Upstream.Failure

    let upstream: [Upstream]

    /// Creates a publisher combining the given upstream publishers.
    /// - Parameter upstream: The publishers whose latest elements are combined.
    public init(_ upstream: Upstream...) {
        self.upstream = upstream
    }

    /// Creates a publisher combining the given upstream publishers.
    /// - Parameter upstream: The publishers whose latest elements are combined.
    public init(_ upstream: [Upstream]) {
        self.upstream = upstream
    }

    /// Hands the subscriber a subscription; upstreams are only attached on its first demand request.
    public func receive<S>(subscriber: S) where S: Subscriber, Self.Output == S.Input, Self.Failure == S.Failure {
        subscriber.receive(subscription: Subscription(subscriber: subscriber, upstream: upstream))
    }

    /// Tracks downstream demand and forwards combined values from all upstreams to one subscriber.
    class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure {
        init(subscriber: S, upstream: [Upstream]) {
            self.subscriber = subscriber
            self.upstream = upstream
        }

        /// The downstream subscriber; `nil` once the subscription was cancelled or has completed.
        var subscriber: S?
        /// Number of values that may still be delivered; `nil` means unlimited demand.
        var limit: Int?
        let upstream: [Upstream]
        /// The attached upstream observers; `nil` before the first request and after termination.
        var latestUpstream: [LatestUpstream]?
        /// Number of upstreams that have produced at least one value.
        var readyCount: Int = 0
        /// Number of upstreams that have finished successfully.
        var finishedCount: Int = 0

        /// Registers downstream demand.
        ///
        /// The first call attaches to every upstream. Later calls only add to the outstanding
        /// demand, and calls after cancellation or completion are ignored.
        func request(_ demand: Subscribers.Demand) {
            // Terminated subscriptions must not re-subscribe to the upstreams.
            guard subscriber != nil else { return }
            // Demand is additive: once attached, only grow the limit.
            guard latestUpstream == nil else {
                updateLimit(with: demand)
                return
            }
            limit = demand.max

            var attached: [LatestUpstream] = []
            attached.reserveCapacity(upstream.count)
            for publisher in upstream {
                // A synchronous upstream failure terminates the subscription mid-attach.
                guard subscriber != nil else { break }
                attached.append(makeLatestUpstream(for: publisher))
            }
            guard subscriber != nil else {
                attached.forEach { $0.cancel() }
                return
            }
            latestUpstream = attached

            // Values and completions that arrived synchronously while attaching were only
            // counted, because `latestUpstream` was not set yet; act on them now.
            sendIfReady()
            finishIfAllFinished()
        }

        /// Delivers the current combination when every upstream has produced a value.
        func sendIfReady() {
            guard
                let latestUpstream = latestUpstream,
                readyCount == latestUpstream.count
            else { return }
            performSendingLatest(from: latestUpstream)
        }

        /// Sends the latest values downstream if demand allows; values without demand are dropped.
        private func performSendingLatest(from upstream: [LatestUpstream]) {
            guard let subscriber = subscriber else { return }
            if let limit = limit {
                guard limit > 0 else { return }
                self.limit = limit - 1
            }
            let values = upstream.compactMap { source -> Upstream.Output? in
                switch source.latest {
                case .waiting: return nil
                case .ready(let value): return value
                }
            }
            updateLimit(with: subscriber.receive(values))
        }

        /// Adds `demand` to the outstanding limit; unlimited demand makes the limit unlimited.
        func updateLimit(with demand: Subscribers.Demand) {
            guard
                let limit = limit,
                let limitAddition = demand.max
            else {
                self.limit = nil
                return
            }
            self.limit = limit + limitAddition
        }

        /// Completes the downstream subscriber successfully and releases all upstreams.
        func finish() {
            complete(with: .finished)
        }

        /// Drops the subscriber, cancels every upstream subscription and resets the counters.
        func cancel() {
            subscriber = nil
            latestUpstream?.forEach { $0.cancel() }
            latestUpstream = nil
            readyCount = 0
            finishedCount = 0
        }

        /// Builds the observer for one upstream and wires its callbacks to this subscription.
        private func makeLatestUpstream(for publisher: Upstream) -> LatestUpstream {
            return LatestUpstream(
                upstream: publisher,
                firstValue: { [weak self] in
                    guard let self = self else { return }
                    self.readyCount += 1
                    self.sendIfReady()
                },
                newValue: { [weak self] in
                    self?.sendIfReady()
                },
                finished: { [weak self] in
                    guard let self = self else { return }
                    self.finishedCount += 1
                    self.finishIfAllFinished()
                },
                failed: { [weak self] failure in
                    self?.complete(with: .failure(failure))
                }
            )
        }

        /// Finishes once all attached upstreams have finished; does nothing while still attaching.
        private func finishIfAllFinished() {
            guard
                let latestUpstream = latestUpstream,
                finishedCount == latestUpstream.count
            else { return }
            finish()
        }

        /// Tears the subscription down first, then notifies the subscriber, so that a subscriber
        /// re-entering `request` or `cancel` from its completion handler finds a terminated state.
        private func complete(with completion: Subscribers.Completion<S.Failure>) {
            guard let downstream = subscriber else { return }
            cancel()
            downstream.receive(completion: completion)
        }
    }
}
extension CombineManyLatest.Subscription {
    /// Observes a single upstream publisher and remembers the most recent value it produced.
    class LatestUpstream {
        /// The most recent value of the observed upstream, if it has produced one yet.
        enum Latest {
            case waiting
            case ready(Upstream.Output)
        }

        private(set) var latest: Latest = .waiting
        private var subscription: AnyCancellable?

        /// Subscribes to `upstream` and reports its events through the given callbacks.
        /// - Parameters:
        ///   - upstream: The publisher to observe.
        ///   - firstValue: Called after the first value has been stored.
        ///   - newValue: Called after every subsequent value has been stored.
        ///   - finished: Called when the upstream finishes successfully.
        ///   - failed: Called with the error when the upstream fails.
        init(
            upstream: Upstream,
            firstValue: @escaping () -> Void,
            newValue: @escaping () -> Void,
            finished: @escaping () -> Void,
            failed: @escaping (Upstream.Failure) -> Void
        ) {
            let onCompletion: (Subscribers.Completion<Upstream.Failure>) -> Void = { completion in
                switch completion {
                case .failure(let failure): failed(failure)
                case .finished: finished()
                }
            }
            let onValue: (Upstream.Output) -> Void = { [weak self] value in
                // A deallocated observer neither stores the value nor notifies anyone.
                guard let isFirst = self?.store(value) else { return }
                if isFirst {
                    firstValue()
                } else {
                    newValue()
                }
            }
            subscription = upstream.sink(receiveCompletion: onCompletion, receiveValue: onValue)
        }

        /// Cancels the upstream subscription and releases it.
        func cancel() {
            defer { subscription = nil }
            subscription?.cancel()
        }

        /// Stores `value` as the latest one and reports whether it was the first value received.
        private func store(_ value: Upstream.Output) -> Bool {
            let isFirst: Bool
            switch latest {
            case .waiting: isFirst = true
            case .ready: isFirst = false
            }
            latest = .ready(value)
            return isFirst
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment