Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Save orcchg/2379838621068a1c2e60089269f1758a to your computer and use it in GitHub Desktop.
Save orcchg/2379838621068a1c2e60089269f1758a to your computer and use it in GitHub Desktop.
Critical section for coroutines with cached result
import java.util.concurrent.ConcurrentHashMap
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
class KCriticalSection<T> {
private val suspendedJobs = ConcurrentHashMap<Int, Continuation<Boolean>>()
private var id = AtomicInteger(1000)
@Volatile private var isInsideCriticalSection = false
@Volatile private var _cached: T? = null
suspend fun synchronized(block: suspend (resultAlready: T?) -> T): T {
val thisJobId = id.getAndIncrement()
while (isInsideCriticalSection) {
suspendCoroutine { cont -> // <-- suspending call
suspendedJobs[thisJobId] = cont
}
suspendedJobs.remove(thisJobId)
}
isInsideCriticalSection = true
val result = block(_cached) // <-- suspending call
_cached = result
isInsideCriticalSection = false
suspendedJobs.forEach { (id, cont) ->
cont.resume(true)
}
return result
}
}
var accessToken = "" // a value to be fetched concurrently (should be volatile or thread-confined, or guarded)
val kCriticalSection = KCriticalSection<String>()
// This method can be called from any thread or coroutine at any time
suspend fun concurrentMethod() {
kCriticalSection.synchronized { resultAlready: String? ->
if (!resultAlready.isNullOrBlank()) {
accessToken = resultAlready // use the already available result
return@synchronized resultAlready
}
accessToken = "fetch access token from network" // <-- might be a suspending call
accessToken
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment