Skip to content

Instantly share code, notes, and snippets.

@hikarin522
Last active February 26, 2018 04:04
Show Gist options
  • Save hikarin522/91c3c4f7157f4d5d45e7eae7d8fe3e05 to your computer and use it in GitHub Desktop.
Save hikarin522/91c3c4f7157f4d5d45e7eae7d8fe3e05 to your computer and use it in GitHub Desktop.
RxSwift: ObservableをSequenceで同期処理に変更する ref: https://qiita.com/hikarin522/items/187d48b25d9367170a51
/// A blocking first-in/first-out queue built on two `DispatchSemaphore`s.
///
/// `push(_:)` appends an element and never waits for a consumer; `popSync()`
/// blocks the calling thread until at least one element is available and then
/// removes and returns the oldest one.
class SyncFIFO<T> {
    /// Buffered elements, oldest first. `push`/`popSync` only touch it while `lock` is held.
    var buf: [T] = []
    /// Binary semaphore (initial value 1) used as a mutex around `buf`.
    let lock = DispatchSemaphore(value: 1)
    /// Counting semaphore (initial value 0) signalled once per pushed element.
    let sem = DispatchSemaphore(value: 0)

    /// Appends `val` to the back of the queue and wakes one waiting consumer.
    ///
    /// - Parameter val: The element to enqueue.
    func push(_ val: T) {
        lock.wait()
        buf.append(val)
        // Announce the new element before the mutex is released.
        sem.signal()
        lock.signal()
    }

    /// Removes and returns the oldest element, blocking the calling thread
    /// until one has been pushed.
    ///
    /// - Returns: The element at the front of the queue.
    func popSync() -> T {
        // Wait for an element first so the mutex is never held while blocked.
        sem.wait()
        lock.wait()
        let head = buf.removeFirst()
        lock.signal()
        return head
    }
}
/// Runs a teardown action at most once: either when `end()` is called or,
/// failing that, when the last reference to this object goes away.
///
/// Used by `toSequenceSync()` so that a subscription is also disposed when the
/// iterator is dropped before the terminal event has been consumed.
private final class SubscriptionLifetime {
    private var teardown: (() -> Void)?

    init(_ teardown: @escaping () -> Void) {
        self.teardown = teardown
    }

    /// Runs the teardown action if it has not run yet.
    func end() {
        teardown?()
        teardown = nil
    }

    deinit {
        end()
    }
}

extension ObservableType {
    /// Exposes this observable as a synchronous, blocking `Sequence` of events.
    ///
    /// Each call to `makeIterator()` on the returned sequence subscribes to the
    /// observable once. Events are buffered (without bound) in a `SyncFIFO` and
    /// handed out one per `next()` call; `next()` blocks the calling thread
    /// until an event is available.
    ///
    /// The terminal event (`.error` or `.completed`) is the last element that
    /// is yielded. The subscription is disposed as soon as that event has been
    /// handed out, and every later `next()` call returns `nil` immediately
    /// instead of blocking. If the iterator is released before the terminal
    /// event is consumed, the subscription is disposed at that point.
    ///
    /// - Important: Because `next()` blocks, the observable must deliver its
    ///   events on a different thread than the one iterating (or deliver them
    ///   synchronously during subscription); otherwise iteration can deadlock.
    ///
    /// - Returns: A sequence of the events emitted by this observable,
    ///   ending with the terminal event.
    func toSequenceSync<T>() -> AnySequence<Event<T>> where T == E {
        return AnySequence { () -> AnyIterator<Event<T>> in
            let queue = SyncFIFO<Event<T>>()
            let subscription = self.subscribe(queue.push)
            // Captured by the iterator closure below, so dropping the iterator
            // early still disposes the subscription.
            let lifetime = SubscriptionLifetime { subscription.dispose() }
            var isFinished = false

            return AnyIterator { () -> Event<T>? in
                // Once the terminal event has been delivered nothing more will
                // ever be pushed; popping again would block forever.
                guard !isFinished else { return nil }

                let event = queue.popSync()
                switch event {
                case .next:
                    break
                case .error, .completed:
                    isFinished = true
                    lifetime.end()
                }
                return event
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment