|
import Foundation |
|
import Combine |
|
|
|
// Proof of concept: a `shareWhileConnected` operator implemented with Combine.
|
extension Publisher {
    /// Wraps this publisher in a `Publishers.ShareWhileConnected` that uses a
    /// `PassthroughSubject` as its multicast subject.
    ///
    /// - Returns: A `Publishers.ShareWhileConnected` whose upstream is `self`.
    func shareWhileConnected() -> Publishers.ShareWhileConnected<Self, PassthroughSubject<Output, Failure>> {
        // The generic arguments are inferred from the declared return type; the
        // initializer's default subject factory supplies the `PassthroughSubject`.
        return Publishers.ShareWhileConnected(upstream: self)
    }
}
|
|
|
extension Publishers {
    /// A publisher that multicasts `upstream` to all of its current subscribers
    /// through one automatically connected multicast pipeline.
    ///
    /// Live subscribers are tracked by `CombineIdentifier`. Whenever that set
    /// becomes empty — because the last subscriber cancelled, or because the shared
    /// pipeline delivered a completion — a fresh pipeline (with a fresh subject
    /// obtained from `subjectFactory`) replaces the old one, so a later subscriber
    /// does not attach to a finished or torn-down multicast.
    ///
    /// `subscriptions` and `sharedPublisher` are only read or written while `lock`
    /// is held.
    final class ShareWhileConnected<
        Upstream: Publisher, MulticastSubject: Subject
    >: Publisher where MulticastSubject.Output == Upstream.Output, MulticastSubject.Failure == Upstream.Failure {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        /// Produces a new subject for every multicast pipeline that gets built.
        private let subjectFactory: () -> MulticastSubject

        /// Guards `subscriptions` and `sharedPublisher`. Recursive so that
        /// re-entrant access from the same thread does not deadlock.
        private let lock = NSRecursiveLock()

        /// Identifiers of the subscribers currently attached to `sharedPublisher`.
        /// Must only be mutated while `lock` is held.
        private var subscriptions: Set<CombineIdentifier> = [] {
            didSet {
                // Nobody is attached any more: swap in a brand-new pipeline.
                guard subscriptions.isEmpty else { return }
                sharedPublisher = makeSharedPublisher()
            }
        }

        /// The publisher whose output is shared.
        let upstream: Upstream

        /// The multicast pipeline handed to subscribers; rebuilt whenever
        /// `subscriptions` becomes empty. Must only be accessed while `lock` is held.
        private lazy var sharedPublisher: AnyPublisher<Output, Failure> = makeSharedPublisher()

        /// Creates a sharing publisher.
        ///
        /// - Parameters:
        ///   - upstream: The publisher to share.
        ///   - subjectFactory: Called each time a multicast pipeline is built to
        ///     obtain its subject. Defaults to creating a `PassthroughSubject`.
        init(
            upstream: Upstream,
            subjectFactory: @escaping () -> MulticastSubject = { PassthroughSubject<Output, Failure>() }
        ) {
            self.upstream = upstream
            self.subjectFactory = subjectFactory
        }

        /// Registers `subscriber` and attaches it to the current shared pipeline.
        ///
        /// - Parameter subscriber: The subscriber to attach.
        func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
            // Capture only the identifier so the cancel handler does not need to
            // keep the subscriber itself alive.
            let identifier = subscriber.combineIdentifier

            // Register the subscriber and snapshot the pipeline in ONE critical
            // section: once the identifier is in the set, the set cannot become
            // empty (and the pipeline cannot be swapped) through another
            // subscriber's cancellation.
            lock.lock()
            subscriptions.insert(identifier)
            let publisher = sharedPublisher
            lock.unlock()

            // Subscribe outside the lock so downstream/upstream callbacks never run
            // while this object's lock is held by this call.
            publisher
                .handleEvents(receiveCancel: { [weak self] in
                    guard let self = self else { return }
                    // Cancellation may arrive on any thread; mutate the shared
                    // state under the same lock as every other access.
                    self.lock.lock(); defer { self.lock.unlock() }
                    self.subscriptions.remove(identifier)
                })
                .receive(subscriber: subscriber)
        }

        /// Builds a new auto-connecting multicast pipeline over `upstream`.
        ///
        /// When the pipeline delivers a completion, the subscriber set is cleared,
        /// which (via `didSet`) replaces `sharedPublisher` with a fresh pipeline.
        private func makeSharedPublisher() -> AnyPublisher<Output, Failure> {
            upstream
                .multicast(subject: subjectFactory())
                .autoconnect()
                .handleEvents(receiveCompletion: { [weak self] _ in
                    guard let self = self else { return }
                    self.lock.lock(); defer { self.lock.unlock() }
                    self.subscriptions = []
                })
                .eraseToAnyPublisher()
        }
    }
}