Skip to content

Instantly share code, notes, and snippets.

@rnapier rnapier/stream.swift
Last active Jun 22, 2019

Embed
What would you like to do?
Newer stream-based ideas on observables
/*
Updated ideas on observation. Much more powerful and composeable than previous Observable approach.
Simpler, but less powerful, than RxSwift
*/
import Foundation
public class Disposable {
private var isDisposed = false
private let _dispose: () -> Void
public func dispose() {
if !isDisposed {
_dispose()
isDisposed = true
}
}
public init(dispose: @escaping () -> Void) { self._dispose = dispose }
deinit {
dispose()
}
}
public class DisposeBag: Disposable {
private var disposables: [Disposable]
public init(_ disposables: [Disposable] = []) {
self.disposables = disposables
super.init(dispose: {})
}
public override func dispose() {
disposables.removeAll()
}
func insert(_ disposable: Disposable) {
disposables.append(disposable)
}
}
extension Disposable {
public func disposed(by bag: DisposeBag) {
bag.insert(self)
}
}
public enum TimeoutResult<T> {
case success(T)
case timeout
}
/*
An observable stream of values.
Note that a base Stream has no way to generate values, so unless you override addObserver, it's not actually useful.
*/
public class ValueStream<T> {
fileprivate init() {}
public func addObserver(didChange: @escaping (T) -> Void) -> Disposable {
preconditionFailure("This stream cannot produce values.")
}
public func addOneShot(didChange: @escaping (T) -> Void) {
var remover: Disposable?
remover = addObserver { value in
didChange(value)
remover?.dispose()
remover = nil
}
}
public func addOneShot(timeout: TimeInterval, didChange: @escaping (TimeoutResult<T>) -> Void) {
var remover: Disposable?
remover = addObserver { value in
guard let localRemover = remover else { return }
remover = nil
didChange(.success(value))
localRemover.dispose()
}
DispatchQueue.main.asyncAfter(deadline: .now() + timeout) {
guard let localRemover = remover else { return }
remover = nil
didChange(.timeout)
localRemover.dispose()
}
}
public func map<U>(_ transform: @escaping (T) -> U) -> ValueStream<U> {
return ComposedStream(base: self, composer: { didChange in
return { value in didChange(transform(value)) }
})
}
public func compactMap<U>(_ transform: @escaping (T) -> U?) -> ValueStream<U> {
return ComposedStream(base: self, composer: { didChange in
return { value in
if let value = transform(value) {
didChange(value)
}
}
})
}
public func filter(_ predicate: @escaping (T) -> Bool) -> ValueStream<T> {
return ComposedStream(base: self, composer: { didChange in
return { value in
if predicate(value) { didChange(value) }
}
})
}
public func dropFirst(_ count: Int = 1) -> ValueStream<T> {
return ComposedStream(base: self, composer: { didChange in
var remaining = count
return { value in
if remaining <= 0 {
didChange(value)
} else {
remaining -= 1
}
}
})
}
public func first(_ count: Int = 1) -> ValueStream<T> {
return ComposedStream(base: self, composer: { didChange in
var remaining = count
return { value in
if remaining > 0 {
didChange(value)
remaining -= 1
}
}
})
}
public func on(queue: DispatchQueue) -> ValueStream<T> {
return ComposedStream(base: self, composer: { didChange in
return { value in
queue.async { didChange(value) }
}
})
}
public func onMainQueue() -> ValueStream<T> {
return on(queue: .main)
}
public func withPrevious() -> ValueStream<(newValue: T, oldValue: T?)> {
return ComposedStream(base: self, composer: { didChange in
var oldValue: T?
return { value in
didChange((newValue: value, oldValue: oldValue))
oldValue = value
}
})
}
public func asEventStream() -> ValueStream<Void> {
return map { _ in return () }
}
public func trigger(onRisingEdge predicate: @escaping (T) -> Bool) -> EventStream {
return withPrevious().filter { (current, previous) in
guard let previous = previous else { return false } // Can't have edge without previous
return !predicate(previous) && predicate(current)
}.asEventStream()
}
// Throttle to once per event loop
public func throttleTillBlockEnd() -> ValueStream<T> {
return ComposedStream(base: self, composer: { didChange in
var active = true
return { value in
guard active else { return }
didChange(value)
active = false
DispatchQueue.main.async { active = true }
}
})
}
}
extension ValueStream where T: Equatable {
public func distinct() -> ValueStream<T> {
return ComposedStream(base: self, composer: { didChange in
var previous: [T] = []
return { value in
if !previous.contains(value) {
didChange(value)
}
previous.append(value)
}
})
}
public func distinctUntilChanged() -> ValueStream<T> {
return ComposedStream(base: self, composer: { didChange in
var previous: T?
return { value in
if previous != value {
didChange(value)
}
previous = value
}
})
}
}
extension ValueStream {
public func filterNil<U>() -> ValueStream<U> where T == U? {
return compactMap { $0 }
}
}
/*
A subject can emit arbitrary values to its observers.
(Breaking it out like this prevents calling `emit` on the result of map() which doesn't do anything.)
*/
public class Subject<T>: ValueStream<T> {
public override init() { super.init() }
fileprivate var observers: [UUID: (T) -> Void] = [:]
public override func addObserver(didChange: @escaping (T) -> Void) -> Disposable {
let identifier = UUID()
observers[identifier] = didChange
return Disposable { [weak self] in
if let strongSelf = self {
strongSelf.observers.removeValue(forKey: identifier)
}
}
}
}
public class PublishSubject<T>: Subject<T> {
public func emit(_ value: T) {
for observer in observers.values {
observer(value)
}
}
}
private class ComposedStream<Input, Output>: ValueStream<Output> {
let base: ValueStream<Input>
let composer: (@escaping (Output) -> Void) -> (Input) -> Void
init(base: ValueStream<Input>, composer: @escaping (@escaping (Output) -> Void) -> (Input) -> Void) {
self.base = base
self.composer = composer
super.init()
}
override func addObserver(didChange: @escaping (Output) -> Void) -> Disposable {
return base.addObserver(didChange: composer(didChange))
}
}
/*
A Variable is a Stream that makes its value available directly. On subscription it immediately
emits its most recent value. (Rx: BehaviorSubject)
*/
public class Variable<T>: Subject<T> {
public var value: T {
didSet {
for observer in observers.values {
observer(value)
}
}
}
public init(_ value: T) { self.value = value }
public override func addObserver(didChange: @escaping (T) -> Void) -> Disposable {
didChange(value)
return super.addObserver(didChange: didChange)
}
}
extension Variable: CustomStringConvertible where T: CustomStringConvertible {
public var description: String { return value.description }
}
public class UserDefaultsVariable<T>: Subject<T> {
let key: String
let defaultValue: T
let userDefaults: UserDefaults
public var value: T {
get {
return userDefaults.object(forKey: key) as? T ?? defaultValue
}
set {
userDefaults.set(newValue, forKey: key)
for observer in observers.values {
observer(value)
}
}
}
public init(key: String, defaultValue: T, userDefaults: UserDefaults = .standard) {
self.key = key
self.defaultValue = defaultValue
self.userDefaults = userDefaults
}
public override func addObserver(didChange: @escaping (T) -> Void) -> Disposable {
didChange(value)
return super.addObserver(didChange: didChange)
}
}
/*
Stream of NSNotifications
*/
public class NotificationStream: ValueStream<Notification> {
let name: NSNotification.Name?
let object: Any?
let notificationCenter: NotificationCenter
public init(forName name: Notification.Name?, object: Any?, notificationCenter: NotificationCenter = .default) {
self.name = name
self.object = object
self.notificationCenter = notificationCenter
super.init()
}
public override func addObserver(didChange: @escaping (Notification) -> Void) -> Disposable {
let center = notificationCenter
let observer = center.addObserver(forName: name, object: object, queue: nil) { note in
didChange(note)
}
return Disposable {
center.removeObserver(observer)
}
}
}
/*
Stream of KVO changes. Retains observed object.
*/
public class KVOStream<Observed: NSObject, Value>: ValueStream<(Observed, NSKeyValueObservedChange<Value>)> {
let object: Observed
let keyPath: KeyPath<Observed, Value>
let options: NSKeyValueObservingOptions
public init(object: Observed, keyPath: KeyPath<Observed, Value>, options: NSKeyValueObservingOptions = []) {
self.object = object
self.keyPath = keyPath
self.options = options.union(.new) // Always require the new value for asValues()
}
public override func addObserver(didChange: @escaping ((Observed, NSKeyValueObservedChange<Value>)) -> Void) -> Disposable {
let observation = object.observe(keyPath, options: options, changeHandler: didChange)
return Disposable { observation.invalidate() }
}
public func asValues() -> ValueStream<Value> {
return compactMap { _, change in return change.newValue }
}
}
/*
An EventStream is a convenient way to manage Stream<Void>. It is generally used when the observer
doesn't care about the value. This avoids the need for "_ in", and makes it easier to combine streams
of different types.
*/
public typealias EventStream = ValueStream<Void>
// Type-erasing protocol in order to ignore "T"
public protocol EventStreamConvertible {
func asEventStream() -> EventStream
}
extension ValueStream: EventStreamConvertible {}
public class CompositeStream<T>: ValueStream<T> {
private let streams: [ValueStream<T>]
public init<Streams>(_ streams: Streams)
where Streams: Collection, Streams.Element == ValueStream<T> {
self.streams = Array(streams)
}
public override func addObserver(didChange: @escaping (T) -> Void) -> Disposable {
return DisposeBag(streams.map { $0.addObserver(didChange: didChange) })
}
}
public func makeTrigger(forAnyOf streams: [EventStreamConvertible]) -> EventStream {
return CompositeStream(streams.map { $0.asEventStream() })
}
class CombineLatest2Stream<T, U>: ValueStream<(T, U)> {
private let streams: (ValueStream<T>, ValueStream<U>)
init(_ streams: (ValueStream<T>, ValueStream<U>)) {
self.streams = streams
}
override func addObserver(didChange: @escaping ((T, U)) -> Void) -> Disposable {
let disposeBag = DisposeBag()
var values: (T?, U?) = (nil, nil)
streams.0.addObserver { (t) in
values.0 = t
if let u = values.1 { didChange((t, u)) }
}.disposed(by: disposeBag)
streams.1.addObserver { (u) in
values.1 = u
if let t = values.0 { didChange((t, u)) }
}.disposed(by: disposeBag)
return disposeBag
}
}
// swiftlint:disable large_tuple
class CombineLatest5Stream<T, U, V, W, X>: ValueStream<(T, U, V, W, X)> {
private let streams: (ValueStream<T>, ValueStream<U>, ValueStream<V>, ValueStream<W>, ValueStream<X>)
init(_ streams: (ValueStream<T>, ValueStream<U>, ValueStream<V>, ValueStream<W>, ValueStream<X>)) {
self.streams = streams
}
override func addObserver(didChange: @escaping ((T, U, V, W, X)) -> Void) -> Disposable {
let disposeBag = DisposeBag()
var values: (T?, U?, V?, W?, X?) = (nil, nil, nil, nil, nil)
streams.0.addObserver { (t) in
values.0 = t
if let u = values.1,
let v = values.2,
let w = values.3,
let x = values.4 { didChange((t, u, v, w, x)) }
}.disposed(by: disposeBag)
streams.1.addObserver { (u) in
values.1 = u
if let t = values.0,
let v = values.2,
let w = values.3,
let x = values.4 { didChange((t, u, v, w, x)) }
}.disposed(by: disposeBag)
streams.2.addObserver { (v) in
values.2 = v
if let t = values.0,
let u = values.1,
let w = values.3,
let x = values.4 { didChange((t, u, v, w, x)) }
}.disposed(by: disposeBag)
streams.3.addObserver { (w) in
values.3 = w
if let t = values.0,
let u = values.1,
let v = values.2,
let x = values.4 { didChange((t, u, v, w, x)) }
}.disposed(by: disposeBag)
streams.4.addObserver { (x) in
values.4 = x
if let t = values.0,
let u = values.1,
let v = values.2,
let w = values.3 { didChange((t, u, v, w, x)) }
}.disposed(by: disposeBag)
return disposeBag
}
}
public func combineLatest<T, U>(_ stream1: ValueStream<T>, _ stream2: ValueStream<U>) -> ValueStream<(T, U)> {
return CombineLatest2Stream((stream1, stream2))
}
public func combineLatest<T, U, V, W, X>(_ stream1: ValueStream<T>,
_ stream2: ValueStream<U>,
_ stream3: ValueStream<V>,
_ stream4: ValueStream<W>,
_ stream5: ValueStream<X>) -> ValueStream<(T, U, V, W, X)> {
return CombineLatest5Stream((stream1, stream2, stream3, stream4, stream5))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.