|
import Foundation |
|
import Combine |
|
|
|
enum State: Int { |
|
case starting |
|
case firstValueReceived |
|
case secondSubscriptionCreated |
|
case finishedWithFirstValue |
|
case secondValueReceived |
|
case finished |
|
} |
|
|
|
extension NSConditionLock { |
|
convenience init<T: RawRepresentable>(condition: T) where T.RawValue == Int { |
|
self.init(condition: condition.rawValue) |
|
} |
|
|
|
func lock<T: RawRepresentable>(when: T) where T.RawValue == Int { |
|
lock(whenCondition: when.rawValue) |
|
} |
|
|
|
func unlock<T: RawRepresentable>(with: T) where T.RawValue == Int { |
|
unlock(withCondition: with.rawValue) |
|
} |
|
} |
|
|
|
let queue = DispatchQueue(label: "Serial Queue") |
|
var sharedPublisher: AnyPublisher<Int, Never>? = nil |
|
var cancellables = Set<AnyCancellable>() |
|
|
|
let lock = NSConditionLock(condition: State.starting) |
|
|
|
func makeAsyncPublisher() -> AnyPublisher<Int, Never> { |
|
return Deferred { |
|
Future<Int, Never> { promise in |
|
DispatchQueue.global().asyncAfter(deadline: .now() + 1) { |
|
promise(.success(42)) |
|
} |
|
} |
|
}.eraseToAnyPublisher() |
|
} |
|
|
|
func getOrCreateSharedPublisher() -> AnyPublisher<Int, Never> { |
|
if let pub = sharedPublisher { |
|
return pub |
|
} |
|
|
|
let pub = makeAsyncPublisher() |
|
.handleEvents(receiveCompletion: { _ in |
|
print("shared publisher completed") |
|
}) |
|
.share() |
|
.eraseToAnyPublisher() |
|
|
|
sharedPublisher = pub |
|
|
|
return pub |
|
} |
|
|
|
func useSharedPublisherWithReceives() -> AnyPublisher<Int, Never> { |
|
return Just(1) |
|
.receive(on: queue) |
|
.flatMap { value -> AnyPublisher<Int, Never> in |
|
return getOrCreateSharedPublisher() |
|
} |
|
.eraseToAnyPublisher() |
|
} |
|
|
|
func useSharedPublisherWithSync() -> AnyPublisher<Int, Never> { |
|
return queue.sync { |
|
return getOrCreateSharedPublisher() |
|
} |
|
} |
|
|
|
func useSharedPublisher() -> AnyPublisher<Int, Never> { |
|
return useSharedPublisherWithReceives() |
|
} |
|
|
|
DispatchQueue.global().async { |
|
// thread one makes first request |
|
useSharedPublisher() |
|
.handleEvents(receiveCompletion: { _ in |
|
print("1: completed") |
|
}) |
|
.sink { value in |
|
print("1: starting") |
|
lock.lock(when: State.starting) |
|
print("1: got \(value)") |
|
lock.unlock(with: State.firstValueReceived) |
|
|
|
print("1: waiting for 2nd subscription") |
|
lock.lock(when: State.secondSubscriptionCreated) |
|
|
|
print("1: found 2nd subscrption") |
|
lock.unlock(with: State.finishedWithFirstValue) |
|
} |
|
.store(in: &cancellables) |
|
} |
|
|
|
DispatchQueue.global().async { |
|
// thread two makes a request after the result, |
|
// but *before* the completion |
|
|
|
lock.lock(when: State.firstValueReceived) |
|
|
|
useSharedPublisher() |
|
.handleEvents(receiveSubscription: { _ in |
|
print("2: created subscription") |
|
lock.unlock(with: State.secondSubscriptionCreated) |
|
}, receiveCompletion: { _ in |
|
print("2: completed") |
|
}) |
|
.sink { value in |
|
print("2: waiting") |
|
lock.lock(when: State.finishedWithFirstValue) |
|
print("2: got \(value)") |
|
lock.unlock(with: State.secondValueReceived) |
|
} |
|
.store(in: &cancellables) |
|
} |
|
|
|
print("waiting for second value") |
|
lock.lock(when: State.secondValueReceived) |
|
|
|
print("got both values") |
|
lock.unlock(with: State.finished) |
Here's the output.
Subscriber 2 will never get a result, just a completion, causing the code to hang.