Skip to content

Instantly share code, notes, and snippets.

@andreacipriani
Created April 3, 2021 20:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save andreacipriani/6e2e93d349122f2e80443341c55dacb7 to your computer and use it in GitHub Desktop.
Save andreacipriani/6e2e93d349122f2e80443341c55dacb7 to your computer and use it in GitHub Desktop.
Combine async publisher problem
import UIKit
import Combine
let queue = DispatchQueue(label: "bg", qos: .background)
let asyncOpQueue = DispatchQueue(label: "async-op-queue")
var cancellable: AnyCancellable?
func example() {
let source = PassthroughSubject<Bool, Never>()
cancellable = source
.receive(on: queue)
.map { isOn in AnyPublisher<String, Never>(asyncOp(isOn)) }
.switchToLatest()
.print()
.sink(receiveValue: { _ in
group.leave()
})
source.send(true)
source.send(false)
}
func asyncOp(_ isOn: Bool) -> (AnyPublisher<String, Never>.Subscriber) -> AnyCancellable {
{ subscriber in
asyncOpQueue.async {
subscriber.send(isOn ? "Hello" : "Goodby")
subscriber.send(completion: .finished)
}
return AnyCancellable { }
}
}
extension AnyPublisher where Failure: Error {
struct Subscriber {
fileprivate let send: (Output) -> Void
fileprivate let complete: (Subscribers.Completion<Failure>) -> Void
func send(_ value: Output) { self.send(value) }
func send(completion: Subscribers.Completion<Failure>) { self.complete(completion) }
}
init(_ closure: (Subscriber) -> AnyCancellable) {
let subject = PassthroughSubject<Output, Failure>()
let subscriber = Subscriber(
send: subject.send,
complete: subject.send(completion:)
)
let cancel = closure(subscriber)
self = subject
.handleEvents(receiveCancel: cancel.cancel)
.eraseToAnyPublisher()
}
}
let group = DispatchGroup()
group.enter()
example()
group.wait()
// Error: expected "Goodby", but the code doesn't emit an event...
/// The problem is to create a publisher that maps a simple publisher of `Bool` to `String`s
/// Simulating that the mapping operation can be asynchronous and take time
/// While this async operation is performed, if a new boolean is published from the outer publisher
/// The inner publisher should cancel the previous subscription and only consider the latest (this is guaranted by the `switchToLatest` operator)
/// Example: if we send true and then immediately false, we expect to emit "Goodby" and cancel the emission of "Hello"
/// If you use two different queues, like in this example, the event gets lost
/// If you use the same queue, the event is recieved
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment