Skip to content

Instantly share code, notes, and snippets.

@slightair
Last active January 17, 2020 05:47
Show Gist options
  • Save slightair/3a2c02a990c3b176fa2feadcd26e6e15 to your computer and use it in GitHub Desktop.
Save slightair/3a2c02a990c3b176fa2feadcd26e6e15 to your computer and use it in GitHub Desktop.
Apple Combine + withLatestFrom operator
// Demo 1: both sources deliver their elements synchronously on subscription.
// `just` is subscribed first, so 999 is already cached as the latest value
// when the array publisher starts emitting.
// Expected output: 999 printed five times, followed by `finished`.
let just = Just<Int>(999)
let a = [1, 2, 3, 4, 5].publisher
    .withLatestFrom(just)
    .sink(
        receiveCompletion: { completion in print(completion) },
        receiveValue: { value in print(value) }
    )

// Blank line separating the output of the two demos.
print()

// Demo 2: subjects driven by hand, so the interleaving is explicit.
let x = PassthroughSubject<Int, Never>()
let y = PassthroughSubject<String, Never>()
let b = x.withLatestFrom(y)
    .sink(
        receiveCompletion: { completion in print(completion) },
        receiveValue: { value in print(value) }
    )

x.send(100)                      // dropped: `y` has not emitted anything yet
y.send("A")
y.send("B")                      // replaces "A" as the latest value
x.send(200)                      // expected to print B
x.send(300)                      // expected to print B
y.send("C")
x.send(400)                      // expected to print C
x.send(completion: .finished)    // expected to print finished
import Combine
extension Publishers {
    /// A publisher that, every time `upstream` emits an element, combines that element
    /// with the most recent element received from a second publisher.
    ///
    /// - Upstream elements that arrive before the second publisher has produced any
    ///   value are dropped.
    /// - A `.finished` completion from the second publisher is ignored; its last value
    ///   keeps being used.
    /// - A failure from either publisher, or an error thrown by the result selector,
    ///   terminates the stream with that error (hence `Failure == Error`).
    /// - Completion of `upstream` completes the stream.
    ///
    /// Limitation: both sources are consumed through `sink`, which always requests
    /// unlimited demand, so a finite downstream demand is not enforced.
    struct WithLatestFrom<Upstream: Publisher, P: Publisher, Output>: Publisher {
        typealias Failure = Error
        /// Combines an upstream element with the latest element of the second publisher.
        typealias ResultSelector = (Upstream.Output, P.Output) throws -> Output

        let upstream: Upstream
        let publisher: P
        let resultSelector: ResultSelector

        /// - Parameters:
        ///   - upstream: The publisher whose elements trigger emissions.
        ///   - publisher: The publisher whose latest element is sampled.
        ///   - resultSelector: Maps each (upstream element, latest element) pair to an output.
        init(upstream: Upstream, publisher: P, resultSelector: @escaping ResultSelector) {
            self.upstream = upstream
            self.publisher = publisher
            self.resultSelector = resultSelector
        }

        func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
            let subscription = WithLatestFromSubscription(
                subscriber: subscriber,
                upstream: upstream,
                publisher: publisher,
                resultSelector: resultSelector
            )
            subscriber.receive(subscription: subscription)
        }

        /// Connects one downstream subscriber to both sources.
        private final class WithLatestFromSubscription<SubscriberType: Subscriber>: Subscription
        where SubscriberType.Input == Output, SubscriberType.Failure == Failure {
            let combineIdentifier = CombineIdentifier()

            private let upstream: Upstream
            private let publisher: P
            private let resultSelector: ResultSelector

            /// `nil` once the stream has terminated or been cancelled.
            private var subscriber: SubscriberType?
            private var upstreamSubscription: AnyCancellable?
            private var publisherSubscription: AnyCancellable?
            private var latest: P.Output?
            /// Ensures the sources are subscribed to at most once.
            private var hasStarted = false

            init(subscriber: SubscriberType, upstream: Upstream, publisher: P, resultSelector: @escaping ResultSelector) {
                self.subscriber = subscriber
                self.upstream = upstream
                self.publisher = publisher
                self.resultSelector = resultSelector
            }

            /// Starts both source subscriptions on the first request for a non-zero demand.
            ///
            /// Later calls are no-ops: re-subscribing on every request would restart the
            /// sources and cancel the subscriptions that are already running.
            func request(_ demand: Subscribers.Demand) {
                guard !hasStarted, subscriber != nil, demand > Subscribers.Demand.none else { return }
                hasStarted = true

                // Subscribe to the sampled publisher first so that a value it delivers
                // synchronously is already cached when upstream starts emitting.
                let publisherCancellable = publisher.sink(
                    receiveCompletion: { [weak self] completion in
                        switch completion {
                        case .finished:
                            // Keep using the last value that was received.
                            break
                        case let .failure(error):
                            self?.finish(with: .failure(error))
                        }
                    },
                    receiveValue: { [weak self] output in
                        self?.latest = output
                    }
                )
                // The sampled publisher may have failed synchronously inside `sink`; in that
                // case do not store the cancellable and do not subscribe to upstream.
                guard subscriber != nil else { return }
                publisherSubscription = publisherCancellable

                let upstreamCancellable = upstream.sink(
                    receiveCompletion: { [weak self] completion in
                        switch completion {
                        case .finished:
                            self?.finish(with: .finished)
                        case let .failure(error):
                            self?.finish(with: .failure(error))
                        }
                    },
                    receiveValue: { [weak self] output in
                        self?.forward(output)
                    }
                )
                // Upstream may have terminated (or downstream cancelled) synchronously inside
                // `sink`; storing the cancellable afterwards would keep a stale reference.
                guard subscriber != nil else { return }
                upstreamSubscription = upstreamCancellable
            }

            func cancel() {
                subscriber = nil
                publisherSubscription = nil
                upstreamSubscription = nil
                latest = nil
            }

            /// Combines `output` with the latest sampled value and sends the result downstream.
            private func forward(_ output: Upstream.Output) {
                guard let downstream = subscriber, let latest = latest else { return }
                do {
                    let result = try resultSelector(output, latest)
                    // The returned demand is discarded on purpose: see the limitation
                    // documented on `WithLatestFrom`.
                    _ = downstream.receive(result)
                } catch {
                    finish(with: .failure(error))
                }
            }

            /// Delivers a terminal event at most once, then releases both source subscriptions.
            private func finish(with completion: Subscribers.Completion<Failure>) {
                guard let downstream = subscriber else { return }
                // Clear first so a re-entrant terminal event cannot reach downstream twice.
                subscriber = nil
                downstream.receive(completion: completion)
                cancel()
            }
        }
    }
}
extension Publisher {
    /// Combines each element of this publisher with the most recent element of `publisher`.
    ///
    /// - Parameters:
    ///   - publisher: The publisher whose latest element is sampled.
    ///   - resultSelector: Maps (element of `self`, latest element of `publisher`) to the output.
    /// - Returns: A `Publishers.WithLatestFrom` wrapping `self`, `publisher` and `resultSelector`.
    func withLatestFrom<P: Publisher, ResultType>(
        _ publisher: P,
        resultSelector: @escaping (Output, P.Output) throws -> ResultType
    ) -> Publishers.WithLatestFrom<Self, P, ResultType> {
        return Publishers.WithLatestFrom<Self, P, ResultType>(
            upstream: self,
            publisher: publisher,
            resultSelector: resultSelector
        )
    }

    /// Replaces each element of this publisher with the most recent element of `publisher`.
    ///
    /// - Parameter publisher: The publisher whose latest element is sampled.
    /// - Returns: A `Publishers.WithLatestFrom` whose output is `P.Output`.
    func withLatestFrom<P: Publisher>(_ publisher: P) -> Publishers.WithLatestFrom<Self, P, P.Output> {
        return Publishers.WithLatestFrom<Self, P, P.Output>(
            upstream: self,
            publisher: publisher,
            // Ignore the triggering element and pass the sampled value through.
            resultSelector: { _, sampled in sampled }
        )
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment