Skip to content

Instantly share code, notes, and snippets.

@fluidsonic
Last active January 24, 2023 05:51
Show Gist options
  • Star 2 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save fluidsonic/ba32de21c156bbe8424c8d5fc20dcd8e to your computer and use it in GitHub Desktop.
Save fluidsonic/ba32de21c156bbe8424c8d5fc20dcd8e to your computer and use it in GitHub Desktop.
Parallel coroutine execution with dynamic concurrency in Kotlin
// Dependencies:
// implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.3.9")
// implementation("org.jetbrains.kotlinx:atomicfu:0.14.4")
import kotlinx.atomicfu.*
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.selects.*
import java.io.*
import kotlin.coroutines.*
suspend fun main() = coroutineScope {
val executor = ParallelExecutor(coroutineContext)
println("Concurrency: 1")
coroutineScope {
(1 .. 200).forEach { i ->
launch {
executor.execute {
println("Execution $i")
delay(250)
when (i) {
10 -> {
println("Concurrency: 5")
executor.setConcurrentOperationLimit(5)
}
100 -> {
println("Concurrency: 1")
executor.setConcurrentOperationLimit(1)
}
110 -> {
println("Closing executor")
executor.close()
}
}
}
}
delay(1)
}
}
println("Fin.")
}
class ParallelExecutor(
parentContext: CoroutineContext,
) : Closeable {
private val concurrentOperationLimit = atomic(1)
private val coroutineContext = parentContext + Job()
private var isClosed = atomic(false)
private val killQueue = Channel<Unit>(Channel.UNLIMITED)
private val operationQueue = Channel<Operation<*>>(Channel.RENDEZVOUS)
init {
startOrStopProcessors(expectedCount = concurrentOperationLimit.value, actualCount = 0)
}
override fun close() {
if (!isClosed.compareAndSet(expect = false, update = true))
return
val cause = CancellationException("Executor was closed.")
killQueue.close(cause)
operationQueue.close(cause)
coroutineContext.cancel(cause)
}
private fun CoroutineScope.launchProcessor() = launch {
while (true) {
val operation = select<Operation<*>?> {
killQueue.onReceive { null }
operationQueue.onReceive { it }
} ?: break
operation.execute()
}
}
suspend fun <Result> execute(block: suspend () -> Result): Result =
withContext(coroutineContext) {
val operation = Operation(block)
operationQueue.send(operation)
operation.result.await()
}
// TODO This launches all coroutines in advance even if they're never needed. Find a lazy way to do this.
fun setConcurrentOperationLimit(limit: Int) {
require(limit >= 1) { "'limit' must be greater than zero: $limit" }
require(limit < 1_000_000) { "Don't use a very high limit because it will cause a lot of coroutines to be started eagerly: $limit" }
startOrStopProcessors(expectedCount = limit, actualCount = concurrentOperationLimit.getAndSet(limit))
}
private fun startOrStopProcessors(expectedCount: Int, actualCount: Int) {
if (expectedCount == actualCount)
return
if (isClosed.value)
return
var change = expectedCount - actualCount
while (change > 0 && killQueue.poll() != null)
change -= 1
if (change > 0)
with(CoroutineScope(coroutineContext)) {
repeat(change) { launchProcessor() }
}
else
repeat(-change) { killQueue.offer(Unit) }
}
private class Operation<Result>(
private val block: suspend () -> Result,
) {
private val _result = CompletableDeferred<Result>()
val result: Deferred<Result> get() = _result
suspend fun execute() {
try {
_result.complete(block())
}
catch (e: Throwable) {
_result.completeExceptionally(e)
}
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment