Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
withLatestFrom for Apple's Combine
//
// Combine+WithLatestFrom.swift
//
// Created by Shai Mishali on 29/08/2019.
// Copyright © 2019 Shai Mishali. All rights reserved.
//
import Combine
// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
/// Merges two publishers into a single publisher by combining each value
/// from self with the latest value from the second publisher, if any.
///
/// - parameter other: Second observable source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
resultSelector: @escaping (Output, Other.Output) -> Result)
-> Publishers.WithLatestFrom<Self, Other, Result> {
return .init(upstream: self, second: other, resultSelector: resultSelector)
}
/// Upon an emission from self, emit the latest value from the
/// second publisher, if any exists.
///
/// - parameter other: Second observable source.
///
/// - returns: A publisher containing the latest value from the second publisher, if any.
func withLatestFrom<Other: Publisher>(_ other: Other)
-> Publishers.WithLatestFrom<Self, Other, Other.Output> {
return .init(upstream: self, second: other) { $1 }
}
}
// MARK: - Publisher
extension Publishers {
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public struct WithLatestFrom<Upstream: Publisher,
Other: Publisher,
Output>: Publisher where Upstream.Failure == Other.Failure {
public typealias Failure = Upstream.Failure
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output
private let upstream: Upstream
private let second: Other
private let resultSelector: ResultSelector
private var latestValue: Other.Output?
init(upstream: Upstream,
second: Other,
resultSelector: @escaping ResultSelector) {
self.upstream = upstream
self.second = second
self.resultSelector = resultSelector
}
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let sub = Subscription(upstream: upstream,
second: second,
resultSelector: resultSelector,
subscriber: subscriber)
subscriber.receive(subscription: sub)
}
}
}
// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.WithLatestFrom {
private class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure {
private let subscriber: S
private let resultSelector: ResultSelector
private var latestValue: Other.Output?
private let upstream: Upstream
private let second: Other
private var firstSubscription: Cancellable?
private var secondSubscription: Cancellable?
init(upstream: Upstream,
second: Other,
resultSelector: @escaping ResultSelector,
subscriber: S) {
self.upstream = upstream
self.second = second
self.subscriber = subscriber
self.resultSelector = resultSelector
latestFromSecond()
}
func request(_ demand: Subscribers.Demand) {
// withLatestFrom always takes one latest value from the second
// observable, so demand doesn't really have a meaning here.
firstSubscription = upstream.sink(
receiveCompletion: { [subscriber] in subscriber.receive(completion: $0) },
receiveValue: { [weak self] value in
guard let self = self else { return }
self.latestFromSecond()
guard let latest = self.latestValue else { return }
_ = self.subscriber.receive(self.resultSelector(value, latest))
})
}
private func latestFromSecond() {
let subscriber = AnySubscriber<Other.Output, Other.Failure>(
receiveSubscription: { [weak self] subscription in
self?.secondSubscription?.cancel()
self?.secondSubscription = subscription
subscription.request(.max(1))
},
receiveValue: { [weak self] value in
self?.latestValue = value
return .none
},
receiveCompletion: nil)
self.second.subscribe(subscriber)
}
func cancel() {
firstSubscription?.cancel()
secondSubscription?.cancel()
}
}
}
@freak4pc

This comment has been minimized.

Copy link
Owner Author

commented Aug 29, 2019

Example usage:

let subject1 = PassthroughSubject<String, Never>()
let subject2 = PassthroughSubject<String, Never>()

subject1
  .withLatestFrom(subject2) { ($0, $1) }
  .sink(receiveCompletion: { print($0) },
        receiveValue: { print($0) })
  .store(in: &subscriptions)

subject2.send("There")
subject1.send("Hey")
subject1.send("Is there anybody out")
subject2.send("using Combine?")
subject1.send("do you enjoy")
subject1.send(completion: .finished)

Prints out:

("Hey", "There")
("Is there anybody out", "There")
("do you enjoy", "using Combine?")
finished
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.