Skip to content

Instantly share code, notes, and snippets.

@orcchg
Last active October 21, 2023 17:29
Show Gist options
  • Save orcchg/7e26545482025c2ff460d0036c5fa348 to your computer and use it in GitHub Desktop.
Save orcchg/7e26545482025c2ff460d0036c5fa348 to your computer and use it in GitHub Desktop.
Critical section for coroutines
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.suspendCoroutine
/**
 * A suspending critical section: at most one coroutine at a time runs the block passed to
 * [synchronized]; the others suspend (without blocking their thread) until the section is free.
 *
 * Limitations:
 * - Not reentrant: calling [synchronized] on the same instance from inside the block suspends
 *   forever, because the only coroutine able to release the section is the one waiting for it.
 * - No fairness: when the section is released every waiter is woken and they race to acquire it.
 * - Waiting uses [suspendCoroutine], which has no cancellation hook, so a suspended waiter stays
 *   parked until the section is released.
 */
class KCriticalSection {
    // Continuations of coroutines currently parked while waiting for the section, keyed by a per-call id.
    private val suspendedJobs = ConcurrentHashMap<Int, Continuation<Boolean>>()

    // Source of unique keys for suspendedJobs.
    private val id = AtomicInteger(1000)

    // true while some coroutine owns the section; only ever flipped false -> true via compareAndSet.
    private val isInsideCriticalSection = AtomicBoolean(false)

    /**
     * Runs [block] exclusively with respect to every other caller of this method on the same instance.
     *
     * @param block the suspending code to execute inside the critical section.
     * @return whatever [block] returns.
     * @throws Throwable anything thrown by [block] is propagated after the section has been released.
     */
    suspend fun <T> synchronized(block: suspend () -> T): T {
        val thisJobId = id.getAndIncrement()

        // Atomic test-and-set: exactly one contender wins, everybody else parks and retries on wake-up.
        while (!isInsideCriticalSection.compareAndSet(false, true)) {
            suspendCoroutine<Boolean> { cont ->
                suspendedJobs[thisJobId] = cont
                // The owner may have released (and scanned the map) between the failed CAS above and
                // the registration just made. Re-check so this coroutine does not miss its wake-up.
                // remove() hands the continuation to a single caller, so it is resumed exactly once.
                if (!isInsideCriticalSection.get()) {
                    suspendedJobs.remove(thisJobId)?.resume(true)
                }
            }
        }

        try {
            return block() // <-- suspending call
        } finally {
            // Always release, even when block() throws, otherwise the section stays locked forever.
            release()
        }
    }

    /** Marks the section as free and wakes every parked waiter so they can retry the acquisition. */
    private fun release() {
        isInsideCriticalSection.set(false)
        for (jobId in suspendedJobs.keys) {
            // Remove before resuming so no continuation can be resumed twice by overlapping releases.
            suspendedJobs.remove(jobId)?.resume(true)
        }
    }
}
// Dispatcher backed by one dedicated thread; every coroutine started by concurrentMethod() runs on it.
val singleThread = Executors.newSingleThreadExecutor().asCoroutineDispatcher()

var accessToken = "" // a value to be fetched concurrently

// One critical section shared by ALL calls of concurrentMethod(). Creating a fresh KCriticalSection
// per call would give each caller its own private lock, i.e. no mutual exclusion at all.
private val accessTokenSection = KCriticalSection()

/**
 * Fetches [accessToken] once, no matter how many callers invoke this method concurrently.
 *
 * This method can be called from any thread or coroutine at any time. The work is executed on
 * [singleThread] inside a critical section shared by all callers, so a second caller waits for the
 * first fetch to finish and then observes the already populated token.
 *
 * NOTE(review): runBlocking blocks the calling thread until the inner coroutine completes. If this
 * method is ever invoked from a coroutine that is itself running on [singleThread], that thread would
 * be blocked waiting for work dispatched to itself - consider `withContext(singleThread)` instead.
 */
suspend fun concurrentMethod() {
    runBlocking(singleThread) { // <-- single thread will execute all coroutines
        accessTokenSection.synchronized {
            // Only the first caller through the section performs the fetch; later callers see the token.
            if (accessToken.isEmpty()) {
                accessToken = "fetch access token from network" // <-- might be a suspending call
            }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment