Skip to content

Instantly share code, notes, and snippets.

@elizarov
Last active March 1, 2024 15:38
Show Gist options
  • Save elizarov/f27400a55c1502aacc35b4a3b2f5c9af to your computer and use it in GitHub Desktop.
Save elizarov/f27400a55c1502aacc35b4a3b2f5c9af to your computer and use it in GitHub Desktop.
// UPDATE: Everyone finding this gist via Google!
// Modern kotlinx.coroutines has out-of-the-box support for asyncLazy with the following expression:
// val myLazyValue = async(start = CoroutineStart.LAZY) { ... }
// Use myLazyValue.await() when you need it
// ---------------- public api ----------------
/**
 * A value that is produced on demand by a suspending initializer.
 *
 * Instances are obtained from [asyncLazy].
 *
 * @param T the type of the produced value.
 */
interface AsyncLazy<out T> {
    /**
     * Returns the value, suspending the caller while it is not yet available.
     */
    suspend fun value(): T

    /**
     * Reports whether the computation has already finished, without suspending.
     */
    fun isInitialized(): Boolean
}
/**
 * Creates an [AsyncLazy] whose value is computed by [initializer].
 *
 * The initializer is only stored here; it is started by the first call to [AsyncLazy.value].
 *
 * @param initializer suspending computation that produces the value.
 * @return a new, not yet initialized [AsyncLazy].
 */
fun <T> asyncLazy(initializer: suspend () -> T): AsyncLazy<T> {
    return AsyncLazyImpl(initializer)
}
// ---------------- implementation ----------------
/**
 * Wrapper stored in place of the value when the initializer completed with an exception.
 *
 * @property exception the failure that is rethrown to, or used to resume, callers of `value()`.
 */
private class Fail(
    val exception: Throwable
)
/**
 * Node of a singly linked list of coroutines waiting for the value.
 *
 * @property cont continuation to resume once the value is available; `null` only for the
 *   `UNINITIALIZED` sentinel.
 * @property next the node that was at the head of the list before this one was pushed, or `null`
 *   at the end of the list.
 */
private open class Waiter<T>(
    val cont: Continuation<T>?,
    val next: Waiter<T>?
)
/**
 * Sentinel stored as the initial state: no coroutine has requested the value yet.
 * It is a [Waiter] with neither a continuation nor a successor.
 */
private val UNINITIALIZED: Waiter<Any> = Waiter(cont = null, next = null)
/*
The first waiter is special. The first coroutine to request the value starts the initializer coroutine, which may
produce a result without suspension in the same thread. Because we are using CoroutineIntrinsics.suspendCoroutineOrReturn,
we cannot allow the following sequence of events to happen on a single stack:
1. Invoke AsyncLazyImpl.value()
2. Invoke CoroutineIntrinsics.suspendCoroutineOrReturn to get continuation "cont"
3. Invoke initializer.startCoroutine(...)
4. Invoke AsyncLazyImpl.resume(...)
5. Invoke AsyncLazyImpl.signalValue(...)
6. Invoke AsyncLazyImpl.resumeWithValue(cont, ...)
7. Invoke cont.resume(...) <-- forbidden resume in the same stack frame as suspendCoroutineOrReturn
So, the first waiter uses a separate consensus to prevent this from happening (similarly to SafeContinuation)
*/
// Values of FirstWaiter.consensus. Declared `const` so that they are true compile-time constants
// (inlined at use sites) rather than properties read through generated accessors.

/** Initial value: the first waiter has neither suspended nor been handed a result yet. */
private const val C_COMPUTING = 0

/** Set by `value()` when the first waiter wins the race and suspends; it must be resumed later. */
private const val C_SUSPENDED = 1

/** Set by `signalFirstWaiter` when the result arrives before the first waiter suspended. */
private const val C_SIGNALLED = 2
/**
 * List node for the very first coroutine that requests the value; it always terminates the list.
 *
 * Besides the continuation it carries the [consensus] flag (`C_COMPUTING`, `C_SUSPENDED` or
 * `C_SIGNALLED`), which is only ever changed atomically through [CONSENSUS].
 *
 * @param cont continuation of the first requesting coroutine.
 */
private class FirstWaiter<T>(cont: Continuation<T>) : Waiter<T>(cont = cont, next = null) {
    // Looked up by name from CONSENSUS below, so the property must keep the name "consensus".
    @Volatile
    var consensus: Int = C_COMPUTING

    companion object {
        /** Atomic updater for the `consensus` field of a [FirstWaiter]. */
        @JvmStatic
        val CONSENSUS: AtomicIntegerFieldUpdater<FirstWaiter<*>> =
            AtomicIntegerFieldUpdater.newUpdater(FirstWaiter::class.java, "consensus")
    }
}
/**
 * Lock-free implementation of [AsyncLazy].
 *
 * The single volatile field `state` holds one of:
 *  - `UNINITIALIZED` -- nobody has requested the value yet;
 *  - the head of a linked list of [Waiter] nodes that ends in a [FirstWaiter] -- the initializer
 *    has been started and these coroutines are waiting for it;
 *  - anything that is not a [Waiter] -- the final result, either the value itself or a [Fail].
 *
 * The instance is also passed as the completion continuation of the initializer, so its
 * `resume` / `resumeWithException` publish the result.
 *
 * NOTE(review): `CoroutineIntrinsics.suspendCoroutineOrReturn` / `CoroutineIntrinsics.SUSPENDED`
 * and member `resume` / `resumeWithException` look like an early coroutine API -- confirm against
 * the Kotlin version this file is compiled with.
 *
 * @property initializer suspending computation that produces the value; started at most once,
 *   by the caller of `value()` that replaces `UNINITIALIZED`.
 */
private class AsyncLazyImpl<T>(val initializer: suspend () -> T) : AsyncLazy<T>, Continuation<T> {
// Only ever replaced through VALUE.compareAndSet; plain reads are volatile reads.
@Volatile
var state: Any? = UNINITIALIZED // Note: UNINITIALIZED is Waiter<Any>
companion object {
// Atomic updater for the "state" field; the field is looked up by name, so it must not be renamed.
@JvmStatic
val VALUE = AtomicReferenceFieldUpdater.newUpdater(AsyncLazyImpl::class.java, Any::class.java, "state")
}
/**
 * Returns `true` once `state` no longer holds a [Waiter], i.e. a result has been published.
 * This includes the case where the published result is a [Fail].
 */
override fun isInitialized(): Boolean = state !is Waiter<*>
/**
 * Returns the value, starting the initializer if this is the first request and suspending
 * until a result is published if necessary. If the published result is a [Fail], its
 * exception is thrown (directly, or by resuming the suspended caller with it).
 */
@Suppress("UNCHECKED_CAST")
suspend override fun value(): T =
CoroutineIntrinsics.suspendCoroutineOrReturn sc@ { cont ->
while (true) { // lock-free loop on state
val state = this.state // volatile read
// fast path: a result is already published -- return (or throw) it without suspending
if (state !is Waiter<*>) return@sc unwrapValue(state)
if (state == UNINITIALIZED) {
// special case for first waiter -- compute value
val node = FirstWaiter(cont)
// lost the race to become the first waiter -> re-read state and retry
if (!VALUE.compareAndSet(this, state, node)) continue
// start computation
initializer.startCoroutine(completion = this)
// try suspend
if (FirstWaiter.CONSENSUS.compareAndSet(node, C_COMPUTING, C_SUSPENDED))
return@sc CoroutineIntrinsics.SUSPENDED
// if failed, then result must have been already produced!
return@sc unwrapValue(this.state) // reread state!
} else {
// other waiters
// push a new node in front of the current head; on CAS failure the loop retries
val node = Waiter(cont, state as Waiter<T>)
if (VALUE.compareAndSet(this, state, node)) return@sc CoroutineIntrinsics.SUSPENDED
}
}
}
/**
 * Resumes [cont] with the published result: with the wrapped exception when [value] is a
 * [Fail], otherwise with [value] cast to `T`.
 */
@Suppress("UNCHECKED_CAST")
fun resumeWithValue(cont: Continuation<T>, value: Any?) =
if (value is Fail) cont.resumeWithException(value.exception) else cont.resume(value as T)
/**
 * Converts a published result into a return value: throws the wrapped exception when
 * [value] is a [Fail], otherwise returns [value] cast to `T`.
 *
 * @throws IllegalArgumentException (from `require`) if [value] is still a [Waiter].
 */
@Suppress("UNCHECKED_CAST")
fun unwrapValue(value: Any?): T {
require(value !is Waiter<*>)
return if (value is Fail) throw value.exception else value as T
}
/**
 * Publishes the result ([value] is either the computed value or a [Fail]) and resumes every
 * waiter. Waiters are resumed starting from the current head of the list, i.e. the most
 * recently added first; the [FirstWaiter] at the end is handled by [signalFirstWaiter].
 *
 * @throws IllegalStateException if a result has already been published.
 */
@Suppress("UNCHECKED_CAST")
fun signalValue(value: Any?) {
while (true) { // lock-free loop on state
var state = this.state as? Waiter<T> ?: throw IllegalStateException("Value is already set")
// a new waiter was pushed concurrently -> re-read the head and retry
if (!VALUE.compareAndSet(this, state, value)) continue
// After the CAS the list is detached and can no longer change. The `!!` below rely on
// the list having been built by value(): every node has a continuation and the list
// ends in a FirstWaiter, which is installed before the initializer is started.
while (state !is FirstWaiter<*>) {
resumeWithValue(state.cont!!, value)
state = state.next!!
}
// process first waiter
signalFirstWaiter(state as FirstWaiter<T>, value)
break
}
}
/**
 * Hands the result to the first waiter without resuming it on the stack of its own
 * `value()` call: the continuation is resumed only if `value()` has already moved the
 * consensus away from `C_COMPUTING` (i.e. it has suspended).
 */
fun signalFirstWaiter(waiter: FirstWaiter<T>, value: Any?) {
// if the following CAS succeeds, then value() has not returned yet, don't need to resume
if (FirstWaiter.CONSENSUS.compareAndSet(waiter, C_COMPUTING, C_SIGNALLED)) return
// otherwise, value() had suspended -> need to resume first waiter
resumeWithValue(waiter.cont!!, value)
}
// Completion callbacks of the initializer coroutine: publish the value or the wrapped failure.
override fun resume(value: T) = signalValue(value)
override fun resumeWithException(exception: Throwable) = signalValue(Fail(exception))
}
@elizarov
Copy link
Author

📣 📣 📣 Everyone finding this gist via Google! Modern kotlinx.coroutines has out-of-the-box support for asyncLazy with the following expression: val myLazyValue = async(start = CoroutineStart.LAZY) { ... }. Use myLazyValue.await() when you need it.

@Brack93
Copy link

Brack93 commented Feb 19, 2024

@elizarov Hello, I have a doubt about the out-of-the-box support: let's say that we have a class with a private async lazy property:

private val lazyValue = coroutineScope.async(start = CoroutineStart.LAZY) { /* code with suspension points here */ }

and a public method to get it:

suspend fun computeValue() = lazyValue.await()

Is this approach thread-safe because of the way CoroutineStart.LAZY is implemented - the same way that by lazy {} is for non-coroutine usage - or do we need to wrap the await call in a mutex lock if we know that computeValue() will be called concurrently by many coroutines and we want to be sure the value is computed only once? Thanks for clarifying.

I understood it is thread safe because of the Deferred interface: All functions on this interface and on all interfaces derived from it are thread-safe and can be safely invoked from concurrent coroutines without external synchronization.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment