Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@jasdev
Last active April 20, 2020 15:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jasdev/430276e0484cb66cf6d35385c5a7faff to your computer and use it in GitHub Desktop.
Save jasdev/430276e0484cb66cf6d35385c5a7faff to your computer and use it in GitHub Desktop.
`ReplaySubject` with `.send(_:)`.
final class ReplaySubject<Output, Failure: Error>: Subject {
typealias Output = Output
typealias Failure = Failure
private let bufferSize: Int
private var completion: Subscribers.Completion<Failure>?
private var isActive: Bool {
completion == nil
}
private var buffer = [Output]() /// (1) We could swap `Array` for a type better suited for head removals and tail
//// insertions—à la [Entwine’s `ReplaySubjectValueBuffer`](https://github.com/tcldr/Entwine/blob/b839c9fcc7466878d6a823677ce608da998b95b9/Sources/Entwine/Operators/ReplaySubject.swift#L162-L177)
/// —but, for now, this can do.
private var subscriptions = [Subscription]()
init(bufferSize: Int) {
self.bufferSize = bufferSize
buffer.reserveCapacity(bufferSize) /// (2) Priming `buffer`’s capacity to avoid intermediary resizings.
}
func send(subscription: Combine.Subscription) {
subscription.request(.unlimited)
}
func send(completion: Subscribers.Completion<Failure>) {
guard isActive else { return }
self.completion = completion
subscriptions.forEach { $0.forward(completion: completion) }
}
func send(_ value: Output) {
guard isActive else { return }
buffer.append(value)
if buffer.count > bufferSize { /// (3) Dequeue the first buffered value, if we’ve hit the max size.
buffer.removeFirst()
}
subscriptions.forEach { $0.forward(value: value) }
}
func receive<Subscriber: Combine.Subscriber>(
subscriber: Subscriber
) where Failure == Subscriber.Failure, Output == Subscriber.Input { /* … */ }
}
extension ReplaySubject {
final class Subscription: Combine.Subscription {
func request(_ demand: Subscribers.Demand) { /* … */ }
func cancel() { /* … */ }
func forward(completion: Subscribers.Completion<Failure>) { /* … */ }
func forward(value: Output) { /// (4) To be filled in, soon.
/* … */
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment