Skip to content

Instantly share code, notes, and snippets.

@Mathieu-Gosbee-ConnectedLab
Created May 10, 2018 18:26
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Mathieu-Gosbee-ConnectedLab/c03f28223280fdd84422447aee5e0a5a to your computer and use it in GitHub Desktop.
Save Mathieu-Gosbee-ConnectedLab/c03f28223280fdd84422447aee5e0a5a to your computer and use it in GitHub Desktop.
import Foundation
import RxSwift
public enum SerialQueueState {
case active
case idle
}
class SerialQueueSubject<T>: ObservableType where T: ObservableType {
public var state: Observable<SerialQueueState> {
return Observable.combineLatest(self.active, self.list)
.map { $0 == nil && $1.isEmpty ? .idle : .active }
.distinctUntilChanged()
}
func subscribe<O>(_ observer: O) -> Disposable where O: ObserverType, E == O.E {
let setNextDisposable = Observable
.combineLatest(active, list.asObservable())
.observeOn(MainScheduler.asyncInstance)
.subscribe(onNext: { [unowned active, unowned list] activeValue, listValue in
var listValue = listValue
if activeValue == nil && listValue.count > 0 {
let item = listValue.removeFirst()
active.onNext(item)
list.onNext(listValue)
}
})
let executeActiveDisposable = active
.filter { $0 != nil }
.map { $0! }
.flatMapLatest { $0 }
.observeOn(MainScheduler.asyncInstance)
.next { [unowned self] itemValue in
observer.onNext(itemValue)
self.active.onNext(nil)
}
return CompositeDisposable(setNextDisposable, executeActiveDisposable)
}
typealias E = T.E // swiftlint:disable:this type_name
private var list: BehaviorSubject<[T]> = BehaviorSubject(value: [])
private var active: ReplaySubject<T?> = ReplaySubject.create(bufferSize: 1)
public init(_ value: [T] = []) {
list.onNext(value)
active.onNext(nil)
}
func push(_ object: T) {
let currentList = try? list.value() + [object]
guard currentList != nil else {
return
}
self.list.onNext(currentList!)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment