Skip to content

Instantly share code, notes, and snippets.

@jasdev
Last active April 20, 2020 16:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasdev/ac950c95e474e78d0bf850b9b612fa3c to your computer and use it in GitHub Desktop.
Save jasdev/ac950c95e474e78d0bf850b9b612fa3c to your computer and use it in GitHub Desktop.
Final `ReplaySubject` and nested `ReplaySubject.Subscription`, including cancellation handling.
/// A `Subject` that, whenever a subscriber attaches, forwards the contents of `buffer` and then any
/// stored `completion` to it (see `receive(subscriber:)`).
///
/// The bodies of `isActive`, the initializer and the `send` overloads are elided (`/* … */`) in this
/// excerpt, so they are left undocumented here rather than guessed at.
final class ReplaySubject<Output, Failure: Error>: Subject {
    typealias Output = Output
    typealias Failure = Failure

    /// NOTE(review): presumably the maximum number of values retained for replay — the code that
    /// reads it is elided in this excerpt; confirm against the full implementation.
    private let bufferSize: Int

    /// When non-`nil`, forwarded to every new subscription right after the buffered values.
    private var completion: Subscribers.Completion<Failure>?

    private var isActive: Bool { /* … */ }

    /// Values forwarded, in order, to each new subscription in `receive(subscriber:)`.
    private var buffer = [Output]()

    /// Subscriptions registered by `receive(subscriber:)`, at most one per subscriber identifier.
    /// Each subscription's cancellation handler removes its own entry.
    private var subscriptions = [Subscription<AnySubscriber<Output, Failure>>]()

    init(bufferSize: Int) { /* … */ }

    func send(subscription: Combine.Subscription) { /* … */ }

    func send(completion: Subscribers.Completion<Failure>) { /* … */ }

    func send(_ value: Output) { /* … */ }

    /// Attaches `subscriber`, then forwards the buffered values and any stored completion to it.
    ///
    /// A subscriber whose `combineIdentifier` is already registered receives
    /// `Subscriptions.empty` instead and is not registered a second time.
    ///
    /// - Parameter subscriber: The subscriber to attach.
    func receive<Subscriber: Combine.Subscriber>(
        subscriber: Subscriber
    ) where Failure == Subscriber.Failure, Output == Subscriber.Input {
        // Read the identifier once. Capturing this value (instead of `subscriber`) means the
        // stored cancellation handler does not keep the subscriber itself alive.
        let subscriberIdentifier = subscriber.combineIdentifier

        guard !subscriptions.contains(
            where: { $0.innerSubscriberIdentifier == subscriberIdentifier }
        ) else {
            subscriber.receive(subscription: Subscriptions.empty)
            return
        }

        // (2) The cancellation handler first promotes the `weak`ly-captured `self`, then checks
        // whether this subscriber is still in `subscriptions` and, if so, removes it.
        let subscription = Subscription(subscriber: AnySubscriber(subscriber)) { [weak self] in
            guard
                let self = self,
                let subscriptionIndex = self.subscriptions
                    .firstIndex(where: { $0.innerSubscriberIdentifier == subscriberIdentifier })
            else { return }
            self.subscriptions.remove(at: subscriptionIndex)
        }

        // Register before handing the subscription over, then forward buffered values and completion.
        subscriptions.append(subscription)
        subscriber.receive(subscription: subscription)
        buffer.forEach(subscription.forward)
        if let completion = completion { subscription.forward(completion: completion) }
    }
}
extension ReplaySubject {
    /// The `Combine.Subscription` created for a single subscriber of a `ReplaySubject`.
    ///
    /// It records the subscriber's `combineIdentifier`, wraps the subscriber in a `DemandBuffer`
    /// (declared outside this excerpt) and runs an optional handler when cancelled.
    final class Subscription<Subscriber: Combine.Subscriber>: Combine.Subscription
    where Subscriber.Input == Output, Subscriber.Failure == Failure {
        /// Set in `init` and released (set to `nil`) by `cancel()`.
        private var buffer: DemandBuffer<Subscriber>?

        /// Invoked at most once, from `cancel()`, and released there.
        private var cancellationHandler: (() -> Void)?

        /// The `combineIdentifier` of the subscriber passed to `init`.
        fileprivate let innerSubscriberIdentifier: CombineIdentifier

        /// Creates a subscription for `subscriber`.
        ///
        /// (1) An optional closure parameter that is stored does not need to be tagged
        /// `@escaping`: because `Optional.some` tucks the closure away in an associated value, it is
        /// implicitly escaping. Jesse Squires has
        /// [a post with more details](https://www.jessesquires.com/blog/2018/06/10/why-optional-swift-closures-are-escaping/).
        ///
        /// - Parameters:
        ///   - subscriber: The subscriber to wrap in a `DemandBuffer`.
        ///   - cancellationHandler: Called when the subscription is cancelled; may be `nil`.
        init(
            subscriber: Subscriber,
            cancellationHandler: (() -> Void)?
        ) {
            buffer = DemandBuffer(subscriber: subscriber)
            innerSubscriberIdentifier = subscriber.combineIdentifier
            self.cancellationHandler = cancellationHandler
        }

        func request(_ demand: Subscribers.Demand) { /* … */ }

        /// Runs the cancellation handler (if any) and releases both the handler and the buffer.
        func cancel() {
            // Detach the handler before invoking it, so that a re-entrant `cancel()` triggered
            // from inside the handler finds nothing left to run.
            let handler = cancellationHandler
            cancellationHandler = nil
            handler?()
            buffer = nil
        }

        func forward(completion: Subscribers.Completion<Failure>) { /* … */ }

        func forward(value: Output) { /* … */ }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment