Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
investigating deadlock in kotlin
package net.FluxDeadLockInvestigation
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.reactor.asFlux
import org.junit.jupiter.api.Test
import reactor.core.scheduler.Schedulers
class FluxDeadLockInvestigation(){
@Test
fun deadlockInvstigation() = runBlocking<Unit>{
(0..3000).map { async {
//noDeadLock()
yesDeadLock()
}}.forEach { it.await()}
}
val thPool = newSingleThreadContext("single")
suspend fun yesDeadLock(){
withContext(Dispatchers.IO){
var channel = Channel<Int>(Channel.RENDEZVOUS)
val producer =
launch(newSingleThreadContext("nconte")){
while (isActive && !channel.isClosedForSend && !channel.isClosedForReceive ){
try{
channel.send(1)
}catch (t:Throwable){
}
}
}
withContext(Dispatchers.Default){
// withContext(thPool){ // <<-- with any other context, even a single thread, this long cause deadlocks
channel.asFlux()
.publishOn(Schedulers.elastic(),1)
.doFinally { producer.cancel() }
.limitRate(1)
.take(30)
.blockLast()
}
1
}
}
}
@fvigotti

This comment has been minimized.

Copy link
Owner Author

fvigotti commented Jul 2, 2019

even a more minimal demonstration of the issue

class FluxDeadLockInvestigation(){

    @Test
    fun deadlockInvstigation() = runBlocking<Unit>(Dispatchers.IO){
        (0..3000).map { async {
            yesDeadLock()
        }}.forEach { it.await()}
    }
    val thPool = newSingleThreadContext("single")
    val thPool2 = newFixedThreadPoolContext(8,"single")
    suspend fun yesDeadLock(){
        withContext(Dispatchers.IO){
            var channel = Channel<Int>(Channel.RENDEZVOUS)
            val producer =
                launch(newSingleThreadContext("nconte")){
                    while (isActive  && !channel.isClosedForSend && !channel.isClosedForReceive ){
                        try{
                            channel.send(1)
                            cancel()
                        }catch (t:Throwable){
                        }
                    }
                }

            withContext(Dispatchers.Default){ // <<--- DEADLOCK
                //withContext(Dispatchers.IO){  // <<-- NO DEADLOCK
                //withContext(thPool){  // <<-- NO DEADLOCK
                //withContext(thPool2){  // <<-- NO DEADLOCK
                channel.asFlux()
                    //.subscribeOn(Schedulers.elastic())
                    .publishOn(Schedulers.elastic(),1)
                    //.doFinally {  producer.cancel() }
                    .limitRate(1)
                    //.take(30)
                    .take(1)
                    .blockLast()
            }
            1
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.