-
-
Save elizarov/f27400a55c1502aacc35b4a3b2f5c9af to your computer and use it in GitHub Desktop.
// UPDATE: Everyone finding this gist via Google! | |
// Modern kotlinx.coroutines has out-of-the-box support for asyncLazy with the following expression: | |
// val myLazyValue = async(start = CoroutineStart.LAZY) { ... } | |
// Use myLazyValue.await() when you need it | |
// ---------------- public api ---------------- | |
/**
 * A lazily computed value whose computation may suspend.
 *
 * Instances are created with `asyncLazy`. In the implementation in this file the initializer is started by
 * the first call to [value]; callers arriving while it is still running are suspended until it completes.
 */
public interface AsyncLazy<out T> {
    /**
     * Returns `true` once a result has been stored. In the implementation in this file a stored failure
     * counts as a result too, so `true` does not guarantee that [value] will return normally.
     */
    public fun isInitialized(): Boolean

    /**
     * Returns the computed value, suspending the caller if it is not available yet.
     * If the initializer failed, the implementation in this file rethrows its exception.
     */
    public suspend fun value(): T
}
/**
 * Creates an [AsyncLazy] backed by [initializer].
 *
 * The initializer is not started here; the returned object starts it when its value is first requested.
 */
public fun <T> asyncLazy(initializer: suspend () -> T): AsyncLazy<T> {
    return AsyncLazyImpl(initializer)
}
// ---------------- implementation ---------------- | |
/**
 * Marker wrapper stored in place of a value when the initializer completed with [exception],
 * so that a failure can be told apart from a normally computed result.
 */
private class Fail(
    val exception: Throwable
)
/**
 * Node of a singly linked list of callers waiting for the lazy value.
 *
 * @property cont continuation to resume once the value is known; `null` only for the UNINITIALIZED marker.
 * @property next the waiter that was registered before this one, or `null` at the end of the list.
 */
private open class Waiter<T>(
    val cont: Continuation<T>?,
    val next: Waiter<T>?
)
/** Shared marker for "nobody has requested the value yet"; it carries no continuation and no successor. */
private val UNINITIALIZED: Waiter<Any> = Waiter(cont = null, next = null)
/*
   The first waiter is special. The first coroutine to request the value starts the initializer coroutine, which may
   produce its result without suspension, in the same thread. We are using CoroutineIntrinsics.suspendCoroutineOrReturn,
   so we cannot allow the following sequence of events to happen on the stack:
       1. Invoke AsyncLazyImpl.value()
       2. Invoke CoroutineIntrinsics.suspendCoroutineOrReturn to get continuation "cont"
       3. Invoke initializer.startCoroutine(...)
       4. Invoke AsyncLazyImpl.resume(...)
       5. Invoke AsyncLazyImpl.signalValue(...)
       6. Invoke AsyncLazyImpl.resumeWithValue(cont, ...)
       7. Invoke cont.resume(...) <-- forbidden resume in the same stack frame as suspendCoroutineOrReturn
   So, the first waiter uses a separate consensus to prevent this from happening (similarly to SafeContinuation).
*/
// States of the first waiter's consensus field. Declared as compile-time constants so that
// every use is inlined instead of going through a generated getter.

/** Initial state: the initializer was started and the first caller has neither suspended nor been signalled. */
private const val C_COMPUTING = 0

/** The first caller won the race and suspended; it has to be resumed explicitly when the result arrives. */
private const val C_SUSPENDED = 1

/** The result arrived before the first caller suspended; the caller returns it directly, without a resume. */
private const val C_SIGNALLED = 2
/**
 * Waiter for the caller that started the initializer; it always terminates the waiter list (`next` is `null`).
 *
 * Its [consensus] field decides, through a single compare-and-set from C_COMPUTING, whether that caller
 * suspends (C_SUSPENDED) or the result was delivered first (C_SIGNALLED).
 */
private class FirstWaiter<T>(cont: Continuation<T>) : Waiter<T>(cont = cont, next = null) {
    // Only ever modified through CONSENSUS below.
    @Volatile
    var consensus: Int = C_COMPUTING

    companion object {
        /** Atomic updater for the [consensus] field; the field is looked up by name, so the two must stay in sync. */
        @JvmStatic
        val CONSENSUS = AtomicIntegerFieldUpdater.newUpdater(
            FirstWaiter::class.java,
            "consensus"
        )
    }
}
/**
 * Lock-free [AsyncLazy] implementation that doubles as the completion continuation of [initializer].
 *
 * [state] holds one of:
 *  - the shared UNINITIALIZED marker: nobody has asked for the value yet;
 *  - the head of a linked list of [Waiter]s that ends in a [FirstWaiter]: the initializer is running;
 *  - the final result: either the computed value or a [Fail] wrapping the exception.
 */
private class AsyncLazyImpl<T>(val initializer: suspend () -> T) : AsyncLazy<T>, Continuation<T> {
    // Only ever modified through VALUE below. Note: UNINITIALIZED is Waiter<Any>.
    @Volatile
    var state: Any? = UNINITIALIZED

    /** A result is present as soon as [state] no longer holds a [Waiter]. */
    override fun isInitialized(): Boolean {
        return state !is Waiter<*>
    }

    /**
     * Returns the result if it is already known; otherwise registers the caller as a waiter.
     * The first caller additionally starts [initializer], with this object as its completion.
     */
    @Suppress("UNCHECKED_CAST")
    override suspend fun value(): T =
        CoroutineIntrinsics.suspendCoroutineOrReturn sc@ { caller ->
            while (true) { // lock-free retry loop on state
                val current = this.state // volatile read
                // Fast path: the result (value or failure) has already been stored.
                if (current !is Waiter<*>) return@sc unwrapValue(current)
                if (current !== UNINITIALIZED) {
                    // Computation already in progress: push ourselves in front of the waiter list.
                    val waiter = Waiter(caller, current as Waiter<T>)
                    if (VALUE.compareAndSet(this, current, waiter)) return@sc CoroutineIntrinsics.SUSPENDED
                    continue // lost the race, re-read state
                }
                // First caller: claim the right to start the computation.
                val first = FirstWaiter(caller)
                if (!VALUE.compareAndSet(this, current, first)) continue
                initializer.startCoroutine(completion = this)
                // Try to suspend; this fails only if the result was signalled while we were still on this stack.
                if (FirstWaiter.CONSENSUS.compareAndSet(first, C_COMPUTING, C_SUSPENDED))
                    return@sc CoroutineIntrinsics.SUSPENDED
                // Result must have been produced already: re-read state and return it directly.
                return@sc unwrapValue(this.state)
            }
        }

    /** Resumes [cont] with the stored result: exceptionally for a [Fail], normally otherwise. */
    @Suppress("UNCHECKED_CAST")
    fun resumeWithValue(cont: Continuation<T>, value: Any?) {
        if (value is Fail) {
            cont.resumeWithException(value.exception)
        } else {
            cont.resume(value as T)
        }
    }

    /** Converts a stored result into a return value, rethrowing the exception carried by a [Fail]. */
    @Suppress("UNCHECKED_CAST")
    fun unwrapValue(value: Any?): T {
        require(value !is Waiter<*>)
        if (value is Fail) throw value.exception
        return value as T
    }

    /**
     * Publishes the result and wakes up everybody who was waiting for it.
     *
     * @throws IllegalStateException if a result has already been stored.
     */
    @Suppress("UNCHECKED_CAST")
    fun signalValue(value: Any?) {
        while (true) { // lock-free retry loop on state
            val head = this.state as? Waiter<T> ?: throw IllegalStateException("Value is already set")
            if (VALUE.compareAndSet(this, head, value)) {
                // We own the detached waiter list now: resume the regular waiters, newest first ...
                var cursor: Waiter<T> = head
                while (cursor !is FirstWaiter<*>) {
                    resumeWithValue(cursor.cont!!, value)
                    cursor = cursor.next!!
                }
                // ... and finish with the first waiter, which needs the consensus handshake.
                signalFirstWaiter(cursor as FirstWaiter<T>, value)
                return
            }
        }
    }

    /** Delivers the result to the first waiter, resuming it only if it has actually suspended. */
    fun signalFirstWaiter(waiter: FirstWaiter<T>, value: Any?) {
        // If this CAS succeeds, value() has not returned yet and will pick the result up by itself.
        val stillComputing = FirstWaiter.CONSENSUS.compareAndSet(waiter, C_COMPUTING, C_SIGNALLED)
        // Otherwise value() had already suspended, so the first waiter must be resumed.
        if (!stillComputing) resumeWithValue(waiter.cont!!, value)
    }

    /** Completion callback: the initializer returned [value]. */
    override fun resume(value: T) {
        signalValue(value)
    }

    /** Completion callback: the initializer failed with [exception]. */
    override fun resumeWithException(exception: Throwable) {
        signalValue(Fail(exception))
    }

    companion object {
        /** Atomic updater for the [state] field; the field is looked up by name, so the two must stay in sync. */
        @JvmStatic
        val VALUE = AtomicReferenceFieldUpdater.newUpdater(AsyncLazyImpl::class.java, Any::class.java, "state")
    }
}
I've fixed the laziness of this implementation.
I don't know if we'll be able to get suspend val value
in Kotlin 1.1, but we'll definitely have it in some future update.
This doesn't compile under beta 38. The issue is a few renames; your imports are now:
import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
import kotlin.coroutines.experimental.startCoroutine
where
`CoroutineIntrinsics.suspendCoroutineOrReturn` is now simply `suspendCoroutineOrReturn` (a package-level function), and
`CoroutineIntrinsics.SUSPENDED` is now `COROUTINE_SUSPENDED` (a package-level value).
cheer! 🍺
📣 📣 📣 Everyone finding this gist via Google! Modern kotlinx.coroutines has out-of-the-box support for asyncLazy with the following expression: `val myLazyValue = async(start = CoroutineStart.LAZY) { ... }`. Use `myLazyValue.await()` when you need it.
@elizarov Hello, I have a doubt about the out-of-the-box support: let's say that we have a class with a private async lazy property:
private val lazyValue = coroutineScope.async(start = CoroutineStart.LAZY) { /* code with suspension points here */ }
and a public method to get it:
suspend fun computeValue() = lazyValue.await()
Is this approach thread-safe because of the way CoroutineStart.LAZY is implemented - the same way that `by lazy {}` is for non-coroutine usage - or do we need to wrap the await call in a mutex lock, if we know that computeValue() will be called concurrently by many coroutines and we really want to be sure the calculation is performed only once? Thanks for clarifying.
I understood it is thread safe because of the Deferred interface: All functions on this interface and on all interfaces derived from it are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.
So the idea here is that we keep a compact quasi-linked-list of continuations in the `state` variable, and when this object's `suspend` function is called, the caller's implicit continuation is added to this list and used to suspend the caller. When the lazy completes, its `signalValue` will resume every continuation in the list with the retrieved value. It's thread-safe by virtue of the optimistic loops: loop forever, read, compare-and-set, break.
Now the interesting bit in `AsyncLazyImpl<T>().apply { initializer.startCoroutine(completion = this) }` is that `startCoroutine` will run the initializer using the AsyncLazyImpl as its completion, calling `resume` when the initializer has returned a value. This is done synchronously, as `startCoroutine` is itself a blocking call. The coroutine-ee (i.e. lazy) behaviour comes in only if the `initializer` contains a `suspend` (e.g. `await`) call itself. Perhaps it makes more sense to call `startCoroutine` from the `value()` method?
method?Also, this begs the obvious question and insane question, no chance of a
suspend val we get() = ...
?