Last active
February 26, 2018 04:04
-
-
Save hikarin522/91c3c4f7157f4d5d45e7eae7d8fe3e05 to your computer and use it in GitHub Desktop.
RxSwift: ObservableをSequenceで同期処理に変更する ref: https://qiita.com/hikarin522/items/187d48b25d9367170a51
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SyncFIFO<T> { | |
var buf = Array<T>() | |
let lock = DispatchSemaphore(value: 1) | |
let sem = DispatchSemaphore(value: 0) | |
func push(_ val: T) { | |
self.lock.wait() | |
defer { self.lock.signal() } | |
self.buf.append(val) | |
self.sem.signal() | |
} | |
func popSync() -> T { | |
sem.wait() | |
lock.wait() | |
defer { lock.signal() } | |
return buf.removeFirst() | |
} | |
} | |
extension ObservableType { | |
func toSequenceSync<T>() -> AnySequence<Event<T>> where T == E { | |
return AnySequence { () -> AnyIterator<Event<T>> in | |
let buf = SyncFIFO<Event<T>?>() | |
let disposable = self.subscribe(buf.push) | |
return AnyIterator { | |
guard let ev = buf.popSync() else { | |
disposable.dispose() | |
return nil | |
} | |
switch ev { | |
case .error, .completed: | |
buf.push(nil) | |
fallthrough | |
case .next: | |
return ev | |
} | |
} | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment