Skip to content

Instantly share code, notes, and snippets.

@MisterRager
Created March 14, 2018 00:04
Show Gist options
  • Save MisterRager/f5ae2d4cbe51229146f9b3af6a19b63f to your computer and use it in GitHub Desktop.
Save MisterRager/f5ae2d4cbe51229146f9b3af6a19b63f to your computer and use it in GitHub Desktop.
A simple utility for staging updates on a stream of records without holding merged record state
/** A function with receiver [T] that returns `Unit`; the element type of the staged-update buffer below. */
private typealias Update<T> = T.() -> Unit
/**
 * Buffers pending updates for records of type [T] and applies them to records on demand.
 *
 * Updates are functions with receiver that return `Unit`, so they can only take effect by
 * mutating the record they are invoked on; this class never copies or stores a record.
 *
 * All mutations of the buffer are serialized on the buffer's monitor. The buffer itself is a
 * copy-on-write list, so code that iterates it without holding that monitor (the combiner in
 * [applyUpdates]) works on a stable snapshot instead of risking a
 * `ConcurrentModificationException`.
 */
class DataUpdatesStaging<T> {
    // Copy-on-write: every add/clear copies the backing array, which is acceptable for a
    // short list of staged updates and makes lock-free iteration safe.
    private val updateBuffer: MutableList<Update<T>> = java.util.concurrent.CopyOnWriteArrayList()

    // Seeded with (and always emitting) the live buffer instance, not a copy of it.
    private val updateStream: BehaviorSubject<List<Update<T>>> = BehaviorSubject.create(updateBuffer)

    /**
     * Appends [update] to the buffer and pushes the buffer to the update stream.
     *
     * The emission happens while the buffer's monitor is held, so concurrent callers
     * cannot interleave their `onNext` calls.
     */
    fun stageUpdate(update: Update<T>) {
        synchronized(updateBuffer) {
            updateBuffer.add(update)
            updateStream.onNext(updateBuffer)
        }
    }

    /**
     * Combines [recordStream] with the update stream; each combination runs every buffered
     * update against the latest record and emits that same record instance.
     *
     * NOTE(review): because the whole buffer is run on each combination, earlier updates are
     * invoked again on the same record whenever a further update is staged; updates
     * presumably need to be idempotent -- confirm against callers.
     *
     * @param recordStream source of records to receive the staged updates
     * @return a stream of the records from [recordStream] after the updates have run on them
     */
    fun applyUpdates(recordStream: Observable<T>): Observable<T> = Observable
        .combineLatest(recordStream, updateStream, { rec, updates ->
            // `updates` is the copy-on-write buffer, so this iteration is snapshot-based.
            updates.forEach { it(rec) }
            rec
        })

    /**
     * Empties the buffer. Nothing is pushed to the update stream here, and changes that
     * updates have already made to records are not undone.
     */
    fun discardUpdates() {
        synchronized(updateBuffer) {
            updateBuffer.clear()
        }
    }

    /**
     * Runs every buffered update against [record] and then empties the buffer, as one atomic
     * step with respect to [stageUpdate] and [discardUpdates].
     *
     * When [record] is `null` the buffer is left untouched.
     *
     * @param record the record to finalize, or `null`
     * @return the same [record] reference that was passed in
     */
    fun finalizeUpdates(record: T?): T? {
        synchronized(updateBuffer) {
            if (record != null) {
                updateBuffer.forEach { it(record) }
                updateBuffer.clear()
            }
        }
        return record
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment