Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Parallel coroutine extensions
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.async
import kotlin.coroutines.CoroutineContext
suspend fun <A, B> Collection<A>.parallelMap(
context: CoroutineContext = GlobalScope.coroutineContext,
block: suspend (A) -> B
) = map {
GlobalScope.async(context) { block(it) }
}.map { it.await() }
suspend fun <A, B> Collection<A>.parallelForEach(
context: CoroutineContext = GlobalScope.coroutineContext,
block: suspend (A) -> B
) = map {
GlobalScope.async(context) { block(it) }
}.forEach { it.await() }
suspend fun <A, B> Collection<A>.parallelForEach(
context: CoroutineContext = GlobalScope.coroutineContext,
block: suspend (A) -> B,
maxConcurrency: Int
) {
val jobs = ArrayList<Job>()
GlobalScope.async(context) {
forEach {
println("before job ${it}. it has ${jobs.size}")
while (jobs.size >= maxConcurrency) {
// println("yielding at ${System.currentTimeMillis()}")
yield()
}
println("starting job ${it}. it has ${jobs.size}")
val job = GlobalScope.async(context) { block(it) }
job.invokeOnCompletion {
jobs.remove(job)
}
jobs.add(job)
println("added job ${it}. it has ${jobs.size}")
}
println("Joining jobs")
jobs.toMutableList().joinAll()
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.