Skip to content

Instantly share code, notes, and snippets.

@dpmedeiros
Created October 4, 2019 20:53
Show Gist options
  • Save dpmedeiros/ce8e7a96d6ca1b66b3bbd31d338c112b to your computer and use it in GitHub Desktop.
Save dpmedeiros/ce8e7a96d6ca1b66b3bbd31d338c112b to your computer and use it in GitHub Desktop.
Utility functions to transform asynchronous rxjava and callback-invoking methods into blocking calls
package com.dmedeiros.rx.utils
import rx.Completable
import rx.Single
import java.util.concurrent.CountDownLatch
import java.util.concurrent.ExecutionException
import java.util.concurrent.Executors
import java.util.concurrent.ThreadFactory
import java.util.concurrent.TimeUnit.MILLISECONDS
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import kotlin.concurrent.withLock
/**
* Calls a Single in a blocking fashion.
*
* Note that if the single emits a null value this will throw an [ExecutionException]
*
* Unlike BlockingObservable, this is meant to be used in production code.
*
* @throws ExecutionException when the Single terminates with an exception. This exception wraps
* the exception via [ExecutionException.cause]
* @throws InterruptedException if the thread is interrupted while the Single is running
*/
@Throws(InterruptedException::class, ExecutionException::class)
fun <T> Single<T>.runBlocking(): T {
val latch = CountDownLatch(1)
var outsideResult: T? = null
var outsideThrowable: Throwable? = null
val subscription = subscribe(
{ result ->
outsideResult = result
latch.countDown()
},
{ throwable ->
outsideThrowable = throwable
latch.countDown()
}
)
try {
latch.await()
} catch (e: InterruptedException) {
subscription.unsubscribe()
outsideThrowable = e
}
outsideThrowable.let { // capture var outsideThrowable
if (it is InterruptedException) {
Thread.currentThread().interrupt()
throw it
}
return outsideResult ?: throw ExecutionException(if (it != null) it else Throwable("null returned"))
}
}
/**
* Calls a Completable in a blocking fashion.
*
* Unlike BlockingObservable, this is meant to be used in production code. If any errors that
* occur, or if the current thread is interrupted, [ExecutionException] is thrown
*
* @throws ExecutionException when the completable terminates with an exception
* @throws InterruptedException if the thread is interrupted while the Completable is running
*/
@Throws(InterruptedException::class, ExecutionException::class)
fun Completable.runBlocking() {
val latch = CountDownLatch(1)
var outsideThrowable: Throwable? = null
val subscription = subscribe(
// onCompleted
{
latch.countDown()
},
// onError
{ throwable ->
outsideThrowable = throwable
latch.countDown()
}
)
try {
latch.await()
} catch (e: InterruptedException) {
subscription.unsubscribe()
outsideThrowable = e
}
outsideThrowable.let {
if (it is InterruptedException) {
throw it
}
if (it != null) {
throw ExecutionException(it)
}
}
}
/**
* Calls, in a blocking fashion, a referred method that accepts a callback which takes one
* parameter of type [T].
*
* Example usage:
* for a function defined as:
* object myObject {
* fun myAsyncFunction(onResult: (String) -> Unit) { ... }
* }
* You can block on this function and get the result by doing:
* val status: String = myObject::myAsyncFunction.runBlocking()
*
* @param timeoutMs how long to wait for the callback to be invoked. if this timeout expires
* before the callback is invoked, null will be returned
*
* @return the value reported to the callback.
*
* @throws InterruptedException if the blocking thread is interrupted
*/
@Throws(InterruptedException::class)
inline fun <reified T, R> (((T) -> Unit) -> R).runBlocking(timeoutMs: Long = 500): T {
val resultContainer: MutableSet<T> = mutableSetOf()
val latch = CountDownLatch(1)
invoke { result ->
resultContainer += result
latch.countDown()
}
latch.await(timeoutMs, MILLISECONDS)
return resultContainer.single()
}
/**
* Calls, in a blocking fashion, a referred method that accepts a callback which takes no
* parameters.
*
* Example usage:
* for a function defined as:
* object myObject {
* fun myAsyncFunction(onFinished: () -> Unit) { ... }
* }
* You can block on this function and get the result by doing:
* myObject::myAsyncFunction.runBlocking()
*
* @param timeoutMs how long to wait for the callback to be invoked.
*
* @return true if the method finished executing withing [timeoutMs] milliseconds
*
* @throws InterruptedException if the blocking thread is interrupted
*/
@Throws(InterruptedException::class)
fun ((() -> Unit) -> Unit).runNoResultBlocking(timeoutMs: Long = 500): Boolean {
val latch = CountDownLatch(1)
invoke { latch.countDown() }
return latch.await(timeoutMs, MILLISECONDS)
}
/**
* runs a block on another thread, uninterruptibly blocking the current thread and returning the result of the block
* execution.
*
* this function is for executing critical tasks that must be completed before the running thread may be interrupted.
* since the current thread will be blocked, be aware of any deadlocks that may occur if the new thread tries to grab
* a lock the current thread has taken.
*
* if the current thread is interrupted by the time this function returns, the thread's interrupted flag will be set.
* it is the caller's responsibility to throw interrupted exception if if this flag is set.
*
*/
fun <R> runUninterruptibly(block: () -> R): R {
val lock = ReentrantLock()
val blockFinishedCondition = lock.newCondition()
lock.withLock {
val resultContainer: MutableSet<R> = mutableSetOf()
uninterruptableThreadPoolExecutor.execute {
lock.withLock {
val result = block()
resultContainer += result
blockFinishedCondition.signal()
}
}
blockFinishedCondition.awaitUninterruptibly()
return resultContainer.single()
}
}
private val uninterruptableThreadPoolExecutor = Executors.newCachedThreadPool(object : ThreadFactory {
val atom = AtomicInteger(0)
override fun newThread(runnable: Runnable?): Thread =
Thread(runnable, "Uninterruptable-" + atom.incrementAndGet())
})
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment