@ashevin
Last active February 2, 2020 13:19
Observable - A case study in memory management

Observable/Observer

A case study

Observable is an implementation of the reactive programming style. This document explores the considerations which went into the implementation.

Memory Management

Instances of Observer are intended to be chained in order to appropriately filter and transform emitted values. It is critical that special care is exercised to avoid unintended retain cycles.

In addition, there is a question of ownership. Does an Observer own the observers chained to it, or do those observers own the Observer they observe?

To answer these questions, it is important to distinguish between the two roles an instance of Observer may play.

  1. Emitter. That is, the Observer may be the root source of events. (An instance of the Observable subclass; see below.)
  2. Link. The instance may be observing events from a previous link, for the purpose of manipulating the value in some way.

The model

Observer expects emitters to be owned by the code which is generating the events to be emitted. Intermediate links of an observer chain are owned by the next link in the chain. The final link in a chain is owned by the code which established the chain.

Let us examine why this model was selected.

Ownership of emitters

An emitter must exist for as long as events may be generated. It will exist before any observers have been chained, and it should persist even if all observers are removed. Taken together, these three requirements mean the emitter must be owned by the code which generates the events, as stated in the model above.

Note: In addition, Observers which are chained off an emitter will assert ownership (maintain a strong reference) for as long as they exist. This is not due to a requirement specific to emitters, but is a natural consequence of links owning their parent.

Ownership of intermediate links

Without a model in which links own adjacent links, it would be necessary for the code establishing a chain to capture a reference to every link. Otherwise, ARC would release the chain immediately.

Having the parent own its child link(s) is not desirable. The emitter is the root of the chain, and it is intended for emitters to outlast any observers. If parents owned children, the entire chain would persist, even if the user of the final link was no longer interested, or even in existence. While not a memory leak, per se, it is an undesirable state.

The purpose of a chain of Observers is to transform the original emitted value into something usable by the code which established the chain. The intermediate links serve no independent purpose, and it is expected that their lifetime be no greater than that of the final link.

By establishing a model in which a link owns its parent, the property described above is achieved. When the final link is released, it will release its parent. This will recursively release each parent in turn.

Ownership of the final link

It is necessary for the code which established the chain to own the final link. Without this anchor, the entire chain (minus the emitter) will be immediately released.
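The ownership rules above can be sketched with a toy model. This is only an illustration: `Link` and `ReleaseLog` are hypothetical names, not part of the library, and `Link` models nothing but the strong reference from a link to its parent.

```swift
// Records the order in which links are deallocated.
final class ReleaseLog {
    var names = [String]()
}

// Hypothetical stand-in for Observer that models ownership only.
final class Link {
    let name: String
    let parent: Link?            // strong reference: a link owns its parent
    private let log: ReleaseLog

    init(_ name: String, parent: Link? = nil, log: ReleaseLog) {
        self.name = name
        self.parent = parent
        self.log = log
    }

    deinit {
        log.names.append(name)
    }
}

let log = ReleaseLog()

// The emitter is owned by the code which generates the events.
let emitter = Link("emitter", log: log)

// Only the final link is stored; it keeps "map" alive through its parent reference.
var finalLink: Link? = Link("filter", parent: Link("map", parent: emitter, log: log), log: log)
print(log.names)    // []

// Releasing the final link releases each intermediate link in turn, but not the emitter.
finalLink = nil
print(log.names)    // ["filter", "map"]

// A chain whose final link has no owner is released as soon as it is built.
_ = Link("unanchored", parent: emitter, log: log)
print(log.names)    // ["filter", "map", "unanchored"]
```

The emitter never appears in the log because the code that created it still holds it, which matches the requirement that emitters outlast their observers.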

Releasing a link

While we usually talk of chains of Observers, actual usage is more complicated. Each Observer may be observed by one or more observers. In addition, several operators create additional observers internally. This causes the "chain" to appear more like a tree, when viewed from the emitter.

When a link is released, it must also release its parent. This triggers the recursive release of the intermediate links, as mentioned above. However, if a link has siblings, it is imperative that its parent not unlink itself. Otherwise, the chains for the sibling observers would be broken. The logic outlined here is implemented within the Observer class, and must be triggered explicitly.

Link Bags

Because links must be released explicitly, it quickly becomes unwieldy to track multiple observers (final links) and ensure that each is properly unlinked. Enter the LinkBag. The purpose of a LinkBag is to own all final links within a particular context. When the LinkBag instance is released, it performs the unlink operation for each link it contains. The owning context only needs to ensure that the LinkBag is released; it does not have to concern itself with the individual Observers. This can be as simple as storing the LinkBag in an instance variable, which is automatically released when the instance is released.

The source

import Dispatch  // provides DispatchQueue, used by the observer types

public protocol Unlinkable {
    func unlink()
}

private protocol UnlinkableObserver: Unlinkable {
    var observerCount: Int { get }
    var parent: UnlinkableObserver? { get }
    func add(to linkBag: LinkBag)
}

First, we have the public protocol, Unlinkable, which forms the basis for items which may be added to a link bag.

Next, we have a private protocol which describes the behavior required to implement the linking and unlinking logic. Even though this protocol is only implemented and used internally by Observer, declaring the protocol documents a clear abstraction of the linking behavior.


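/// Owns a set of final links and unlinks them all when it is released.
public final class LinkBag {
    private(set) internal var links = [Unlinkable]()

    /// Takes ownership of `unlinkable` until the bag is cleared or released.
    public func add(_ unlinkable: Unlinkable) {
        links.append(unlinkable)
    }

    public init() { }

    /// Unlinks every link in the bag, then empties it.
    public func clear() {
        links.forEach { $0.unlink() }
        links.removeAll()
    }

    // Releasing the bag unlinks everything it still holds.
    deinit {
        links.forEach { $0.unlink() }
    }
}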

LinkBag is a straightforward class. On deinit, it unlinks its contents.
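As a rough usage sketch, a context can hold its bag in an instance variable and let ARC trigger the unlinking. The protocol and class below are reduced copies of the declarations above so that the example compiles on its own; `RecordingLink` and `Screen` are hypothetical names introduced only for illustration.

```swift
// Reduced copies of the declarations above, so the example compiles on its own.
protocol Unlinkable {
    func unlink()
}

final class LinkBag {
    private var links = [Unlinkable]()

    func add(_ unlinkable: Unlinkable) {
        links.append(unlinkable)
    }

    deinit {
        links.forEach { $0.unlink() }
    }
}

// Hypothetical final link which only records whether it has been unlinked.
final class RecordingLink: Unlinkable {
    private(set) var isUnlinked = false

    func unlink() {
        isUnlinked = true
    }
}

// Hypothetical owner: the bag is an instance variable, so it is released with the owner.
final class Screen {
    private let bag = LinkBag()

    func observe(_ link: Unlinkable) {
        bag.add(link)
    }
}

let first = RecordingLink()
let second = RecordingLink()

var screen: Screen? = Screen()
screen?.observe(first)
screen?.observe(second)
print(first.isUnlinked, second.isUnlinked)   // false false

// Releasing the Screen releases its LinkBag, which unlinks both links.
screen = nil
print(first.isUnlinked, second.isUnlinked)   // true true
```

The owner never calls unlink() itself; dropping the last reference to the owner is enough.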


private struct _Observer<Value> {
    private var nextHandler: ((Value) -> Void)?
    private var errorHandler: ((Error) -> Void)?
    private var finishHandler: (() -> Void)?
    private var queue: DispatchQueue?

    func next(_ value: Value) {
        enqueue { self.nextHandler?(value) }
    }

    func error(_ error: Error) {
        enqueue { self.errorHandler?(error) }
    }

    func finish() {
        enqueue { self.finishHandler?() }
    }

    private func enqueue(_ block: @escaping () -> Void) {
        if let queue = queue {
            queue.async(execute: block)
        }
        else {
            block()
        }
    }

    init(next: ((Value) -> Void)? = nil,
         error: ((Error) -> Void)? = nil,
         finish: (() -> Void)? = nil,
         queue: DispatchQueue?) {
        self.nextHandler = next
        self.errorHandler = error
        self.finishHandler = finish
        self.queue = queue
    }
}

This is a convenience struct, whose purpose is to encapsulate a closure which is observing events.

nextHandler: receives values from normal events.

errorHandler: an Observable may finish with an error. When an error event is emitted, this handler is invoked. A common example is a network request which completes unsuccessfully.

finishHandler: an Observable may indicate that it will no longer emit events by issuing a finish event.

queue: when an observer is registered, it may request that the handler be invoked on a given queue. This is commonly used to invoke UI handling on the main queue, or to ensure that work triggered by a UI event is offloaded to a background queue.
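The effect of the queue parameter can be sketched as follows. `Handler` is a hypothetical reduction of `_Observer` to a single handler, and `Recorder` and the queue label are assumptions made for the example; the dispatch rule itself (asynchronous on the given queue, otherwise inline) mirrors enqueue above.

```swift
import Dispatch

// Hypothetical reduction of _Observer to a single next handler.
struct Handler<Value> {
    let next: (Value) -> Void
    let queue: DispatchQueue?

    func deliver(_ value: Value) {
        if let queue = queue {
            queue.async { self.next(value) }   // hop to the requested queue
        } else {
            next(value)                        // no queue: run on the caller's thread
        }
    }
}

final class Recorder {
    var values = [Int]()
}

let recorder = Recorder()

// Without a queue, the handler has already run when deliver returns.
let inline = Handler<Int>(next: { recorder.values.append($0) }, queue: nil)
inline.deliver(1)
print(recorder.values)   // [1]

// With a queue, the handler runs later on that queue.
let worker = DispatchQueue(label: "example.worker")   // serial by default
let queued = Handler<Int>(next: { recorder.values.append($0) }, queue: worker)
queued.deliver(2)

// An empty synchronous block on a serial queue returns only after earlier work has finished.
worker.sync { }
print(recorder.values)   // [1, 2]
```

The final sync call is there only so the example can read the result deterministically; ordinary observers would not need it.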


public class Observer<Value> {
    fileprivate enum State {
        case open
        case complete
        case error
    }

    fileprivate var observers = [_Observer<Value>]()
    internal var buffer = [Value]()
    fileprivate var state = State.open
    fileprivate var parent: UnlinkableObserver?

    var observerCount: Int {
        return observers.count
    }

    @discardableResult
    public func on(queue: DispatchQueue? = nil, next: @escaping (Value) -> Void) -> Observer<Value> {
        observers.append(_Observer(next: next, queue: queue))

        if buffer.count > 0 {
            buffer.forEach { value in
                observers.forEach { $0.next(value) }
            }
            buffer.removeAll()
        }

        return self
    }

    @discardableResult
    public func on(queue: DispatchQueue? = nil, error: @escaping (Error) -> Void) -> Observer<Value> {
        observers.append(_Observer(error: error, queue: queue))

        return self
    }

    @discardableResult
    public func on(queue: DispatchQueue? = nil, finish: @escaping () -> Void) -> Observer<Value> {
        observers.append(_Observer(finish: finish, queue: queue))

        return self
    }

    internal func next(_ value: Value) {
        guard state == .open else {
            return
        }

        if observers.count > 0 {
            observers.forEach { $0.next(value) }
        }
        else {
            buffer.append(value)
        }
    }

    internal func error(_ error: Error) {
        guard state == .open else {
            return
        }

        state = .error

        observers.forEach { $0.error(error) }
    }

    internal func finish() {
        guard state == .open else {
            return
        }

        state = .complete

        observers.forEach { $0.finish() }
    }
}

public class Observable<Value>: Observer<Value> {
    /**
     Emit a new value to observers.  If no observers are registered, the value is buffered.

     - parameter value: The value to emit.
     */
    public override func next(_ value: Value) {
        super.next(value)
    }

    public override func error(_ error: Error) {
        super.error(error)
    }

    public override func finish() {
        super.finish()
    }

    public init(_ value: Value? = nil) {
        super.init()

        if let value = value {
            next(value)
        }
    }
}

The base Observer class is used to form links. The first link will observe events emitted by an instance of Observable. Observer only has a public API for observing events; its API for emitting events is internal, which allows the chaining mechanism to forward events. The Observable subclass reintroduces the emitter API as public. Instances of Observable will usually be private to their owner, and the .observer() method will be used to provide an observer-only link.
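A reduced sketch of how an owner might use this split, and of the buffering behavior in next(_:), is shown below. `MiniObserver`, `MiniObservable`, and `Counter` are hypothetical names. The `values` property is an assumed stand-in for the .observer() method, whose implementation is not shown here. Access control, error handling, and queues are left out.

```swift
// Hypothetical reduction of Observer: next handlers, the buffer, and the terminal state only.
class MiniObserver<Value> {
    private var handlers = [(Value) -> Void]()
    private var buffer = [Value]()
    private var isOpen = true

    @discardableResult
    func on(next: @escaping (Value) -> Void) -> MiniObserver<Value> {
        handlers.append(next)
        // Flush values which were emitted before any handler was registered.
        buffer.forEach { value in handlers.forEach { $0(value) } }
        buffer.removeAll()
        return self
    }

    func next(_ value: Value) {
        guard isOpen else { return }
        if handlers.isEmpty {
            buffer.append(value)
        } else {
            handlers.forEach { $0(value) }
        }
    }

    func finish() {
        isOpen = false
    }
}

// In the source above, this subclass is what makes the emitting API public.
final class MiniObservable<Value>: MiniObserver<Value> {}

// Hypothetical owner which keeps its emitter private.
final class Counter {
    private let source = MiniObservable<Int>()
    private var count = 0

    // Assumed stand-in for .observer(): exposes the emitter as its base type.
    var values: MiniObserver<Int> { return source }

    func increment() {
        count += 1
        source.next(count)
    }

    func stop() {
        source.finish()
    }
}

let counter = Counter()
counter.increment()   // buffered: nobody is observing yet
counter.increment()   // buffered

var received = [Int]()
counter.values.on(next: { received.append($0) })
print(received)       // [1, 2]

counter.increment()
print(received)       // [1, 2, 3]

counter.stop()
counter.increment()   // ignored: the emitter has finished
print(received)       // [1, 2, 3]
```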

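extension Observer: UnlinkableObserver {
    public func unlink() {
        // Unlink the parent only if this Observer is its sole observer.
        // A missing parent counts as zero observers; the optional call is then a no-op.
        if (parent?.observerCount ?? 0) < 2 {
            parent?.unlink()
        }

        // Drop the strong reference to the parent.
        parent = nil
    }

    public func add(to linkBag: LinkBag) {
        linkBag.add(self)
    }
}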

This extension implements the UnlinkableObserver contract:

  • there exists a method to add the Observer to a link bag.
  • unlinking of the parent only takes place when the Observer has no siblings, i.e. the parent has no other children.

Siblings & Children

Examination of the code will show that an Observer is considered to have siblings if its parent has more than 1 observer. This, despite the fact that observers are of type _Observer, and not of type Observer. The logic is as follows: when an Observer is linked to another, the link is created by establishing an observer on the parent which will emit events it receives to the child link. Thus, if there is more than one observer, there may be a sibling. This mechanism is only a heuristic; if there is an observer registered which was not established by chaining an Observer, that observer will delay release of the parent.

The heuristic satisfies the requirement that a parent must not be released if it has children other than the link being removed.

Note that the delayed release is, itself, a desired property. If there is a direct observer to a link, children of that link should not trigger a release, as that would prevent the independent observer from receiving new events.
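The sibling check can be traced with a small model. `Node` is a hypothetical stand-in for Observer in which the observer count is set by hand rather than derived from registered handlers; its unlink() applies the same rule as the extension above.

```swift
// Hypothetical model of a link; observerCount is assigned manually for the example.
final class Node {
    let name: String
    var observerCount: Int
    var parent: Node?

    init(_ name: String, observerCount: Int = 0, parent: Node? = nil) {
        self.name = name
        self.observerCount = observerCount
        self.parent = parent
    }

    // Same rule as Observer.unlink(): only unlink a parent which has fewer than two observers.
    func unlink() {
        if let parent = parent, parent.observerCount < 2 {
            parent.unlink()
        }
        parent = nil
    }
}

// The emitter is observed by two links: "shared" and "single".
let emitter = Node("emitter", observerCount: 2)

// "shared" has two observers, which may be two child links or one child and a direct observer.
let shared = Node("shared", observerCount: 2, parent: emitter)
let left = Node("left", parent: shared)
let right = Node("right", parent: shared)

// "left" lets go of its parent, but "shared" stays linked to the emitter.
left.unlink()
print(left.parent == nil, shared.parent == nil)   // true false

// "single" has exactly one observer, so unlinking that observer unlinks "single" as well.
let single = Node("single", observerCount: 1, parent: emitter)
let leaf = Node("leaf", parent: single)
leaf.unlink()
print(leaf.parent == nil, single.parent == nil)   // true true
```

In the first case `right` is untouched and still reaches the emitter through `shared`, which is the property the heuristic is meant to protect.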
