Skip to content

Instantly share code, notes, and snippets.

@alorma
Created November 20, 2018 14:03
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alorma/6471342a52c9042e39b80787b81b7dea to your computer and use it in GitHub Desktop.
Save alorma/6471342a52c9042e39b80787b81b7dea to your computer and use it in GitHub Desktop.
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import rx.Subscription
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import io.reactivex.Completable as Rx2Completable
import io.reactivex.Flowable as Rx2Flowable
import io.reactivex.Observable as Rx2Observable
import io.reactivex.Single as Rx2Single
import rx.Completable as Rx1Completable
import rx.Observable as Rx1Observable
import rx.Single as Rx1Single
suspend inline fun <T> withIO(
crossinline block: () -> T
): T = withContext(Dispatchers.IO) { block() }
suspend fun <T> Rx1Observable<T>.await(): T = suspendCancellableCoroutine { continuation ->
val subscription = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnSubscribe(continuation, subscription)
}
suspend fun <T> Rx2Observable<T>.await(): T = suspendCancellableCoroutine { continuation ->
val disposable = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnDispose(continuation, disposable)
}
suspend fun <T> Rx2Flowable<T>.await(): T = suspendCancellableCoroutine { continuation ->
val disposable = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnDispose(continuation, disposable)
}
suspend fun <T> Rx1Single<T>.await(): T = suspendCancellableCoroutine { continuation ->
val subscription = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnSubscribe(continuation, subscription)
}
suspend fun <T> Rx2Single<T>.await(): T = suspendCancellableCoroutine { continuation ->
val disposable = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnDispose(continuation, disposable)
}
suspend fun Rx1Completable.await(): Unit = suspendCancellableCoroutine { continuation ->
val subscription = subscribe({
continuation.resume(Unit)
}, {
continuation.resumeWithException(it)
})
addOnSubscribe(continuation, subscription)
}
suspend fun Rx2Completable.await(): Unit = suspendCancellableCoroutine { continuation ->
val disposable = subscribe({
continuation.resume(Unit)
}, {
continuation.resumeWithException(it)
})
addOnDispose(continuation, disposable)
}
private fun <T> addOnSubscribe(continuation: CancellableContinuation<T>, subscription: Subscription) {
continuation.invokeOnCancellation { subscription.unsubscribe() }
}
private fun <T> addOnDispose(continuation: CancellableContinuation<T>, disposable: Disposable) {
continuation.invokeOnCancellation { disposable.dispose() }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment