Skip to content

Instantly share code, notes, and snippets.

@RuiAAPeres
Last active August 30, 2021 17:24
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 1 You must be signed in to fork a gist
  • Save RuiAAPeres/ee11a4a50970f68c68a0e9bc78290d75 to your computer and use it in GitHub Desktop.
Save RuiAAPeres/ee11a4a50970f68c68a0e9bc78290d75 to your computer and use it in GitHub Desktop.
ReactiveSwift extensions for GRDB
/// This heavily based on the work done at RxGRDB
import GRDB
import ReactiveSwift
extension DatabasePool: ReactiveExtensionsProvider {}
extension DatabaseQueue: ReactiveExtensionsProvider {}
extension ValueObservation: ReactiveExtensionsProvider {}
extension DatabaseRegionObservation: ReactiveExtensionsProvider {}
extension Reactive where Base: DatabaseReader {
public func read<T>(
value: @escaping (Database) throws -> T
) -> SignalProducer<T, Error> {
return SignalProducer { observer, _ in
self.base.asyncRead { database in
do {
switch database {
case let .success(db):
try observer.send(value: value(db))
observer.sendCompleted()
case let .failure(error):
observer.send(error: error)
}
} catch {
observer.send(error: error)
}
}
}
}
}
extension Reactive where Base: DatabaseWriter {
public func write<T>(
updates: @escaping (Database) throws -> T
) -> SignalProducer<T, Error> {
SignalProducer { observer, _ in
self.base.asyncWrite(updates, completion: { _, result in
switch result {
case let .success(value):
observer.send(value: value)
observer.sendCompleted()
case let .failure(error):
observer.send(error: error)
}
})
}
}
public func write<T, U>(
updates: @escaping (Database) throws -> T,
thenRead value: @escaping (Database, T) throws -> U
) -> SignalProducer<U, Error> {
SignalProducer { observer, _ in
self.base.asyncWriteWithoutTransaction { db in
var updatesValue: T?
do {
try db.inTransaction {
updatesValue = try updates(db)
return .commit
}
} catch {
observer.send(error: error)
return
}
self.base.spawnConcurrentRead { dbResult in
do {
switch dbResult {
case let .success(db):
let aValue: U = try value(db, updatesValue!)
observer.send(value: aValue)
observer.sendCompleted()
case let .failure(error):
observer.send(error: error)
}
} catch {
observer.send(error: error)
}
}
}
}
}
}
extension Reactive where Base == DatabaseRegionObservation {
public func changes(in writer: DatabaseWriter) -> SignalProducer<Database, Error> {
SignalProducer { (observer: Signal<Database, Error>.Observer, lifetime: Lifetime) in
do {
let transationObserver = try self.base.start(in: writer, onChange: observer.send(value:))
lifetime.observeEnded {
writer.remove(transactionObserver: transationObserver)
}
} catch {
observer.send(error: error)
}
}
}
}
public protocol _ValueObservationProtocol {
associatedtype Reducer: _ValueReducer
func start(
in reader: DatabaseReader,
scheduling scheduler: ValueObservationScheduler,
onError: @escaping (Error) -> Void,
onChange: @escaping (Reducer.Value) -> Void) -> DatabaseCancellable
}
extension ValueObservation: _ValueObservationProtocol { }
extension Reactive where Base: _ValueObservationProtocol {
public func observe(in reader: DatabaseReader, scheduler: ValueObservationScheduler) -> SignalProducer<Base.Reducer.Value, Error> {
SignalProducer { (observer: Signal<Base.Reducer.Value, Error>.Observer, lifetime: Lifetime) in
let cancellable = self.base.start(in: reader, scheduling: scheduler, onError: observer.send(error:), onChange: observer.send(value:))
lifetime.observeEnded(cancellable.cancel)
}
}
}
@sergdort
Copy link

this is gonna crash in case of error, right?

@RuiAAPeres
Copy link
Author

this is gonna crash in case of error, right?

yeah, added the earlier return.

@sergdort
Copy link

Should it sendCompleted() here

@RuiAAPeres
Copy link
Author

Should it sendCompleted() here

🚀 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment