Created
October 4, 2019 20:53
-
-
Save dpmedeiros/ce8e7a96d6ca1b66b3bbd31d338c112b to your computer and use it in GitHub Desktop.
Utility functions that turn asynchronous RxJava and callback-invoking methods into blocking calls.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package com.dmedeiros.rx.utils | |
import rx.Completable | |
import rx.Single | |
import java.util.concurrent.CountDownLatch | |
import java.util.concurrent.ExecutionException | |
import java.util.concurrent.Executors | |
import java.util.concurrent.ThreadFactory | |
import java.util.concurrent.TimeUnit.MILLISECONDS | |
import java.util.concurrent.atomic.AtomicInteger | |
import java.util.concurrent.locks.ReentrantLock | |
import kotlin.concurrent.withLock | |
/**
 * Subscribes to this [Single] and blocks the calling thread until it terminates.
 *
 * Unlike BlockingObservable, this is meant to be used in production code.
 *
 * Note that a `null` emission cannot be told apart from "no value" and is reported as an
 * [ExecutionException] whose cause is a [Throwable] with the message "null returned".
 *
 * Only an interruption of the *calling* thread is surfaced as [InterruptedException]. An
 * [InterruptedException] delivered through the Single's error channel is a failure of the
 * Single itself and is wrapped in [ExecutionException] like any other error, so the caller's
 * interrupt flag is never set on behalf of some other thread.
 *
 * @return the value emitted by the Single
 * @throws ExecutionException when the Single terminates with an exception (available via
 * [ExecutionException.cause]) or emits null
 * @throws InterruptedException if the calling thread is interrupted while waiting; the
 * subscription is cancelled and the thread's interrupt flag is re-set before throwing
 */
@Throws(InterruptedException::class, ExecutionException::class)
fun <T> Single<T>.runBlocking(): T {
    val latch = CountDownLatch(1)
    var outsideResult: T? = null
    var outsideThrowable: Throwable? = null
    val subscription = subscribe(
        { result ->
            outsideResult = result
            latch.countDown()
        },
        { throwable ->
            outsideThrowable = throwable
            latch.countDown()
        }
    )
    try {
        latch.await()
    } catch (e: InterruptedException) {
        subscription.unsubscribe()
        // Rethrow right here instead of parking the exception in outsideThrowable: a late
        // onError on another thread could overwrite that variable and swallow the interrupt.
        Thread.currentThread().interrupt()
        throw e
    }
    // The latch has been released, so the callbacks' writes are visible to this thread.
    return outsideResult ?: throw ExecutionException(outsideThrowable ?: Throwable("null returned"))
}
/**
 * Subscribes to this [Completable] and blocks the calling thread until it terminates.
 *
 * Unlike BlockingObservable, this is meant to be used in production code.
 *
 * Only an interruption of the *calling* thread is surfaced as [InterruptedException]. Any error
 * delivered by the Completable itself, including an [InterruptedException] raised upstream, is
 * wrapped in [ExecutionException].
 *
 * @throws ExecutionException when the Completable terminates with an exception (available via
 * [ExecutionException.cause])
 * @throws InterruptedException if the calling thread is interrupted while waiting; the
 * subscription is cancelled before the exception is rethrown
 */
@Throws(InterruptedException::class, ExecutionException::class)
fun Completable.runBlocking() {
    val latch = CountDownLatch(1)
    var outsideThrowable: Throwable? = null
    val subscription = subscribe(
        // onCompleted
        {
            latch.countDown()
        },
        // onError
        { throwable ->
            outsideThrowable = throwable
            latch.countDown()
        }
    )
    try {
        latch.await()
    } catch (e: InterruptedException) {
        subscription.unsubscribe()
        // Rethrow right here instead of parking the exception in outsideThrowable: a late
        // onError on another thread could overwrite that variable and swallow the interrupt.
        throw e
    }
    // The latch has been released, so the onError write (if any) is visible to this thread.
    outsideThrowable?.let { throw ExecutionException(it) }
}
/**
 * Calls, in a blocking fashion, a referred method that accepts a callback which takes one
 * parameter of type [T].
 *
 * Example usage:
 * for a function defined as:
 *     object myObject {
 *         fun myAsyncFunction(onResult: (String) -> Unit) { ... }
 *     }
 * You can block on this function and get the result by doing:
 *     val status: String = myObject::myAsyncFunction.runBlocking()
 *
 * @param timeoutMs how long to wait, in milliseconds, for the callback to be invoked
 *
 * @return the value reported to the callback
 *
 * @throws NoSuchElementException if the callback is not invoked within [timeoutMs] milliseconds
 * @throws IllegalArgumentException if the callback reported more than one distinct value by the
 * time the result is read
 * @throws InterruptedException if the blocking thread is interrupted
 */
@Throws(InterruptedException::class)
inline fun <reified T, R> (((T) -> Unit) -> R).runBlocking(timeoutMs: Long = 500): T {
    val resultContainer: MutableSet<T> = mutableSetOf()
    val latch = CountDownLatch(1)
    invoke { result ->
        resultContainer += result
        latch.countDown()
    }
    if (!latch.await(timeoutMs, MILLISECONDS)) {
        // On timeout there is no happens-before edge with the callback thread, so the
        // container must not be read; fail explicitly with the same exception type that
        // single() would raise for an empty container.
        throw NoSuchElementException("callback was not invoked within $timeoutMs ms")
    }
    return resultContainer.single()
}
/**
 * Blocking adapter for a referred method whose only argument is a parameterless completion
 * callback.
 *
 * Example usage:
 * for a function defined as:
 *     object myObject {
 *         fun myAsyncFunction(onFinished: () -> Unit) { ... }
 *     }
 * You can block on this function by doing:
 *     myObject::myAsyncFunction.runBlocking()
 *
 * @param timeoutMs maximum time, in milliseconds, to wait for the callback to be invoked
 *
 * @return true if the callback was invoked within [timeoutMs] milliseconds, false otherwise
 *
 * @throws InterruptedException if the blocking thread is interrupted
 */
@Throws(InterruptedException::class)
fun ((() -> Unit) -> Unit).runNoResultBlocking(timeoutMs: Long = 500): Boolean {
    val done = CountDownLatch(1)
    // Handed to the wrapped method; releases the waiting thread once invoked.
    val onFinished: () -> Unit = { done.countDown() }
    invoke(onFinished)
    return done.await(timeoutMs, MILLISECONDS)
}
/**
 * Runs [block] on another thread, uninterruptibly blocking the current thread and returning the
 * result of the block execution.
 *
 * This function is for executing critical tasks that must be completed before the running thread
 * may be interrupted. Since the current thread will be blocked, be aware of any deadlocks that
 * may occur if the new thread tries to grab a lock the current thread has taken.
 *
 * If the current thread is interrupted by the time this function returns, the thread's
 * interrupted flag will be set. It is the caller's responsibility to throw
 * [InterruptedException] if this flag is set.
 *
 * If [block] throws, the same throwable is rethrown on the calling thread (its stack trace is
 * that of the worker thread) instead of leaving the caller blocked forever.
 *
 * @param block the task to execute on the worker thread
 * @return the value returned by [block]
 */
fun <R> runUninterruptibly(block: () -> R): R {
    val lock = ReentrantLock()
    val blockFinishedCondition = lock.newCondition()
    lock.withLock {
        val resultContainer: MutableSet<R> = mutableSetOf()
        // failure and finished are only touched while holding lock.
        var failure: Throwable? = null
        var finished = false
        uninterruptableThreadPoolExecutor.execute {
            // The worker cannot enter until the caller releases the lock inside
            // awaitUninterruptibly(), so the signal can never be missed.
            lock.withLock {
                try {
                    resultContainer += block()
                } catch (t: Throwable) {
                    // Carried over to the caller, which rethrows it.
                    failure = t
                } finally {
                    // Always wake the caller, even when block() failed.
                    finished = true
                    blockFinishedCondition.signal()
                }
            }
        }
        // Loop on the predicate to be safe against spurious wakeups.
        while (!finished) {
            blockFinishedCondition.awaitUninterruptibly()
        }
        failure?.let { throw it }
        return resultContainer.single()
    }
}
// Cached thread pool backing runUninterruptibly; its threads are named "Uninterruptable-<n>",
// with <n> counting up from 1 in creation order.
private val uninterruptableThreadPoolExecutor = run {
    val threadCounter = AtomicInteger(0)
    Executors.newCachedThreadPool(ThreadFactory { task ->
        Thread(task, "Uninterruptable-${threadCounter.incrementAndGet()}")
    })
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment