Created
April 3, 2021 20:16
-
-
Save andreacipriani/6e2e93d349122f2e80443341c55dacb7 to your computer and use it in GitHub Desktop.
Combine async publisher problem
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import UIKit | |
import Combine | |
let queue = DispatchQueue(label: "bg", qos: .background) | |
let asyncOpQueue = DispatchQueue(label: "async-op-queue") | |
var cancellable: AnyCancellable? | |
func example() { | |
let source = PassthroughSubject<Bool, Never>() | |
cancellable = source | |
.receive(on: queue) | |
.map { isOn in AnyPublisher<String, Never>(asyncOp(isOn)) } | |
.switchToLatest() | |
.print() | |
.sink(receiveValue: { _ in | |
group.leave() | |
}) | |
source.send(true) | |
source.send(false) | |
} | |
func asyncOp(_ isOn: Bool) -> (AnyPublisher<String, Never>.Subscriber) -> AnyCancellable { | |
{ subscriber in | |
asyncOpQueue.async { | |
subscriber.send(isOn ? "Hello" : "Goodby") | |
subscriber.send(completion: .finished) | |
} | |
return AnyCancellable { } | |
} | |
} | |
extension AnyPublisher where Failure: Error { | |
struct Subscriber { | |
fileprivate let send: (Output) -> Void | |
fileprivate let complete: (Subscribers.Completion<Failure>) -> Void | |
func send(_ value: Output) { self.send(value) } | |
func send(completion: Subscribers.Completion<Failure>) { self.complete(completion) } | |
} | |
init(_ closure: (Subscriber) -> AnyCancellable) { | |
let subject = PassthroughSubject<Output, Failure>() | |
let subscriber = Subscriber( | |
send: subject.send, | |
complete: subject.send(completion:) | |
) | |
let cancel = closure(subscriber) | |
self = subject | |
.handleEvents(receiveCancel: cancel.cancel) | |
.eraseToAnyPublisher() | |
} | |
} | |
let group = DispatchGroup() | |
group.enter() | |
example() | |
group.wait() | |
// Error: expected "Goodby", but the code doesn't emit an event... | |
/// The problem is to create a publisher that maps a simple publisher of `Bool` to `String`s | |
/// Simulating that the mapping operation can be asynchronous and take time | |
/// While this async operation is performed, if a new boolean is published from the outer publisher | |
/// The inner publisher should cancel the previous subscription and only consider the latest (this is guaranted by the `switchToLatest` operator) | |
/// Example: if we send true and then immediately false, we expect to emit "Goodby" and cancel the emission of "Hello" | |
/// If you use two different queues, like in this example, the event gets lost | |
/// If you use the same queue, the event is recieved |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment