Skip to content

Instantly share code, notes, and snippets.

@freak4pc
Last active February 19, 2024 15:35
Show Gist options
  • Save freak4pc/8d46ea6a6f5e5902c3fb5eba440a55c3 to your computer and use it in GitHub Desktop.
Save freak4pc/8d46ea6a6f5e5902c3fb5eba440a55c3 to your computer and use it in GitHub Desktop.
withLatestFrom for Apple's Combine
//
// Combine+WithLatestFrom.swift
//
// Created by Shai Mishali on 29/08/2019.
// Copyright © 2019 Shai Mishali. All rights reserved.
//
import Combine
// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
/// Merges two publishers into a single publisher by combining each value
/// from self with the latest value from the second publisher, if any.
///
/// - parameter other: Second observable source.
/// - parameter resultSelector: Function to invoke for each value from the self combined
/// with the latest value from the second source, if any.
///
/// - returns: A publisher containing the result of combining each value of the self
/// with the latest value from the second publisher, if any, using the
/// specified result selector function.
func withLatestFrom<Other: Publisher, Result>(_ other: Other,
resultSelector: @escaping (Output, Other.Output) -> Result)
-> Publishers.WithLatestFrom<Self, Other, Result> {
return .init(upstream: self, second: other, resultSelector: resultSelector)
}
/// Upon an emission from self, emit the latest value from the
/// second publisher, if any exists.
///
/// - parameter other: Second observable source.
///
/// - returns: A publisher containing the latest value from the second publisher, if any.
func withLatestFrom<Other: Publisher>(_ other: Other)
-> Publishers.WithLatestFrom<Self, Other, Other.Output> {
return .init(upstream: self, second: other) { $1 }
}
}
// MARK: - Publisher
extension Publishers {
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
public struct WithLatestFrom<Upstream: Publisher,
Other: Publisher,
Output>: Publisher where Upstream.Failure == Other.Failure {
public typealias Failure = Upstream.Failure
public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output
private let upstream: Upstream
private let second: Other
private let resultSelector: ResultSelector
private var latestValue: Other.Output?
init(upstream: Upstream,
second: Other,
resultSelector: @escaping ResultSelector) {
self.upstream = upstream
self.second = second
self.resultSelector = resultSelector
}
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
let sub = Subscription(upstream: upstream,
second: second,
resultSelector: resultSelector,
subscriber: subscriber)
subscriber.receive(subscription: sub)
}
}
}
// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.WithLatestFrom {
private class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure {
private let subscriber: S
private let resultSelector: ResultSelector
private var latestValue: Other.Output?
private let upstream: Upstream
private let second: Other
private var firstSubscription: Cancellable?
private var secondSubscription: Cancellable?
init(upstream: Upstream,
second: Other,
resultSelector: @escaping ResultSelector,
subscriber: S) {
self.upstream = upstream
self.second = second
self.subscriber = subscriber
self.resultSelector = resultSelector
trackLatestFromSecond()
}
func request(_ demand: Subscribers.Demand) {
// withLatestFrom always takes one latest value from the second
// observable, so demand doesn't really have a meaning here.
firstSubscription = upstream
.sink(
receiveCompletion: { [subscriber] in subscriber.receive(completion: $0) },
receiveValue: { [weak self] value in
guard let self = self else { return }
guard let latest = self.latestValue else { return }
_ = self.subscriber.receive(self.resultSelector(value, latest))
})
}
// Create an internal subscription to the `Other` publisher,
// constantly tracking its latest value
private func trackLatestFromSecond() {
let subscriber = AnySubscriber<Other.Output, Other.Failure>(
receiveSubscription: { [weak self] subscription in
self?.secondSubscription = subscription
subscription.request(.unlimited)
},
receiveValue: { [weak self] value in
self?.latestValue = value
return .unlimited
},
receiveCompletion: nil)
self.second.subscribe(subscriber)
}
func cancel() {
firstSubscription?.cancel()
secondSubscription?.cancel()
}
}
}
@freak4pc
Copy link
Author

As far as I'm concerned it's MIT. Feel free to use it.
It will be part of CombineCommunity/CombineExt, soon

@keylook
Copy link

keylook commented Feb 27, 2020

Got it, thanks.

@pawlowskialex
Copy link

Here's a version composed out of built-in operators instead of rebuilding the publisher:

import Combine
import Dispatch

extension Publishers {
    struct WithLatestFrom<Upstream: Publisher, Side: Publisher, Output>: Publisher where Upstream.Failure == Side.Failure {
        typealias Failure = Side.Failure
        typealias ResultSelector = (Upstream.Output, Side.Output) -> Output

        let upstream: Upstream
        let side: Side
        let resultSelector: ResultSelector
        
        func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            let timestampedUpstream = upstream.map(TimestampedValue.init)
            let timestampedSide = side.map(TimestampedValue.init)
            let state = timestampedUpstream.combineLatest(timestampedSide, State.init)
            let mappedValues = state
                .filter { $0.upstream.time >= $0.side.time }
                .map { resultSelector($0.upstream.value, $0.side.value) }
            
            mappedValues.receive(subscriber: subscriber)
        }
        
        private struct TimestampedValue<T> {
            let value: T
            let time: DispatchTime
            
            init(value: T) {
                self.value = value
                self.time = DispatchTime.now()
            }
        }
        
        private struct State {
            let upstream: TimestampedValue<Upstream.Output>
            let side: TimestampedValue<Side.Output>
        }
    }
}

extension Publisher {
    func withLatestFrom<P: Publisher, R>(_ publisher: P,
                                         resultSelector: @escaping (Output, P.Output) -> R) -> Publishers.WithLatestFrom<Self, P, R> {
        Publishers.WithLatestFrom(upstream: self, side: publisher, resultSelector: resultSelector)
    }

    func withLatestFrom<P: Publisher>(_ publisher: P) -> Publishers.WithLatestFrom<Self, P, P.Output> {
        withLatestFrom(publisher, resultSelector: { $1 })
    }
}

@freak4pc
Copy link
Author

freak4pc commented Aug 6, 2021

Thanks @pawlowskialex.
If you're going down that path, here's a neatly composed variation @jasdev wrote:

extension Publisher {
  func withLatestFrom<Other: Publisher, Result>(_ other: Other,
                                                    resultSelector: @escaping (Output, Other.Output) -> Result)
      -> AnyPublisher<Result, Failure>
      where Other.Failure == Failure {
          let upstream = share()
  
          return other
              .map { second in upstream.map { resultSelector($0, second) } }
              .switchToLatest()
              .zip(upstream) // `zip`ping and discarding `\.1` allows for
                                        // upstream completions to be projected down immediately.
              .map(\.0)
              .eraseToAnyPublisher()
      }
}

And a blot post he wrote about this topic: https://jasdev.me/notes/with-latest-from

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment