Skip to content

Instantly share code, notes, and snippets.

@jasdev
Last active April 20, 2020 16:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasdev/bc66d03e53f1e86b4a8197dc087aea5e to your computer and use it in GitHub Desktop.
Save jasdev/bc66d03e53f1e86b4a8197dc087aea5e to your computer and use it in GitHub Desktop.
Implemented `ReplaySubject`.
import Combine
/// Also, Assuming a `CombineExt.DemandBuffer` import. At the time of writing, the type is `internal`.
final class ReplaySubject<Output, Failure: Error>: Subject {
typealias Output = Output
typealias Failure = Failure
private let bufferSize: Int
private var completion: Subscribers.Completion<Failure>?
private var isActive: Bool {
completion == nil
}
private var buffer = [Output]()
private var subscriptions = [Subscription<AnySubscriber<Output, Failure>>]()
init(bufferSize: Int) {
self.bufferSize = bufferSize
buffer.reserveCapacity(bufferSize)
}
func send(subscription: Combine.Subscription) {
subscription.request(.unlimited)
}
func send(completion: Subscribers.Completion<Failure>) {
guard isActive else { return }
self.completion = completion
subscriptions.forEach { $0.forward(completion: completion) }
}
func send(_ value: Output) {
guard isActive else { return }
buffer.append(value)
if buffer.count > bufferSize {
buffer.removeFirst()
}
subscriptions.forEach { $0.forward(value: value) }
}
func receive<Subscriber: Combine.Subscriber>(
subscriber: Subscriber
) where Failure == Subscriber.Failure, Output == Subscriber.Input {
guard !subscriptions.contains(
where: { $0.innerSubscriberIdentifier == subscriber.combineIdentifier }
) else {
subscriber.receive(subscription: Subscriptions.empty)
return
}
let subscription = Subscription(subscriber: AnySubscriber(subscriber)) { [weak self] in
guard
let self = self,
let subscriptionIndex = self.subscriptions
.firstIndex(where: { $0.innerSubscriberIdentifier == subscriber.combineIdentifier })
else { return }
self.subscriptions.remove(at: subscriptionIndex)
}
subscriptions.append(subscription)
subscriber.receive(subscription: subscription)
buffer.forEach(subscription.forward)
if let completion = completion { subscription.forward(completion: completion) }
}
}
extension ReplaySubject {
final class Subscription<Subscriber: Combine.Subscriber>: Combine.Subscription
where Subscriber.Input == Output, Subscriber.Failure == Failure {
private var buffer: DemandBuffer<Subscriber>?
private var cancellationHandler: (() -> Void)?
fileprivate let innerSubscriberIdentifier: CombineIdentifier
init(
subscriber: Subscriber,
cancellationHandler: (() -> Void)?
) {
buffer = DemandBuffer(subscriber: subscriber)
innerSubscriberIdentifier = subscriber.combineIdentifier
self.cancellationHandler = cancellationHandler
}
func request(_ demand: Subscribers.Demand) {
_ = buffer?.demand(demand)
}
func cancel() {
cancellationHandler?()
cancellationHandler = nil
buffer = nil
}
func forward(completion: Subscribers.Completion<Failure>) {
buffer?.complete(completion: completion)
}
func forward(value: Output) {
_ = buffer?.buffer(value: value)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment