Skip to content

Instantly share code, notes, and snippets.

@swhitty
Last active May 3, 2021 23:12
Show Gist options
  • Save swhitty/03a50ba8201a1b3d46400e5b6cb54516 to your computer and use it in GitHub Desktop.
Save swhitty/03a50ba8201a1b3d46400e5b6cb54516 to your computer and use it in GitHub Desktop.
import Combine
public extension Publisher {
/// Converts a publisher's output into the most recent output of another publisher.
///
/// - parameter other: A publisher that will provide the latest value
/// - returns: A publisher that emits the latest ouput of another publisher
func withLatestFrom<Other: Publisher>(_ other: Other) -> WithLatestFrom<Self, Other, Other.Output> {
WithLatestFrom(upstream: self, other: other, transform: { _, other in other })
}
/// Merges a publisher's output with the most recent output of another publisher using a transformer.
///
/// - parameter other: A publisher that will provide the latest value
/// - parameter transform: A closure that transforms the output of both publishers
/// - returns: A publisher that emits the transformed ouput
func withLatestFrom<Other: Publisher, T>(_ other: Other,
transform: @escaping (Output, Other.Output) -> T) -> WithLatestFrom<Self, Other, T> {
WithLatestFrom(upstream: self, other: other, transform: transform)
}
}
public struct WithLatestFrom<Upstream: Publisher, Other: Publisher, Output>: Publisher {
public typealias Failure = Upstream.Failure
public typealias Transformer = (Upstream.Output, Other.Output) -> Output
private let upstream: Upstream
private let other: Other
private let transform: Transformer
public init(upstream: Upstream, other: Other, transform: @escaping Transformer) {
self.upstream = upstream
self.other = other
self.transform = transform
}
public func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
let sub = Subscription(
subscriber: subscriber,
upstream: upstream,
other: other,
transform: transform
)
subscriber.receive(subscription: sub)
}
}
private extension WithLatestFrom {
final class Subscription<S: Subscriber>: Combine.Subscription where Upstream.Failure == S.Failure, Output == S.Input {
private let upstream: Upstream
private let subscriber: S
private let transform: Transformer
private var latest: Other.Output?
private var latestCancellable: AnyCancellable?
private var upstreamCancellable: AnyCancellable?
init(subscriber: S, upstream: Upstream, other: Other, transform: @escaping Transformer) {
self.upstream = upstream
self.subscriber = subscriber
self.transform = transform
self.latestCancellable = other.sink(receiveCompletion: { _ in }) { [weak self] value in
self?.latest = value
}
}
func request(_ demand: Subscribers.Demand) {
upstreamCancellable = upstream.sink(
receiveCompletion: { [subscriber] in subscriber.receive(completion: $0) },
receiveValue: { [weak self, subscriber, transform] value in
guard let latest = self?.latest else { return }
_ = subscriber.receive(transform(value, latest))
}
)
}
func cancel() {
latestCancellable?.cancel()
upstreamCancellable?.cancel()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment