Skip to content

Instantly share code, notes, and snippets.

@elizarov
Last active March 1, 2024 15:38
Show Gist options
  • Save elizarov/f27400a55c1502aacc35b4a3b2f5c9af to your computer and use it in GitHub Desktop.
Save elizarov/f27400a55c1502aacc35b4a3b2f5c9af to your computer and use it in GitHub Desktop.
// UPDATE: Everyone finding this gist via Google!
// Modern kotlinx.coroutines has out-of-the-box support for asyncLazy with the following expression:
// val myLazyValue = async(start = CoroutineStart.LAZY) { ... }
// Use myLazyValue.await() when you need it
// ---------------- public api ----------------
public interface AsyncLazy<out T> {
public suspend fun value(): T
public fun isInitialized(): Boolean
}
public fun <T> asyncLazy(initializer: suspend () -> T): AsyncLazy<T> = AsyncLazyImpl(initializer)
// ---------------- implementation ----------------
private class Fail(val exception: Throwable)
private open class Waiter<T>(val cont: Continuation<T>?, val next: Waiter<T>?)
private val UNINITIALIZED = Waiter<Any>(null, null)
/*
The first waiter is special. The first coroutine to request value starts an initializer coroutine and it may
produce result without suspension in the same thread. We are using CoroutineIntrinsics.suspendCoroutineOrReturn
and we cannot allow the following sequence of events to happen on stack:
1. Invoke AsyncLazyImpl.value()
2. Invoke CoroutineIntrinsics.suspendCoroutineOrReturn to get continuation "cont"
3. Invoke initializer.startCoroutine(...)
4. Invoke AsyncLazyImpl.resume(...)
5. Invoke AsyncLazyImpl.signalValue(...)
6. Invoke AsyncLazyImpl.resumeWithValue(cont, ...)
7. Invoke cont.resume(...) <-- forbidden resume in the same stack frame as suspendCoroutineOrReturn
So, the first waiter uses a separate consensus to prevent this from happening (similarly to SafeContinuation)
*/
private val C_COMPUTING = 0
private val C_SUSPENDED = 1
private val C_SIGNALLED = 2
private class FirstWaiter<T>(cont: Continuation<T>) : Waiter<T>(cont, null) {
@Volatile
var consensus = C_COMPUTING
companion object {
@JvmStatic
val CONSENSUS = AtomicIntegerFieldUpdater.newUpdater(FirstWaiter::class.java, "consensus")
}
}
private class AsyncLazyImpl<T>(val initializer: suspend () -> T) : AsyncLazy<T>, Continuation<T> {
@Volatile
var state: Any? = UNINITIALIZED // Note: UNINITIALIZED is Waiter<Any>
companion object {
@JvmStatic
val VALUE = AtomicReferenceFieldUpdater.newUpdater(AsyncLazyImpl::class.java, Any::class.java, "state")
}
override fun isInitialized(): Boolean = state !is Waiter<*>
@Suppress("UNCHECKED_CAST")
suspend override fun value(): T =
CoroutineIntrinsics.suspendCoroutineOrReturn sc@ { cont ->
while (true) { // lock-free loop on state
val state = this.state // volatile read
if (state !is Waiter<*>) return@sc unwrapValue(state)
if (state == UNINITIALIZED) {
// special case for first waiter -- compute value
val node = FirstWaiter(cont)
if (!VALUE.compareAndSet(this, state, node)) continue
// start computation
initializer.startCoroutine(completion = this)
// try suspend
if (FirstWaiter.CONSENSUS.compareAndSet(node, C_COMPUTING, C_SUSPENDED))
return@sc CoroutineIntrinsics.SUSPENDED
// if failed, then result must have been already produced!
return@sc unwrapValue(this.state) // reread state!
} else {
// other waiters
val node = Waiter(cont, state as Waiter<T>)
if (VALUE.compareAndSet(this, state, node)) return@sc CoroutineIntrinsics.SUSPENDED
}
}
}
@Suppress("UNCHECKED_CAST")
fun resumeWithValue(cont: Continuation<T>, value: Any?) =
if (value is Fail) cont.resumeWithException(value.exception) else cont.resume(value as T)
@Suppress("UNCHECKED_CAST")
fun unwrapValue(value: Any?): T {
require(value !is Waiter<*>)
return if (value is Fail) throw value.exception else value as T
}
@Suppress("UNCHECKED_CAST")
fun signalValue(value: Any?) {
while (true) { // lock-free loop of state
var state = this.state as? Waiter<T> ?: throw IllegalStateException("Value is already set")
if (!VALUE.compareAndSet(this, state, value)) continue
while (state !is FirstWaiter<*>) {
resumeWithValue(state.cont!!, value)
state = state.next!!
}
// process first waiter
signalFirstWaiter(state as FirstWaiter<T>, value)
break
}
}
fun signalFirstWaiter(waiter: FirstWaiter<T>, value: Any?) {
// if the following CAS succeeds, then value() has not returned yet, don't need to resume
if (FirstWaiter.CONSENSUS.compareAndSet(waiter, C_COMPUTING, C_SIGNALLED)) return
// otherwise, value() had suspended -> need to resume first waiter
resumeWithValue(waiter.cont!!, value)
}
override fun resume(value: T) = signalValue(value)
override fun resumeWithException(exception: Throwable) = signalValue(Fail(exception))
}
@Groostav
Copy link

Groostav commented Jan 5, 2017

So the idea here is that we keep a compact quasi-linked-list of continuations in the state variable, and when this object's suspend is called, the callers implicit continuation is added this list and used to suspend the caller. When the lazy completes, its signalValue will resume every continuation in the list with the retrieved value.

Its thread safe by virtue of the optimistic forever, read, compare-and-set, break loops.

Now the interesting bit in AsyncLazyImpl<T>().apply { initializer.startCoroutine(completion = this) } is that startCoroutine will run the initializer using the AsyncLazyImpl as its completion, calling resume when the initializer has returned a value. This is done synchronously, as startCoroutine is itself a blocking call. The coroutine-ee (ie lazy) behaviour comes in only if the initializer contains a suspend (eg await) call itself.

Perhalps it makes more sense to call startCoroutine from the value() method?

Also, this begs the obvious question and insane question, no chance of a suspend val we get() = ...?

@elizarov
Copy link
Author

elizarov commented Jan 5, 2017

I've fixed the lazyness of this implementation.

I don't know if we'll be able to get suspend val value in Kotlin 1.1, but we'll definitely have it in some future update.

@Groostav
Copy link

Groostav commented Feb 13, 2017

This doesnt compile under beta 38. The issue is a few renames, your imports are now:

import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext
import kotlin.coroutines.experimental.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
import kotlin.coroutines.experimental.startCoroutine

where

  • CoroutineIntrinsics.suspendCoroutineOrReturn is now simply suspendCoroutineOrReturn (a package level function)
  • CoroutineIntrinsics.SUSPENDED is now COROUTINE_SUSPENDED (a package level value)

cheer! 🍺

@elizarov
Copy link
Author

📣 📣 📣 Everyone finding this gist via Google! Modern kotlinx.coroutines has out-of-the-box support for asyncLazy with the following expression: val myLazyValue = async(start = CoroutineStart.LAZY) { ... }. Use myLazyValue.await() when you need it.

@Brack93
Copy link

Brack93 commented Feb 19, 2024

@elizarov Hello, I have a doubt about the out-of-the-box support: let's say that we have a class with a private async lazy property:

private val lazyValue = coroutineScope.async(start = CoroutineStart.LAZY) { /* code with suspension points here */ }

and a public method to get it:

suspend fun computeValue() = lazyValue.await()

Is this approach thread safe because of the way CoroutineStart.LAZY is implemented - the same way that the by lazy {} is for not-coroutine usage - or do we need to wrap the await call within a mutex lock, if we know that the computeValue() will be called concurrently by many coroutines and we really want to be sure the calculation is computed only once? Thanks for clarifying.

I understood it is thread safe because of the Deferred interface: All functions on this interface and on all interfaces derived from it are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment