Last active
August 30, 2021 17:24
-
-
Save RuiAAPeres/ee11a4a50970f68c68a0e9bc78290d75 to your computer and use it in GitHub Desktop.
ReactiveSwift extensions for GRDB
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/// This heavily based on the work done at RxGRDB | |
import GRDB | |
import ReactiveSwift | |
extension DatabasePool: ReactiveExtensionsProvider {} | |
extension DatabaseQueue: ReactiveExtensionsProvider {} | |
extension ValueObservation: ReactiveExtensionsProvider {} | |
extension DatabaseRegionObservation: ReactiveExtensionsProvider {} | |
extension Reactive where Base: DatabaseReader { | |
public func read<T>( | |
value: @escaping (Database) throws -> T | |
) -> SignalProducer<T, Error> { | |
return SignalProducer { observer, _ in | |
self.base.asyncRead { database in | |
do { | |
switch database { | |
case let .success(db): | |
try observer.send(value: value(db)) | |
observer.sendCompleted() | |
case let .failure(error): | |
observer.send(error: error) | |
} | |
} catch { | |
observer.send(error: error) | |
} | |
} | |
} | |
} | |
} | |
extension Reactive where Base: DatabaseWriter { | |
public func write<T>( | |
updates: @escaping (Database) throws -> T | |
) -> SignalProducer<T, Error> { | |
SignalProducer { observer, _ in | |
self.base.asyncWrite(updates, completion: { _, result in | |
switch result { | |
case let .success(value): | |
observer.send(value: value) | |
observer.sendCompleted() | |
case let .failure(error): | |
observer.send(error: error) | |
} | |
}) | |
} | |
} | |
public func write<T, U>( | |
updates: @escaping (Database) throws -> T, | |
thenRead value: @escaping (Database, T) throws -> U | |
) -> SignalProducer<U, Error> { | |
SignalProducer { observer, _ in | |
self.base.asyncWriteWithoutTransaction { db in | |
var updatesValue: T? | |
do { | |
try db.inTransaction { | |
updatesValue = try updates(db) | |
return .commit | |
} | |
} catch { | |
observer.send(error: error) | |
return | |
} | |
self.base.spawnConcurrentRead { dbResult in | |
do { | |
switch dbResult { | |
case let .success(db): | |
let aValue: U = try value(db, updatesValue!) | |
observer.send(value: aValue) | |
observer.sendCompleted() | |
case let .failure(error): | |
observer.send(error: error) | |
} | |
} catch { | |
observer.send(error: error) | |
} | |
} | |
} | |
} | |
} | |
} | |
extension Reactive where Base == DatabaseRegionObservation { | |
public func changes(in writer: DatabaseWriter) -> SignalProducer<Database, Error> { | |
SignalProducer { (observer: Signal<Database, Error>.Observer, lifetime: Lifetime) in | |
do { | |
let transationObserver = try self.base.start(in: writer, onChange: observer.send(value:)) | |
lifetime.observeEnded { | |
writer.remove(transactionObserver: transationObserver) | |
} | |
} catch { | |
observer.send(error: error) | |
} | |
} | |
} | |
} | |
public protocol _ValueObservationProtocol { | |
associatedtype Reducer: _ValueReducer | |
func start( | |
in reader: DatabaseReader, | |
scheduling scheduler: ValueObservationScheduler, | |
onError: @escaping (Error) -> Void, | |
onChange: @escaping (Reducer.Value) -> Void) -> DatabaseCancellable | |
} | |
extension ValueObservation: _ValueObservationProtocol { } | |
extension Reactive where Base: _ValueObservationProtocol { | |
public func observe(in reader: DatabaseReader, scheduler: ValueObservationScheduler) -> SignalProducer<Base.Reducer.Value, Error> { | |
SignalProducer { (observer: Signal<Base.Reducer.Value, Error>.Observer, lifetime: Lifetime) in | |
let cancellable = self.base.start(in: reader, scheduling: scheduler, onError: observer.send(error:), onChange: observer.send(value:)) | |
lifetime.observeEnded(cancellable.cancel) | |
} | |
} | |
} |
this is gonna crash in case of error, right?
yeah, added the earlier return.
Should it sendCompleted()
here
Should it sendCompleted() here
🚀 👍
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
this is gonna crash in case of error, right?