Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
import io.reactivex.disposables.Disposable
import kotlinx.coroutines.CancellableContinuation
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.suspendCancellableCoroutine
import kotlinx.coroutines.withContext
import rx.Subscription
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import io.reactivex.Completable as Rx2Completable
import io.reactivex.Flowable as Rx2Flowable
import io.reactivex.Observable as Rx2Observable
import io.reactivex.Single as Rx2Single
import rx.Completable as Rx1Completable
import rx.Observable as Rx1Observable
import rx.Single as Rx1Single
suspend inline fun <T> withIO(
crossinline block: () -> T
): T = withContext(Dispatchers.IO) { block() }
suspend fun <T> Rx1Observable<T>.await(): T = suspendCancellableCoroutine { continuation ->
val subscription = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnSubscribe(continuation, subscription)
}
suspend fun <T> Rx2Observable<T>.await(): T = suspendCancellableCoroutine { continuation ->
val disposable = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnDispose(continuation, disposable)
}
suspend fun <T> Rx2Flowable<T>.await(): T = suspendCancellableCoroutine { continuation ->
val disposable = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnDispose(continuation, disposable)
}
suspend fun <T> Rx1Single<T>.await(): T = suspendCancellableCoroutine { continuation ->
val subscription = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnSubscribe(continuation, subscription)
}
suspend fun <T> Rx2Single<T>.await(): T = suspendCancellableCoroutine { continuation ->
val disposable = subscribe({
continuation.resume(it)
}, {
continuation.resumeWithException(it)
})
addOnDispose(continuation, disposable)
}
suspend fun Rx1Completable.await(): Unit = suspendCancellableCoroutine { continuation ->
val subscription = subscribe({
continuation.resume(Unit)
}, {
continuation.resumeWithException(it)
})
addOnSubscribe(continuation, subscription)
}
suspend fun Rx2Completable.await(): Unit = suspendCancellableCoroutine { continuation ->
val disposable = subscribe({
continuation.resume(Unit)
}, {
continuation.resumeWithException(it)
})
addOnDispose(continuation, disposable)
}
private fun <T> addOnSubscribe(continuation: CancellableContinuation<T>, subscription: Subscription) {
continuation.invokeOnCancellation { subscription.unsubscribe() }
}
private fun <T> addOnDispose(continuation: CancellableContinuation<T>, disposable: Disposable) {
continuation.invokeOnCancellation { disposable.dispose() }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment