Skip to content

Instantly share code, notes, and snippets.

@joaocsousa
Last active September 12, 2019 07:00
Show Gist options
  • Save joaocsousa/d93091551338dcf925b4bac78ac5211d to your computer and use it in GitHub Desktop.
Save joaocsousa/d93091551338dcf925b4bac78ac5211d to your computer and use it in GitHub Desktop.
// how to use
SafeZip.zip(single1, single2, ::merge).subscribe(...)
class SafeZip {
companion object {
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
fun <T, U, R> zip(single1: Single<T>, single2: Single<U>, zipper: (T, U) -> R): Single<R> {
val errorHandler = SingleErrorHandler()
return Single.zip(
single1.toErrorSafeSingle(errorHandler),
single2.toErrorSafeSingle(errorHandler),
BiFunction<T, U, R> { t, u -> zipper(t, u) })
}
private fun <T> Single<T>.toErrorSafeSingle(errorHandler: SingleErrorHandler) = ErrorSafeSingle(this, errorHandler)
}
}
private class SingleErrorHandler {
var disposed = false
fun <V> setDisposed(error: Throwable): Single<V?> {
disposed = true
return Single.error(error)
}
}
private class ErrorSafeSingle<T>(
private val delegate: Single<T>,
private val errorHandler: SingleErrorHandler
) : Single<T>() {
override fun subscribeActual(observer: SingleObserver<in T>) =
delegate.onErrorResumeNext { errorHandler.setDisposed(it) }
.subscribe(ErrorSafeSingleObserver(observer, errorHandler))
}
private class ErrorSafeSingleObserver<T>(
private val source: SingleObserver<in T>,
private val errorHandler: SingleErrorHandler
) : SingleObserver<T> {
override fun onSuccess(t: T) = source.onSuccess(t)
override fun onSubscribe(d: Disposable) = source.onSubscribe(d)
override fun onError(e: Throwable) =
synchronized(errorHandler.disposed) {
if (!errorHandler.disposed) source.onError(e)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment