Skip to content

Instantly share code, notes, and snippets.

@takasek
Last active March 19, 2018 04:39
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save takasek/f5cc2b26cbbd0e724cd9f449c073ea79 to your computer and use it in GitHub Desktop.
Save takasek/f5cc2b26cbbd0e724cd9f449c073ea79 to your computer and use it in GitHub Desktop.
`Single<T>` とそれを叩く `PublishSubject<Void>` を `inProgress, success(T), failure(E)` の川に変換 #CodePiece
/// The observable lifecycle of one asynchronous operation.
///
/// - `T`: the value produced when the operation succeeds.
/// - `E`: the error type produced when the operation fails.
enum AsyncState<T, E> where E: Error {
    /// The operation has been started and has not finished yet.
    case inProgress
    /// The operation finished and produced a value.
    case success(T)
    /// The operation finished with an error.
    case failure(E)
}
extension Observable {
    /// Turns a trigger plus a `Single`-producing closure into a stream of `AsyncState` values.
    ///
    /// Every element emitted by `trigger` calls `asyncProcess` and subscribes to the
    /// returned `Single`. The resulting stream emits `.inProgress` first, then
    /// `.success(value)` when the `Single` succeeds or `.failure(error)` when it fails
    /// with an error of type `E`. Because `flatMapLatest` is used, a new trigger element
    /// switches to the newest `Single` and the previous one is no longer observed.
    ///
    /// - Parameters:
    ///   - trigger: Subject whose elements start (or restart) the asynchronous process.
    ///   - asyncProcess: Factory invoked once per trigger element.
    ///   - errorType: The error type that is converted into `.failure`. Only used to
    ///     fix the generic parameter `E` at the call site.
    /// - Returns: An observable of `AsyncState<T, E>`. Failures of type `E` are delivered
    ///   as `next(.failure)` elements. An error that is NOT an `E` cannot be
    ///   represented as `.failure`, so it is re-emitted as an error event instead of
    ///   being force-cast (which would crash the process).
    static func from<T, E: Error>(
        trigger: PublishSubject<Void>,
        execute asyncProcess: @escaping () -> Single<T>,
        throws errorType: E.Type
    ) -> Observable<AsyncState<T, E>> {
        return trigger.flatMapLatest { _ -> Observable<AsyncState<T, E>> in
            return asyncProcess()
                .asObservable()
                .map { value -> AsyncState<T, E> in .success(value) }
                .catchError { error -> Observable<AsyncState<T, E>> in
                    // Conditional cast: an unexpected error type must not trap.
                    guard let expected = error as? E else {
                        return .error(error)
                    }
                    return .just(.failure(expected))
                }
                .startWith(.inProgress)
        }
    }
}
/// Demo model that simulates an API call and exposes its progress as an `AsyncState` stream.
final class Hoge {
    /// Error produced by the simulated API when `shouldSuccess` is `false`.
    struct APIError: Swift.Error {}

    /// Decides, at the time `apiSingle()` is called, whether the call succeeds or fails.
    var shouldSuccess = true

    /// Emitting on this subject starts (or restarts) the simulated API call.
    let apiTrigger = PublishSubject<Void>()

    /// State stream driven by `apiTrigger`; built on first access.
    lazy var apiState: Observable<AsyncState<Int, APIError>> = makeApiState()

    /// Simulated API call: a 0.3 timer on the main scheduler that either passes its
    /// value through or is mapped to `APIError`, depending on `shouldSuccess`.
    func apiSingle() -> Single<Int> {
        let delayed = Single<Int>.timer(0.3, scheduler: MainScheduler.instance)
        guard shouldSuccess else {
            return delayed.flatMap { _ in Single<Int>.error(APIError()) }
        }
        return delayed
    }

    /// Wires `apiTrigger` and `apiSingle()` together through `Observable.from`.
    private func makeApiState() -> Observable<AsyncState<Int, APIError>> {
        // NOTE(review): `unowned` assumes no trigger element arrives after this
        // instance has been deallocated — confirm against the subscription's lifetime.
        let startRequest: () -> Single<Int> = { [unowned self] in self.apiSingle() }
        return Observable<AsyncState<Int, APIError>>.from(
            trigger: apiTrigger,
            execute: startRequest,
            throws: APIError.self
        )
    }
}
// Demonstrates the state stream: a plain success, a restart before the previous call
// finishes, a failure, and a success after the failure.
example("state from result") {
    let hoge = Hoge()

    // The Disposable is discarded on purpose. A DisposeBag local to this closure would
    // be released as soon as the closure returns and would cancel the subscription
    // before any of the delayed triggers below fire.
    _ = hoge.apiState
        .debug() // logs Observable<AsyncState<Int, APIError>>
        .subscribe()

    hoge.apiTrigger.onNext(())
    DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
        hoge.apiTrigger.onNext(())
        DispatchQueue.main.asyncAfter(deadline: .now() + 0.2) {
            hoge.apiTrigger.onNext(()) // triggered before previous API finishes.
            DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
                hoge.shouldSuccess = false
                hoge.apiTrigger.onNext(()) // to throw an error.
                DispatchQueue.main.asyncAfter(deadline: .now() + 1) {
                    hoge.shouldSuccess = true
                    hoge.apiTrigger.onNext(())
                }
            }
        }
    }
    /*
     --- state from result example ---
     2018-03-19 13:11:45.027: Rx.playground:82 (__lldb_expr_18) -> subscribed
     2018-03-19 13:11:45.032: Rx.playground:82 (__lldb_expr_18) -> Event next(inProgress)
     2018-03-19 13:11:45.334: Rx.playground:82 (__lldb_expr_18) -> Event next(success(0))
     2018-03-19 13:11:46.037: Rx.playground:82 (__lldb_expr_18) -> Event next(inProgress)
     2018-03-19 13:11:46.259: Rx.playground:82 (__lldb_expr_18) -> Event next(inProgress) // flatMapped to latest single. Previous api is cancelled.
     2018-03-19 13:11:46.561: Rx.playground:82 (__lldb_expr_18) -> Event next(success(0))
     2018-03-19 13:11:47.263: Rx.playground:82 (__lldb_expr_18) -> Event next(inProgress)
     2018-03-19 13:11:47.566: Rx.playground:82 (__lldb_expr_18) -> Event next(failure(__lldb_expr_18.Hoge.APIError()))
     2018-03-19 13:11:48.268: Rx.playground:82 (__lldb_expr_18) -> Event next(inProgress) // the stream is alive after an error occurs.
     2018-03-19 13:11:48.569: Rx.playground:82 (__lldb_expr_18) -> Event next(success(0))
     */
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment