Skip to content

Instantly share code, notes, and snippets.

@wotjd
Last active December 27, 2022 13:15
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save wotjd/d08493d41d9c5be44ec27a5f5d96f383 to your computer and use it in GitHub Desktop.
Save wotjd/d08493d41d9c5be44ec27a5f5d96f383 to your computer and use it in GitHub Desktop.
import Foundation
import Combine
// shareWhileConnected using Combine Proof of Concept
extension Publisher {
func shareWhileConnected() -> Publishers.ShareWhileConnected<Self, PassthroughSubject<Output, Failure>> {
.init(upstream: self)
}
}
extension Publishers {
final class ShareWhileConnected<
Upstream: Publisher, MulticastSubject: Subject
>: Publisher where MulticastSubject.Output == Upstream.Output, MulticastSubject.Failure == Upstream.Failure {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
private let subjectFactory: () -> MulticastSubject
private let lock = NSRecursiveLock()
private var subscriptions: Set<CombineIdentifier> = [] {
didSet {
guard self.subscriptions.isEmpty else { return }
self.sharedPublisher = self.makeSharedPublisher()
}
}
let upstream: Upstream
private lazy var sharedPublisher = self.makeSharedPublisher()
init(
upstream: Upstream,
subjectFactory: @escaping () -> MulticastSubject = { PassthroughSubject<Output, Failure>() }
) {
self.upstream = upstream
self.subjectFactory = subjectFactory
}
func receive<S: Subscriber>(subscriber: S) where S.Input == Output, S.Failure == Failure {
do {
self.lock.lock(); defer { self.lock.unlock() }
self.subscriptions.insert(subscriber.combineIdentifier)
}
self.sharedPublisher
.handleEvents(receiveCancel: { [weak self] in
self?.subscriptions.remove(subscriber.combineIdentifier)
})
.receive(subscriber: subscriber)
}
private func makeSharedPublisher() -> AnyPublisher<Output, Failure> {
self.upstream
.multicast(subject: self.subjectFactory())
.autoconnect()
.handleEvents(receiveCompletion: { [weak self] _ in
guard let ss = self else { return }
ss.lock.lock(); defer { ss.lock.unlock() }
ss.subscriptions = []
})
.eraseToAnyPublisher()
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment