Skip to content

Instantly share code, notes, and snippets.

@vlondon
Forked from groue/CancelBag.swift
Created October 19, 2022 08:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vlondon/7eab85a087d6f0d5df69e0a6258315be to your computer and use it in GitHub Desktop.
Save vlondon/7eab85a087d6f0d5df69e0a6258315be to your computer and use it in GitHub Desktop.
CancelBag
import Combine
import Foundation
/// A thread-safe store for cancellables which addresses usability pain points
/// with stock Combine apis.
///
/// ## Thread-safe storage of cancellables
///
///     let cancelBag = CancelBag()
///     cancellable.store(in: cancelBag)
///     cancelBag.remove(cancellable)
///
/// ## Memory consumption
///
/// Use case: keep a cancellable alive until the cancelBag is drained, but
/// remove it from memory once the subscription completes or is cancelled.
///
///     // Releases memory when subscription completes or is cancelled.
///     publisher.sink(
///         in: cancelBag,
///         receiveCompletion: ...,
///         receiveValue: ...)
///
///     // Manual cancellation is still possible
///     let cancellable = publisher.sink(
///         in: cancelBag,
///         receiveCompletion: ...,
///         receiveValue: ...)
///     cancellable.cancel()
///
/// ## Important
///
/// CancelBag cancels its cancellables when it is deinitialized.
final class CancelBag {
    /// Whether the bag currently stores no cancellable.
    var isEmpty: Bool { synchronized { cancellables.isEmpty } }

    /// Guards `cancellables`. Recursive in order to allow reentrancy: the
    /// bag may be called again by the same thread while it holds the lock.
    private let lock = NSRecursiveLock()

    /// The stored cancellables, in insertion order.
    private var cancellables: [AnyCancellable] = []

    deinit {
        cancel()
    }

    /// Removes a cancellable from the bag, without cancelling it.
    ///
    /// The cancellable is looked up by identity (`===`). At most one entry
    /// is removed, and nothing happens if the cancellable is not stored.
    ///
    /// - parameter cancellable: The cancellable to remove.
    func remove(_ cancellable: AnyCancellable) {
        synchronized {
            if let index = cancellables.firstIndex(where: { $0 === cancellable }) {
                cancellables.remove(at: index)
            }
        }
    }

    /// Adds a cancellable to the bag.
    ///
    /// - parameter cancellable: The cancellable to store.
    fileprivate func store<T: Cancellable>(_ cancellable: T) {
        synchronized {
            if let any = cancellable as? AnyCancellable {
                // Don't lose cancellable identity, so that we can remove it.
                cancellables.append(any)
            } else {
                // Not an AnyCancellable: let Combine's own `store(in:)`
                // put it into the array.
                cancellable.store(in: &cancellables)
            }
        }
    }

    /// Runs `execute` while holding the lock, and returns its result.
    ///
    /// The lock is released even when `execute` throws.
    private func synchronized<T>(_ execute: () throws -> T) rethrows -> T {
        lock.lock()
        defer { lock.unlock() }
        return try execute()
    }
}
extension CancelBag: Cancellable {
    /// Cancels every stored cancellable, then empties the bag.
    func cancel() {
        synchronized {
            // Work on a snapshot: cancelling an element may call back into
            // remove(_:) and mutate the stored array, which would be an
            // exclusive access violation if we were iterating it directly.
            let snapshot = cancellables
            snapshot.forEach { $0.cancel() }

            // Everything has been cancelled: drop whatever is left.
            cancellables.removeAll()
        }
    }
}
extension Cancellable {
/// Stores this cancellable in a CancelBag.
///
/// This is the entry point for adding cancellables to a bag: it forwards
/// to the bag's file-private `store(_:)` method.
///
/// - parameter bag: The CancelBag that receives this cancellable.
func store(in bag: CancelBag) {
bag.store(self)
}
}
extension Publisher {
    /// Attaches a subscriber with closure-based behavior.
    ///
    /// This method creates the subscriber and immediately requests an unlimited
    /// number of values.
    ///
    /// The returned cancellable is added to cancelBag, and removed when the
    /// publisher completes or when the subscription is cancelled.
    ///
    /// - parameter cancelBag: A CancelBag instance.
    /// - parameter receiveCompletion: The closure to execute on completion.
    /// - parameter receiveValue: The closure to execute on receipt of a value.
    /// - returns: An AnyCancellable instance.
    @discardableResult
    func sink(
        in cancelBag: CancelBag,
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void)
    -> AnyCancellable
    {
        // Manual +1 retain on the returned cancellable, balanced by the
        // cleanup below. Using Unmanaged instead of a strong capture
        // prevents a retain cycle when the cancellable retains itself.
        var unmanagedCancellable: Unmanaged<AnyCancellable>?

        // Removes the cancellable from the bag and balances the manual
        // retain. Returns false when the cancellable has not been set yet.
        let releaseIfRetained: () -> Bool = { [weak cancelBag] in
            guard let retained = unmanagedCancellable else { return false }
            cancelBag?.remove(retained.takeUnretainedValue())
            retained.release()
            return true
        }

        // Shared by the cancel and completion paths. Cleanup is postponed
        // to the main queue in case the subscription finishes before the
        // cancellable is set.
        let cleanUp: () -> Void = {
            if !releaseIfRetained() {
                DispatchQueue.main.async { _ = releaseIfRetained() }
            }
        }

        let cancellable = self
            .handleEvents(receiveCancel: cleanUp)
            .sink(
                receiveCompletion: { completion in
                    // Notify the caller first, then release resources.
                    receiveCompletion(completion)
                    cleanUp()
                },
                receiveValue: receiveValue)

        unmanagedCancellable = Unmanaged.passRetained(cancellable)
        cancellable.store(in: cancelBag)
        return cancellable
    }
}
extension Publisher where Failure == Never {
/// Attaches a subscriber with closure-based behavior to a publisher that
/// never fails.
///
/// This method creates the subscriber and immediately requests an unlimited
/// number of values.
///
/// The returned cancellable is added to cancelBag, and removed when
/// publisher completes.
///
/// This is a convenience over `sink(in:receiveCompletion:receiveValue:)`
/// which passes a completion closure that does nothing.
///
/// - parameter cancelBag: A CancelBag instance.
/// - parameter receiveValue: The closure to execute on receipt of a value.
/// - returns: An AnyCancellable instance.
@discardableResult
func sink(
in cancelBag: CancelBag,
receiveValue: @escaping (Output) -> Void)
-> AnyCancellable
{
sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: receiveValue)
}
}
import Combine
import XCTest
/// Tests for CancelBag.
///
/// The first group stores cancellables with `store(in:)`. The "Sink" group
/// uses the `sink(in:...)` publisher methods. Most scenarios come in two
/// flavors: one where the test drops the returned cancellable, and a
/// "RetainingCancellable" one where the test keeps it alive with
/// `withExtendedLifetime`.
final class CancelBagTests: XCTestCase {
// Cancelling the bag cancels a subscription stored with store(in:).
func testCancelBagExplicitCancel() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(receiveValue: { _ in })
.store(in: cancelBag)
XCTAssertFalse(isCancelled)
cancelBag.cancel()
XCTAssertTrue(isCancelled)
}
// Same as above, while the test also holds the cancellable.
func testCancelBagExplicitCancelRetainingCancellable() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
let cancellable = subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(receiveValue: { _ in })
cancellable.store(in: cancelBag)
withExtendedLifetime(cancellable) {
XCTAssertFalse(isCancelled)
cancelBag.cancel()
XCTAssertTrue(isCancelled)
}
}
// Dropping the last reference to the bag cancels the stored subscription.
func testCancelBagImplicitCancelWhenDeinitialized() {
var cancelBag: CancelBag? = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(receiveValue: { _ in })
.store(in: cancelBag!)
XCTAssertFalse(isCancelled)
cancelBag = nil
XCTAssertTrue(isCancelled)
}
// Same as above, while the test also holds the cancellable.
func testCancelBagImplicitCancelWhenDeinitializedRetainingCancellable() {
var cancelBag: CancelBag? = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
let cancellable = subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(receiveValue: { _ in })
cancellable.store(in: cancelBag!)
withExtendedLifetime(cancellable) {
XCTAssertFalse(isCancelled)
cancelBag = nil
XCTAssertTrue(isCancelled)
}
}
// A cancellable stored in a bag can still be cancelled directly.
func testCancelBagAcceptsExternalCancellation() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
let cancellable = subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(receiveValue: { _ in })
cancellable.store(in: cancelBag)
XCTAssertFalse(isCancelled)
cancellable.cancel()
XCTAssertTrue(isCancelled)
}
// Same as above, with the cancellable kept alive for the whole check.
func testCancelBagAcceptsExternalCancellationRetainingCancellable() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
let cancellable = subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(receiveValue: { _ in })
cancellable.store(in: cancelBag)
withExtendedLifetime(cancellable) {
XCTAssertFalse(isCancelled)
cancellable.cancel()
XCTAssertTrue(isCancelled)
}
}
// MARK: - Sink
// With Just, cancelling the bag after sink(in:) returns does not trigger
// the receiveCancel handler.
func testCancelBagSinkJust() {
let cancelBag = CancelBag()
let publisher = Just(0)
var isCancelled = false
publisher
.handleEvents(receiveCancel: { isCancelled = true })
.sink(in: cancelBag, receiveValue: { _ in })
XCTAssertFalse(isCancelled)
cancelBag.cancel()
XCTAssertFalse(isCancelled) // too late
}
// Same expectation with an Empty publisher.
func testCancelBagSinkEmpty() {
let cancelBag = CancelBag()
let publisher = Empty(outputType: Void.self, failureType: Never.self)
var isCancelled = false
publisher
.handleEvents(receiveCancel: { isCancelled = true })
.sink(in: cancelBag, receiveValue: { _ in })
XCTAssertFalse(isCancelled)
cancelBag.cancel()
XCTAssertFalse(isCancelled) // too late
}
// Same expectation with a Fail publisher.
func testCancelBagSinkFail() {
struct TestError: Error { }
let cancelBag = CancelBag()
let publisher = Fail(outputType: Void.self, failure: TestError())
var isCancelled = false
publisher
.handleEvents(receiveCancel: { isCancelled = true })
.sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: { _ in })
XCTAssertFalse(isCancelled)
cancelBag.cancel()
XCTAssertFalse(isCancelled) // too late
}
// Cancelling the bag cancels a subscription created with sink(in:).
func testCancelBagSinkExplicitCancel() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(in: cancelBag, receiveValue: { _ in })
XCTAssertFalse(isCancelled)
cancelBag.cancel()
XCTAssertTrue(isCancelled)
}
// Same as above, while the test also holds the returned cancellable.
func testCancelBagSinkExplicitCancelRetainingCancellable() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
let cancellable = subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(in: cancelBag, receiveValue: { _ in })
withExtendedLifetime(cancellable) {
XCTAssertFalse(isCancelled)
cancelBag.cancel()
XCTAssertTrue(isCancelled)
}
}
// Dropping the last reference to the bag cancels a subscription created
// with sink(in:).
func testCancelBagSinkImplicitCancelWhenDeinitialized() {
var cancelBag: CancelBag? = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(in: cancelBag!, receiveValue: { _ in })
XCTAssertFalse(isCancelled)
cancelBag = nil
XCTAssertTrue(isCancelled)
}
// Same as above, while the test also holds the returned cancellable.
func testCancelBagSinkImplicitCancelWhenDeinitializedRetainingCancellable() {
var cancelBag: CancelBag? = CancelBag()
let subject = PassthroughSubject<Void, Never>()
var isCancelled = false
let cancellable = subject
.handleEvents(receiveCancel: { isCancelled = true })
.sink(in: cancelBag!, receiveValue: { _ in })
withExtendedLifetime(cancellable) {
XCTAssertFalse(isCancelled)
cancelBag = nil
XCTAssertTrue(isCancelled)
}
}
// The bag is non-empty after sink(in:), and empty after cancel().
func testCancelBagSinkReleasesMemoryOnCancellation() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
subject.sink(in: cancelBag, receiveValue: { _ in })
XCTAssertFalse(cancelBag.isEmpty)
cancelBag.cancel()
XCTAssertTrue(cancelBag.isEmpty)
}
// Same as above, while the test also holds the returned cancellable.
func testCancelBagSinkReleasesMemoryOnCancellationRetainingCancellable() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
let cancellable = subject.sink(in: cancelBag, receiveValue: { _ in })
withExtendedLifetime(cancellable) {
XCTAssertFalse(cancelBag.isEmpty)
cancelBag.cancel()
XCTAssertTrue(cancelBag.isEmpty)
}
}
// The bag is emptied as soon as the subject finishes.
func testCancelBagSinkReleasesMemoryOnCompletionFinished() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
subject.sink(in: cancelBag, receiveValue: { _ in })
XCTAssertFalse(cancelBag.isEmpty)
subject.send(completion: .finished)
XCTAssertTrue(cancelBag.isEmpty)
}
// With an Empty publisher, the bag is still non-empty right after
// sink(in:) returns, and is empty by the time a block dispatched to the
// main queue runs.
func testCancelBagSinkEventuallyReleasesMemoryOnCompletionFinishedImmediate() {
let cancelBag = CancelBag()
let publisher = Empty<Void, Never>()
publisher.sink(in: cancelBag, receiveValue: { _ in })
XCTAssertFalse(cancelBag.isEmpty)
let expectation = self.expectation(description: "Empty cancelBag")
DispatchQueue.main.async {
XCTAssertTrue(cancelBag.isEmpty)
expectation.fulfill()
}
waitForExpectations(timeout: 1, handler: nil)
}
// The bag is emptied on .finished even while the test holds the
// returned cancellable.
func testCancelBagSinkReleasesMemoryOnCompletionFinishedRetainingCancellable() {
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, Never>()
let cancellable = subject.sink(in: cancelBag, receiveValue: { _ in })
withExtendedLifetime(cancellable) {
XCTAssertFalse(cancelBag.isEmpty)
subject.send(completion: .finished)
XCTAssertTrue(cancelBag.isEmpty)
}
}
// The bag is emptied as soon as the subject fails.
func testCancelBagSinkReleasesMemoryOnCompletionFailure() {
struct TestError: Error { }
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, TestError>()
subject.sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: { _ in })
XCTAssertFalse(cancelBag.isEmpty)
subject.send(completion: .failure(TestError()))
XCTAssertTrue(cancelBag.isEmpty)
}
// With a Fail publisher, the bag is still non-empty right after
// sink(in:) returns, and is empty by the time a block dispatched to the
// main queue runs.
func testCancelBagSinkEventuallyReleasesMemoryOnCompletionFailureImmediate() {
struct TestError: Error { }
let cancelBag = CancelBag()
let publisher = Fail<Void, TestError>(error: TestError())
publisher.sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: { _ in })
XCTAssertFalse(cancelBag.isEmpty)
let expectation = self.expectation(description: "Empty cancelBag")
DispatchQueue.main.async {
XCTAssertTrue(cancelBag.isEmpty)
expectation.fulfill()
}
waitForExpectations(timeout: 1, handler: nil)
}
// The bag is emptied on failure even while the test holds the returned
// cancellable.
func testCancelBagSinkReleasesMemoryOnCompletionFailureRetainingCancellable() {
struct TestError: Error { }
let cancelBag = CancelBag()
let subject = PassthroughSubject<Void, TestError>()
let cancellable = subject.sink(in: cancelBag, receiveCompletion: { _ in }, receiveValue: { _ in })
withExtendedLifetime(cancellable) {
XCTAssertFalse(cancelBag.isEmpty)
subject.send(completion: .failure(TestError()))
XCTAssertTrue(cancelBag.isEmpty)
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment