Skip to content

Instantly share code, notes, and snippets.

@freak4pc
Last active February 19, 2024 15:35
Show Gist options
  • Save freak4pc/8d46ea6a6f5e5902c3fb5eba440a55c3 to your computer and use it in GitHub Desktop.
Save freak4pc/8d46ea6a6f5e5902c3fb5eba440a55c3 to your computer and use it in GitHub Desktop.
withLatestFrom for Apple's Combine
//
// Combine+WithLatestFrom.swift
//
// Created by Shai Mishali on 29/08/2019.
// Copyright © 2019 Shai Mishali. All rights reserved.
//
import Combine
// MARK: - Operator methods
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publisher {
    /// Pairs every value emitted by this publisher with the most recent value
    /// emitted by `other` and publishes whatever `resultSelector` returns.
    ///
    /// - parameter other: Publisher whose latest value is sampled.
    /// - parameter resultSelector: Closure called with a value from this publisher and
    ///                             the latest value from `other`; its result is published.
    ///
    /// - returns: A `Publishers.WithLatestFrom` built from this publisher, `other`
    ///            and `resultSelector`.
    func withLatestFrom<Other: Publisher, Result>(
        _ other: Other,
        resultSelector: @escaping (Output, Other.Output) -> Result
    ) -> Publishers.WithLatestFrom<Self, Other, Result> {
        Publishers.WithLatestFrom(upstream: self,
                                  second: other,
                                  resultSelector: resultSelector)
    }

    /// Whenever this publisher emits, publishes the most recent value emitted
    /// by `other` instead of the emitted value itself.
    ///
    /// - parameter other: Publisher whose latest value is sampled.
    ///
    /// - returns: A `Publishers.WithLatestFrom` whose output is the latest value of `other`.
    func withLatestFrom<Other: Publisher>(
        _ other: Other
    ) -> Publishers.WithLatestFrom<Self, Other, Other.Output> {
        // The selector ignores the upstream value and keeps only the latest one from `other`.
        Publishers.WithLatestFrom(upstream: self,
                                  second: other,
                                  resultSelector: { _, latest in latest })
    }
}
// MARK: - Publisher
extension Publishers {
    /// A publisher that combines each value of `Upstream` with the latest
    /// value of `Other` using a result selector.
    ///
    /// The publisher itself only stores its configuration; a new
    /// `Subscription` is created for every subscriber in
    /// `receive(subscriber:)`.
    @available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
    public struct WithLatestFrom<Upstream: Publisher,
                                 Other: Publisher,
                                 Output>: Publisher where Upstream.Failure == Other.Failure {
        public typealias Failure = Upstream.Failure
        public typealias ResultSelector = (Upstream.Output, Other.Output) -> Output

        private let upstream: Upstream
        private let second: Other
        private let resultSelector: ResultSelector

        /// Creates the publisher.
        ///
        /// - parameter upstream: Publisher whose emissions trigger an output.
        /// - parameter second: Publisher whose latest value is sampled.
        /// - parameter resultSelector: Combines an upstream value with the latest
        ///                             value of `second` into the published output.
        init(upstream: Upstream,
             second: Other,
             resultSelector: @escaping ResultSelector) {
            self.upstream = upstream
            self.second = second
            self.resultSelector = resultSelector
        }

        /// Creates a dedicated `Subscription` for `subscriber` and hands it over.
        public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            let sub = Subscription(upstream: upstream,
                                   second: second,
                                   resultSelector: resultSelector,
                                   subscriber: subscriber)
            subscriber.receive(subscription: sub)
        }
    }
}
// MARK: - Subscription
@available(OSX 10.15, iOS 13.0, tvOS 13.0, watchOS 6.0, *)
extension Publishers.WithLatestFrom {
    /// Subscription handed to the downstream subscriber of `WithLatestFrom`.
    ///
    /// On creation it subscribes to `second` and stores every value it emits
    /// in `latestValue`. Upstream is subscribed to on the first
    /// `request(_:)`; each upstream value is combined with `latestValue`
    /// through `resultSelector` and forwarded downstream. Upstream values
    /// received before `second` has emitted anything are dropped.
    private final class Subscription<S: Subscriber>: Combine.Subscription where S.Input == Output, S.Failure == Failure {
        private let subscriber: S
        private let resultSelector: ResultSelector
        private var latestValue: Other.Output?

        private let upstream: Upstream
        private let second: Other

        private var firstSubscription: Cancellable?
        private var secondSubscription: Cancellable?

        /// `true` until upstream has been subscribed to or the subscription
        /// has been cancelled; afterwards `request(_:)` is a no-op.
        private var acceptsRequests = true

        /// Stores the configuration and immediately starts tracking `second`.
        ///
        /// - parameter upstream: Publisher whose emissions trigger an output.
        /// - parameter second: Publisher whose latest value is sampled.
        /// - parameter resultSelector: Combines an upstream value with the latest value of `second`.
        /// - parameter subscriber: Downstream subscriber receiving the combined values.
        init(upstream: Upstream,
             second: Other,
             resultSelector: @escaping ResultSelector,
             subscriber: S) {
            self.upstream = upstream
            self.second = second
            self.subscriber = subscriber
            self.resultSelector = resultSelector
            trackLatestFromSecond()
        }

        /// Subscribes to upstream the first time demand is requested.
        ///
        /// - parameter demand: Ignored; see the comment below.
        func request(_ demand: Subscribers.Demand) {
            // withLatestFrom always takes one latest value from the second
            // publisher, so demand doesn't really have a meaning here.

            // Subscribe to upstream exactly once. Without this guard every
            // additional `request(_:)` call would create a new upstream
            // subscription, and a request arriving after `cancel()` would
            // bring the subscription back to life.
            guard acceptsRequests else { return }
            acceptsRequests = false

            firstSubscription = upstream
                .sink(
                    receiveCompletion: { [weak self, subscriber] completion in
                        // Once upstream completes nothing can be emitted any
                        // more, so stop tracking `second` before forwarding
                        // the completion (downstream may release `self`
                        // while handling it).
                        self?.stopTrackingSecond()
                        subscriber.receive(completion: completion)
                    },
                    receiveValue: { [weak self] value in
                        guard let self = self else { return }
                        // No value from `second` yet: drop the upstream value.
                        guard let latest = self.latestValue else { return }
                        _ = self.subscriber.receive(self.resultSelector(value, latest))
                    })
        }

        // Create an internal subscription to the `Other` publisher,
        // constantly tracking its latest value.
        // Completion events of `second` are not forwarded (`receiveCompletion` is `nil`).
        private func trackLatestFromSecond() {
            let subscriber = AnySubscriber<Other.Output, Other.Failure>(
                receiveSubscription: { [weak self] subscription in
                    self?.secondSubscription = subscription
                    subscription.request(.unlimited)
                },
                receiveValue: { [weak self] value in
                    self?.latestValue = value
                    return .unlimited
                },
                receiveCompletion: nil)

            self.second.subscribe(subscriber)
        }

        /// Cancels the subscription to `second` and releases it.
        private func stopTrackingSecond() {
            secondSubscription?.cancel()
            secondSubscription = nil
        }

        /// Cancels both internal subscriptions and releases them.
        func cancel() {
            acceptsRequests = false
            firstSubscription?.cancel()
            firstSubscription = nil
            stopTrackingSecond()
        }
    }
}
@freak4pc
Copy link
Author

freak4pc commented Aug 29, 2019

Example usage:

// `subject1` triggers emissions; `subject2` supplies the latest value to pair with.
let subject1 = PassthroughSubject<String, Never>()
let subject2 = PassthroughSubject<String, Never>()

// Pair each `subject1` value with the latest `subject2` value and print
// both the values and the completion.
// NOTE(review): `subscriptions` is not declared in this snippet — presumably
// a `Set<AnyCancellable>` owned by the caller; confirm.
subject1
  .withLatestFrom(subject2) { ($0, $1) }
  .sink(receiveCompletion: { print($0) },
        receiveValue: { print($0) })
  .store(in: &subscriptions)

// `subject2` emits first, so a latest value exists before `subject1` emits.
subject2.send("There")
subject1.send("Hey")
subject1.send("Is there anybody out")
// Replaces the latest value used for subsequent `subject1` emissions.
subject2.send("using Combine?")
subject1.send("do you enjoy")
subject1.send(completion: .finished)

Prints out:

("Hey", "There")
("Is there anybody out", "There")
("do you enjoy", "using Combine?")
finished

@msewell
Copy link

msewell commented Nov 20, 2019

It seems to me like withLatestFrom always takes the first value of the second subscription?

Given this example code:

    // Emits 1, 2, 3 after a one-second delay on the main queue; `print` logs its events.
    let digits = [1, 2, 3]
        .publisher
        .delay(for: .seconds(1), scheduler: DispatchQueue.main)
        .print("digits")

    // Emits "a", "b", "c" without any delay; `print` logs its events.
    let letters = ["a", "b", "c"]
        .publisher
        .print("letters")

    // Collects every pair published by `withLatestFrom`.
    var values: [(Int, String)] = []

    // Pair each digit with the latest letter, append each pair to `values`
    // and print the collected pairs on completion.
    // NOTE(review): `cancellables` is not declared in this snippet — presumably
    // a `Set<AnyCancellable>` owned by the caller; confirm.
    digits
        .withLatestFrom(letters) { ($0, $1) }
        .print("withLatestFrom")
        .sink(
            receiveCompletion: { _ in print("values:", values) },
            receiveValue: { values.append(($0, $1)) }
        )
        .store(in: &cancellables)

We get this log output:

letters: receive subscription: (["a", "b", "c"])
letters: request max: (1)
letters: receive value: (a)
withLatestFrom: receive subscription: ((extension in Spielwiese_Sources):Combine.Publishers.WithLatestFrom<Combine.Publishers.Print<Combine.Publishers.Delay<Combine.Publishers.Sequence<Swift.Array<Swift.Int>, Swift.Never>, __C.OS_dispatch_queue>>, Combine.Publishers.Print<Combine.Publishers.Sequence<Swift.Array<Swift.String>, Swift.Never>>, (Swift.Int, Swift.String)>.(unknown context at $10dfb7b10).Subscription<Combine.Publishers.Print<(extension in Spielwiese_Sources):Combine.Publishers.WithLatestFrom<Combine.Publishers.Print<Combine.Publishers.Delay<Combine.Publishers.Sequence<Swift.Array<Swift.Int>, Swift.Never>, __C.OS_dispatch_queue>>, Combine.Publishers.Print<Combine.Publishers.Sequence<Swift.Array<Swift.String>, Swift.Never>>, (Swift.Int, Swift.String)>>.(unknown context at $7fff233d5eb0).Inner<Combine.Subscribers.Sink<(Swift.Int, Swift.String), Swift.Never>>>)
withLatestFrom: request unlimited
digits: receive subscription: (Combine.Publishers.Delay<Combine.Publishers.Sequence<Swift.Array<Swift.Int>, Swift.Never>, __C.OS_dispatch_queue>.(unknown context at $7fff233e1210).Inner<Combine.Publishers.Print<Combine.Publishers.Delay<Combine.Publishers.Sequence<Swift.Array<Swift.Int>, Swift.Never>, __C.OS_dispatch_queue>>.(unknown context at $7fff233d5eb0).Inner<Combine.Subscribers.Sink<Swift.Int, Swift.Never>>>)
digits: request unlimited
digits: receive value: (1)
letters: receive subscription: (["a", "b", "c"])
letters: receive cancel
letters: request max: (1)
letters: receive value: (a)
withLatestFrom: receive value: ((1, "a"))
digits: receive value: (2)
letters: receive subscription: (["a", "b", "c"])
letters: receive cancel
letters: request max: (1)
letters: receive value: (a)
withLatestFrom: receive value: ((2, "a"))
digits: receive value: (3)
letters: receive subscription: (["a", "b", "c"])
letters: receive cancel
letters: request max: (1)
letters: receive value: (a)
withLatestFrom: receive value: ((3, "a"))
digits: receive finished
withLatestFrom: receive finished
digits: receive cancel
values: [(1, "a"), (2, "a"), (3, "a")]

I'd expect values to be [(1, "c"), (2, "c"), (3, "c")]. I suspect the issue might be that the letters subscription shouldn't get repeatedly canceled and re-subscribed to? Apologies in advance if this isn't an issue with withLatestFrom but instead my limited understanding of how this should work. :)

@freak4pc
Copy link
Author

Good catch - the cancellation is indeed the issue and also the lazy demand (e.g. max(1)). I've made a few adjustments, let me know if that helps :) @msewell

@msewell
Copy link

msewell commented Nov 20, 2019

@freak4pc That does seem to work better, yes! Nice work!

@mesheilah
Copy link

mesheilah commented Jan 28, 2020

I believe this is one of the most important operators missing in Combine.
However, in the trackLatestFromSecond private function, does it make a difference if we change the AnySubscriber usage to second.sink(receiveCompletion:receiveValue:) ?

@serbats
Copy link

serbats commented Feb 12, 2020

I'd suggest improving this a bit by handling backpressure:

    /// Subscribes to `upstream` according to the requested `demand`.
    ///
    /// - `.unlimited`: subscribes with a plain `sink`.
    /// - Any other demand greater than `.none`: subscribes with an
    ///   `AnySubscriber` that requests exactly `demand` from upstream and
    ///   returns the downstream's additional demand for every forwarded value.
    /// - `.none`: does nothing.
    func request(_ demand: Subscribers.Demand) {
        guard demand != .unlimited else {
            // Unbounded demand: forward every upstream value for which a
            // latest value of `second` is available.
            firstSubscription = upstream.sink(
                receiveCompletion: { [subscriber] completion in
                    subscriber.receive(completion: completion)
                },
                receiveValue: { [weak self] upstreamValue in
                    guard let self = self else { return }
                    guard let latest = self.latestValue else { return }
                    _ = self.subscriber.receive(self.resultSelector(upstreamValue, latest))
                }
            )
            return
        }

        guard demand > .none else { return }

        // Bounded demand: pass the demand on to upstream and report the
        // downstream's demand back after each forwarded value.
        let upstreamSubscriber = AnySubscriber<Upstream.Output, Upstream.Failure>(
            receiveSubscription: { [weak self] subscription in
                self?.firstSubscription = subscription
                subscription.request(demand)
            },
            receiveValue: { [weak self] upstreamValue in
                // A value that cannot be forwarded is replaced by requesting one more.
                guard let self = self else { return .max(1) }
                guard let latest = self.latestValue else { return .max(1) }
                return self.subscriber.receive(self.resultSelector(upstreamValue, latest))
            },
            receiveCompletion: { [weak self] completion in
                self?.subscriber.receive(completion: completion)
            }
        )

        upstream.subscribe(upstreamSubscriber)
    }

Also trackLatestFromSecond could be simplified:

/// Subscribes to `second` with a `sink`, storing each emitted value in
/// `latestValue`; completion events are ignored. The resulting cancellable
/// is kept in `secondSubscription`.
private func trackLatestFromSecond() {
    let tracker = second.sink(
        receiveCompletion: { _ in },
        receiveValue: { [weak self] latest in
            self?.latestValue = latest
        }
    )
    secondSubscription = tracker
}

Happy that I found this. This is really the most-missed operator.

@freak4pc
Copy link
Author

freak4pc commented Feb 12, 2020

What is the purpose of handling back pressure for an operator that can only handle/use .max(1) ? Seems pointless

The only thing you might want to support is .none, perhaps

@serbats
Copy link

serbats commented Feb 12, 2020

I'm new to Combine and could be mistaken, but I think this is what Apple recommends.
The main idea here is to synchronise the demand of the upstream subscriber with that of our main subscriber.

I don't see good examples over the internet how to do that correctly. Probably you are right that this handling is not correct or not really useful.
I also haven't found any operators for materialize/dematerialize, so I will try to write them myself. I really miss having good examples on the internet.

@freak4pc
Copy link
Author

Yeah there aren't a lot of good examples :(
Basically if your operator can produce a lot of elements, definitely you must support backpressure, but in this case you always want one and always a specific one (with latest), so back pressure support would just hinder the capabilities of this operator

@keylook
Copy link

keylook commented Feb 27, 2020

This is great. Spent too many hours trying to figure the way around Combine to have this behavior. The question is can this gist be used? What license is it?

@freak4pc
Copy link
Author

As far as I'm concerned it's MIT. Feel free to use it.
It will be part of CombineCommunity/CombineExt, soon

@keylook
Copy link

keylook commented Feb 27, 2020

Got it, thanks.

@pawlowskialex
Copy link

Here's a version composed out of built-in operators instead of rebuilding the publisher:

import Combine
import Dispatch

extension Publishers {
    /// A `withLatestFrom` publisher composed from built-in operators.
    ///
    /// Values of `upstream` and `side` are timestamped, merged with
    /// `combineLatest`, and only those combinations whose upstream timestamp
    /// is not older than the side timestamp are passed to `resultSelector`.
    struct WithLatestFrom<Upstream: Publisher, Side: Publisher, Output>: Publisher where Upstream.Failure == Side.Failure {
        typealias Failure = Side.Failure
        typealias ResultSelector = (Upstream.Output, Side.Output) -> Output

        let upstream: Upstream
        let side: Side
        let resultSelector: ResultSelector

        /// Builds the composed publisher chain and attaches `subscriber` to it.
        func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
            let timestampedUpstream = upstream.map(TimestampedValue.init)
            let timestampedSide = side.map(TimestampedValue.init)
            let state = timestampedUpstream.combineLatest(timestampedSide, State.init)
            // Keep only combinations whose upstream value is at least as
            // recent as the side value.
            // NOTE(review): two emissions with identical `DispatchTime`
            // stamps also pass the `>=` filter — confirm this is acceptable.
            let mappedValues = state
                .filter { $0.upstream.time >= $0.side.time }
                .map { resultSelector($0.upstream.value, $0.side.value) }

            // Attach through `subscribe(_:)`, the entry point Combine
            // documents for attaching subscribers, rather than calling
            // `receive(subscriber:)` directly.
            mappedValues.subscribe(subscriber)
        }

        /// A value together with the time at which it was wrapped.
        private struct TimestampedValue<T> {
            let value: T
            let time: DispatchTime

            init(value: T) {
                self.value = value
                self.time = DispatchTime.now()
            }
        }

        /// The latest timestamped values of both publishers.
        private struct State {
            let upstream: TimestampedValue<Upstream.Output>
            let side: TimestampedValue<Side.Output>
        }
    }
}

extension Publisher {
    /// Creates a `Publishers.WithLatestFrom` from this publisher, `publisher`
    /// and `resultSelector`.
    ///
    /// - parameter publisher: Publisher whose latest value is sampled.
    /// - parameter resultSelector: Combines a value of this publisher with a value of `publisher`.
    func withLatestFrom<P: Publisher, R>(
        _ publisher: P,
        resultSelector: @escaping (Output, P.Output) -> R
    ) -> Publishers.WithLatestFrom<Self, P, R> {
        return .init(upstream: self, side: publisher, resultSelector: resultSelector)
    }

    /// Creates a `Publishers.WithLatestFrom` that publishes the value of
    /// `publisher` and discards the value of this publisher.
    ///
    /// - parameter publisher: Publisher whose latest value is sampled.
    func withLatestFrom<P: Publisher>(
        _ publisher: P
    ) -> Publishers.WithLatestFrom<Self, P, P.Output> {
        return withLatestFrom(publisher, resultSelector: { _, sideValue in sideValue })
    }
}

@freak4pc
Copy link
Author

freak4pc commented Aug 6, 2021

Thanks @pawlowskialex.
If you're going down that path, here's a neatly composed variation @jasdev wrote:

extension Publisher {
  /// `withLatestFrom` composed from `share`, `map`, `switchToLatest` and `zip`.
  ///
  /// - parameter other: Publisher whose values are combined with upstream values.
  /// - parameter resultSelector: Combines an upstream value with a value of `other`.
  ///
  /// - returns: The composed chain, type-erased to `AnyPublisher`.
  func withLatestFrom<Other: Publisher, Result>(
    _ other: Other,
    resultSelector: @escaping (Output, Other.Output) -> Result
  ) -> AnyPublisher<Result, Failure> where Other.Failure == Failure {
    // Upstream is used twice below, so it is shared.
    let sharedUpstream = share()

    // For each value of `other`, switch to a publisher that maps upstream
    // values together with that value.
    let combined = other
      .map { latest in sharedUpstream.map { resultSelector($0, latest) } }
      .switchToLatest()

    // `zip`ping and discarding `\.1` allows for upstream completions
    // to be projected down immediately.
    return combined
      .zip(sharedUpstream)
      .map(\.0)
      .eraseToAnyPublisher()
  }
}

And a blog post he wrote about this topic: https://jasdev.me/notes/with-latest-from

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment