Skip to content

Instantly share code, notes, and snippets.

@mattmassicotte
Last active May 21, 2022 15:12
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattmassicotte/13531168d473ce0dc859b7c87afb6569 to your computer and use it in GitHub Desktop.
Save mattmassicotte/13531168d473ce0dc859b7c87afb6569 to your computer and use it in GitHub Desktop.
This code demonstrates that using a shared publisher will not always be able to deliver results to concurrent subscribers
import Foundation
import Combine
enum State: Int {
case starting
case firstValueReceived
case secondSubscriptionCreated
case finishedWithFirstValue
case secondValueReceived
case finished
}
extension NSConditionLock {
convenience init<T: RawRepresentable>(condition: T) where T.RawValue == Int {
self.init(condition: condition.rawValue)
}
func lock<T: RawRepresentable>(when: T) where T.RawValue == Int {
lock(whenCondition: when.rawValue)
}
func unlock<T: RawRepresentable>(with: T) where T.RawValue == Int {
unlock(withCondition: with.rawValue)
}
}
let queue = DispatchQueue(label: "Serial Queue")
var sharedPublisher: AnyPublisher<Int, Never>? = nil
var cancellables = Set<AnyCancellable>()
let lock = NSConditionLock(condition: State.starting)
func makeAsyncPublisher() -> AnyPublisher<Int, Never> {
return Deferred {
Future<Int, Never> { promise in
DispatchQueue.global().asyncAfter(deadline: .now() + 1) {
promise(.success(42))
}
}
}.eraseToAnyPublisher()
}
func getOrCreateSharedPublisher() -> AnyPublisher<Int, Never> {
if let pub = sharedPublisher {
return pub
}
let pub = makeAsyncPublisher()
.handleEvents(receiveCompletion: { _ in
print("shared publisher completed")
})
.share()
.eraseToAnyPublisher()
sharedPublisher = pub
return pub
}
func useSharedPublisherWithReceives() -> AnyPublisher<Int, Never> {
return Just(1)
.receive(on: queue)
.flatMap { value -> AnyPublisher<Int, Never> in
return getOrCreateSharedPublisher()
}
.eraseToAnyPublisher()
}
func useSharedPublisherWithSync() -> AnyPublisher<Int, Never> {
return queue.sync {
return getOrCreateSharedPublisher()
}
}
func useSharedPublisher() -> AnyPublisher<Int, Never> {
return useSharedPublisherWithReceives()
}
DispatchQueue.global().async {
// thread one makes first request
useSharedPublisher()
.handleEvents(receiveCompletion: { _ in
print("1: completed")
})
.sink { value in
print("1: starting")
lock.lock(when: State.starting)
print("1: got \(value)")
lock.unlock(with: State.firstValueReceived)
print("1: waiting for 2nd subscription")
lock.lock(when: State.secondSubscriptionCreated)
print("1: found 2nd subscrption")
lock.unlock(with: State.finishedWithFirstValue)
}
.store(in: &cancellables)
}
DispatchQueue.global().async {
// thread two makes a request after the result,
// but *before* the completion
lock.lock(when: State.firstValueReceived)
useSharedPublisher()
.handleEvents(receiveSubscription: { _ in
print("2: created subscription")
lock.unlock(with: State.secondSubscriptionCreated)
}, receiveCompletion: { _ in
print("2: completed")
})
.sink { value in
print("2: waiting")
lock.lock(when: State.finishedWithFirstValue)
print("2: got \(value)")
lock.unlock(with: State.secondValueReceived)
}
.store(in: &cancellables)
}
print("waiting for second value")
lock.lock(when: State.secondValueReceived)
print("got both values")
lock.unlock(with: State.finished)
@mattmassicotte
Copy link
Author

Here's the output.

waiting for second value
1: starting
1: got 42
2: created subscription
1: waiting for 2nd subscription
1: found 2nd subscrption
shared publisher completed
2: completed
1: completed

Subscriber 2 will never get a result, just a completion, causing the code to hang.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment